服务端启动样例 为了方便理解,先来看以下服务端的启动流程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class NettyServer { public void start () throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class ) .localAddress (new InetSocketAddress (port )) .childHandler (new ChannelInitializer <SocketChannel >() { @Override protected void initChannel (SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(...); } }); ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully().sync(); workerGroup.shutdownGracefully().sync(); } } }
概述 Netty服务端在启动时,主要有以下几个流程:
创建NioSocketChannel
初始化NioSocketChannel
注册NioSocketChannel到Selector中
做Bind操作
添加Read事件
下面一一来看
服务端启动流程 从bind方法进入,最终到doBind方法中,如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private ChannelFuture doBind (final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null ) { return regFuture; } if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { .... } }
在initAndRegister()
方法中,做了3个事情,Channel的创建,初始化,注册到Selector中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 final ChannelFuture initAndRegister () { Channel channel = null ; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { } ChannelFuture regFuture = config().group().register(channel); }
1. 创建NioSocketChannel 从上面可以看出,创建Channel是使用一个工厂方法来创造Channel实例,而工厂方法中其实是从类对象 中以反射的形式获取其无参构造函数,最终进行实例化
1 2 3 4 5 6 7 8 9 10 @Override public T newChannel () { try { return constructor.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t); } }
这时可能会有疑问,这个类对象是哪来的呢? 类对象是在配置ServerBootStrap的时候穿进去的,
1 2 3 b.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class ) // 就是这个类对象 ....
而在这里,只是实例化了Netty中的Channel,真正在底层干活的Channel实际上是NIO的Channel,Netty的Channel只不过是在其基础上进行的封装,以ServerSocketChannel为例子,来看一下Netty中Channel的创建流程。
从无参构造方法进入:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public NioSocketChannel () { this (DEFAULT_SELECTOR_PROVIDER); } public NioSocketChannel (SelectorProvider provider) { this (newSocket(provider)); } public NioSocketChannel (SocketChannel socket) { this (null , socket); } public NioSocketChannel (Channel parent, SocketChannel socket) { super (parent, socket); config = new NioSocketChannelConfig(this , socket.socket()); }
到了super(parent, socket);
,实际上已经创建好NIO的负责连接的Channel了,但Netty的Channel除此以外又加了许多新的东西,比如:每一个Channel都有一个ChannelID,都有一个Unsafe对象,都有一个Pipline等,其中的一个构造方法:
1 2 3 4 5 6 protected AbstractChannel (Channel parent) { this .parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
以上是创建NioSocketChannel的主要流程
2. 初始化NioSocketChannel 先说说这部分做了什么事情吧, 主要做的事情就是,根据option传入的参数来对ServerSocketChannel进行socket相关的配置, 并保存childOption配置,之后ServerSocketChannel在创建SocketChannel的时候,就根据childOption中的内容来对新建的SocketChannel进行配置。
1 2 3 4 b.group(bossGroup,workerGroup) .option(..., ...) .childOption(..., ...) ....
同时,对于NioServerSocketChannel来说,在这一步还会在pipline中加入一个ServerBootstrapAcceptor, 这是一个handler,它负责接收客户端连接创建连接后,对连接的初始化工作。
3. 将Channel注册到Selector 1 2 ChannelFuture regFuture = config().group().register(channel);
层层进入,最终
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { ... @Override public final void register (EventLoop eventLoop, final ChannelPromise promise) { .... AbstractChannel.this .eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run () { register0(promise); } }); } catch (Throwable t) { ... } } } private void register0 (ChannelPromise promise) { try { boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false ; registered = true ; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { ... } } } catch (Throwable t) { ... } } }
除了回调一些方法之外,最重要的就是上面的doRegister
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Override protected void doRegister () throws Exception { boolean selected = false ; for (;;) { try { logger.info("initial register: " + 0 ); selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0 , this ); return ; } catch (CancelledKeyException e) { 。。。 } } }
以上就是注册的主干,可以看出,Netty底层真正干活的,其实还是Java的NIO
4. 做Bind操作 看完了initAndRegister,再回到bind()的顶层:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private ChannelFuture doBind (final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null ) { return regFuture; } if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { .... } }
从doBind0进入,进入了很多层之后(这里不一一写下来了),能够到达下面这个地方
1 2 3 4 5 @Override public void bind ( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) { unsafe.bind(localAddress, promise); }
也就是使用unsafe来进行的bind。。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Override public final void bind (final SocketAddress localAddress, final ChannelPromise promise) { .... .... boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { ... } if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run () { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); }
在Channel注册完成之后,使用unsafe对象对其进行bind操作,bind成功后回调Channel的ChannelActive()
方法。
这里多说两句,在上面的pipeline.fireChannelActive();
的真面貌实际是:
1 2 3 4 5 6 @Override public void channelActive (ChannelHandlerContext ctx) { ctx.fireChannelActive(); readIfIsAutoRead(); }
上面的readIfIsAutoRead
实际上做的事情就是给SeverSocketChannel注册一个读事件,剥到最终是下面这个代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Override protected void doBeginRead () throws Exception { final SelectionKey selectionKey = this .selectionKey; if (!selectionKey.isValid()) { return ; } readPending = true ; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0 ) { logger.info("interest ops: " + readInterestOp); selectionKey.interestOps(interestOps | readInterestOp); } }
最终,启动流程的几个步骤就是 创建NioSocketChannel -> 初始化 -> 注册 -> bind -> 注册read事件