0%

Netty源码阅读笔记(一)服务端启动流程

服务端启动样例

为了方便理解,先来看以下服务端的启动流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class NettyServer{
// ...
public void start() throws Exception {
// 创建NIO的EventLoop实例
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap b = new ServerBootstrap();
// 配置
b.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(...);
}
});
// 启动
ChannelFuture f = b.bind().sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully().sync();
workerGroup.shutdownGracefully().sync();
}
}
}

概述

Netty服务端在启动时,主要有以下几个流程:

  1. 创建NioSocketChannel
  2. 初始化NioSocketChannel
  3. 注册NioSocketChannel到Selector中
  4. 做Bind操作
  5. 添加Read事件

下面一一来看

服务端启动流程

从bind方法进入,最终到doBind方法中,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private ChannelFuture doBind(final SocketAddress localAddress) {
// 创建并初始化Channel
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
//不能肯定register完成,因为register是丢到nio event loop里面执行去了。
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
// register完成之后,真正开始做bind操作
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
....
}
}

initAndRegister()方法中,做了3个事情,Channel的创建,初始化,注册到Selector中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 1. 创建Channel
channel = channelFactory.newChannel();
// 2. 初始化Channel
init(channel);
} catch (Throwable t) {
/* other code... */
}
/* other code... */

// 3. 开始register
ChannelFuture regFuture = config().group().register(channel);

/* other code... */
}

1. 创建NioSocketChannel

从上面可以看出,创建Channel是使用一个工厂方法来创造Channel实例,而工厂方法中其实是从类对象中以反射的形式获取其无参构造函数,最终进行实例化

1
2
3
4
5
6
7
8
9
10
@Override
//泛型T代表不同的Channel
public T newChannel() {
try {
//反射创建channel
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}

这时可能会有疑问,这个类对象是哪来的呢? 类对象是在配置ServerBootStrap的时候穿进去的,

1
2
3
b.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class) // 就是这个类对象
....

而在这里,只是实例化了Netty中的Channel,真正在底层干活的Channel实际上是NIO的Channel,Netty的Channel只不过是在其基础上进行的封装,以ServerSocketChannel为例子,来看一下Netty中Channel的创建流程。

从无参构造方法进入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32


public NioSocketChannel() {
// DEFAULT_SELECTOR_PROVIDER 是 SelectorProvider 的provider,因为在不同的系统中,selector的实现也不同,比如linux上是epoll,mac上是kqueue, 具体应该使用哪一种selector,由SelectorProvider.provider()来得到.
this(DEFAULT_SELECTOR_PROVIDER);
}

/**
* Create a new instance using the given {@link SelectorProvider}.
*/
public NioSocketChannel(SelectorProvider provider) {
// newSocket使用selector来open一个Channel,这个Channel就是NIO的channel了
this(newSocket(provider));
}

/**
* Create a new instance using the given {@link SocketChannel}.
*/
public NioSocketChannel(SocketChannel socket) {
this(null, socket);
}

/**
* Create a new instance
*
* @param parent the {@link Channel} which created this instance or {@code null} if it was created by the user
* @param socket the {@link SocketChannel} which will be used
*/
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}

到了super(parent, socket);,实际上已经创建好NIO的负责连接的Channel了,但Netty的Channel除此以外又加了许多新的东西,比如:每一个Channel都有一个ChannelID,都有一个Unsafe对象,都有一个Pipline等,其中的一个构造方法:

1
2
3
4
5
6
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}

以上是创建NioSocketChannel的主要流程

2. 初始化NioSocketChannel

先说说这部分做了什么事情吧, 主要做的事情就是,根据option传入的参数来对ServerSocketChannel进行socket相关的配置, 并保存childOption配置,之后ServerSocketChannel在创建SocketChannel的时候,就根据childOption中的内容来对新建的SocketChannel进行配置。

1
2
3
4
b.group(bossGroup,workerGroup)
.option(..., ...)
.childOption(..., ...)
....

同时,对于NioServerSocketChannel来说,在这一步还会在pipline中加入一个ServerBootstrapAcceptor, 这是一个handler,它负责接收客户端连接创建连接后,对连接的初始化工作。

3. 将Channel注册到Selector

1
2
// 注册
ChannelFuture regFuture = config().group().register(channel);

层层进入,最终

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
...
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {

....
// 设置该Channel的eventLoop (事件到了之后,业务处理都由它来做)
AbstractChannel.this.eventLoop = eventLoop;


if (eventLoop.inEventLoop()) {
// 注册
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}

private void register0(ChannelPromise promise) {
try {
// ...
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;

// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
// 回调HandlerAdded方法
pipeline.invokeHandlerAddedIfNeeded();

safeSetSuccess(promise);

// 回调ChannelRegistered方法
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
//server socket的注册不会走进下面if,server socket接受连接创建的socket可以走进去。因为accept后就active了。
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
...
}
}
} catch (Throwable t) {
...
}
}
}

除了回调一些方法之外,最重要的就是上面的doRegister方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
logger.info("initial register: " + 0);
// 就是在这里进行注册!
// eventLoop().unwrappedSelector()返回的实际上就是NIO中的selector
// 0表示什么事件都不关心
// attachment就是当前的NioSocketChannel,也就是Netty中自己实现的Channel的实例
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
。。。
}
}
}

以上就是注册的主干,可以看出,Netty底层真正干活的,其实还是Java的NIO

4. 做Bind操作

看完了initAndRegister,再回到bind()的顶层:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private ChannelFuture doBind(final SocketAddress localAddress) {
// 创建并初始化Channel
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
//不能肯定register完成,因为register是丢到nio event loop里面执行去了。
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
// register完成之后,真正开始做bind操作
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
....
}
}

从doBind0进入,进入了很多层之后(这里不一一写下来了),能够到达下面这个地方

1
2
3
4
5
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}

也就是使用unsafe来进行的bind。。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
....
....

boolean wasActive = isActive();
try {
doBind(localAddress); // 在这里做真正的绑定
} catch (Throwable t) {
...
}
//绑定后,才开始激活
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
// 回调ChannelActive方法
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}

在Channel注册完成之后,使用unsafe对象对其进行bind操作,bind成功后回调Channel的ChannelActive()方法。

这里多说两句,在上面的pipeline.fireChannelActive();的真面貌实际是:

1
2
3
4
5
6
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
// 注册读事件:读包括:创建连接/读数据)
readIfIsAutoRead();
}

上面的readIfIsAutoRead实际上做的事情就是给SeverSocketChannel注册一个读事件,剥到最终是下面这个代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}

readPending = true;

final int interestOps = selectionKey.interestOps();
//假设之前没有监听readInterestOp,则监听readInterestOp
if ((interestOps & readInterestOp) == 0) {
//NioServerSocketChannel: readInterestOp = OP_ACCEPT = 1 << 4 = 16
logger.info("interest ops: " + readInterestOp);
selectionKey.interestOps(interestOps | readInterestOp);
}
}

最终,启动流程的几个步骤就是 创建NioSocketChannel -> 初始化 -> 注册 -> bind -> 注册read事件