概述
Netty中Channel接口的描述:
A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind.
即是一个对socket连接的抽象,在都满足这一抽象的前提下,Netty对Channel的实现也有很多种类,本篇将主要记录对NioServerSocketChannel和NioSocketChannel这两种Channel实现的学习。
Channel的继承关系
基于上面的这个这个继承关系图,来一层一层分析每一层的抽象主要都是做了什么事情.
1. Channel
提供了接口规范,是对连接的抽象。 Channel接口中还定义了Unsafe接口,简单来说的话,是要使用Unsafe对象来去实现Channel的一些操作(例如read, write等)
2. AbstractChannel
作用:
A skeletal {@link Channel} implementation.
主要做了什么:
- 定义了一系列的变量(如id, unsafe, pipline等等)
- 对id, pipline和unsafe对象进行了初始化, 其中unsafe对象的初始化使用了模版模式,让子类去实现
1 2 3 4 5 6 7 8 9 10
| public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { ... ... protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); } }
|
3. AbstractNioChannel
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public abstract class AbstractNioChannel extends AbstractChannel {
... private final SelectableChannel ch; protected final int readInterestOp; volatile SelectionKey selectionKey; ...
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { logger.warn( "Failed to close a partially initialized socket.", e2); }
throw new ChannelException("Failed to enter non-blocking mode.", e); } }
}
|
4. NioSocketChannel和NioServerSocketChannel
为了方便理解,这里直接分析这两者之间的主要区别。
区别一:关心的事件不同
NioSocketChannel关心的事件是Read, 而NioServerSocketChannel关心的事件是Accept。这一点在它们的构造函数中便体现了出来。
NioServerSocketChannel
1 2 3 4 5 6 7 8 9 10
| public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel { ... ... public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } ... }
|
NioSocketChannel
1 2 3 4
| protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { super(parent, ch, SelectionKey.OP_READ); }
|
区别二:Unsafe的实现不同
在分析服务端启动流程的时候,我们知道,在调用fireChannelActive方法的时候,实际上做了两个事情,一个是真正调用fireChannelActive方法,另一个是注册read事件,如下:
1 2 3 4 5 6
| @Override public void channelActive(ChannelHandlerContext ctx) { ctx.fireChannelActive(); readIfIsAutoRead(); }
|
readIfIsAutoRead会走到下面这个代码段:
1 2 3 4 5 6
| if ((interestOps & readInterestOp) == 0) { logger.info("interest ops: " + readInterestOp); selectionKey.interestOps(interestOps | readInterestOp); }
|
这时,会有个疑问,ServerSocketChannel应该只对Accept感兴趣才对啊,为什么也要对其注册Read事件呢? 奥秘就在于Unsafe中,Netty是使用Unsafe对象来实现的Channel的读写等操作, 而ServerSocketChannel和SocketChannel的Unsafe对象的实现也是不同(从上面的继承图可以看出来一个是NioByteUnsafe,一个是NioMessageUnsafe),尽管它们都对Read事件感兴趣,对于ServerSocketChannel来说,它的读实际上是读连接(accept), 而对于SocketChannel来说,它的读是真正的读数据(Bytes),下面来看看这两者Unsafe对象的实现.
NioSocketChannel
1 2 3 4
| @Override protected AbstractNioUnsafe newUnsafe() { return new NioByteUnsafe(); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| protected class NioByteUnsafe extends AbstractNioUnsafe { @Override public final void read() { try { do { byteBuf = allocHandle.allocate(allocator); allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { } } while (allocHandle.continueReading()); } catch (Throwable t) { } finally { } } ... }
|
NioServerSocketChannel
1 2 3 4
| @Override protected AbstractNioUnsafe newUnsafe() { return new NioMessageUnsafe(); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| private final class NioMessageUnsafe extends AbstractNioUnsafe { private final List<Object> readBuf = new ArrayList<Object>();
@Override public void read() { try { try { do { int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; }
allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); } finally { } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t);
try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }
|
至此,大概的了解了SocketChannel和ServerSocketChannel之间的几个主要区别,并对区别的实现有了一定的认识,,不得不说,阅读优秀的框架源码总是能让人感到惊艳。
兄弟篇: Netty的两种Handler
1. ServerBootStrapAcceptor
在第一篇服务器端的启动流程 这一篇章中我们知道,ServerSocketChannel在初始化阶段,会向pipline里注册一个handler —— ServerBootStrapAcceptor, 这个handler的作用就是处理 ServerSocketChannel在使用accept获取到一个新的SocketChannel时,被使用(见上面)。 尽管上面的注释中已经简短阐述了这个handler做的事情,这里再详细介绍一下,它对SocketChannel做了哪些事情:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
... ...
@Override @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger); setAttributes(child, childAttrs);
try {
childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
... ... }
|
可以看到,这个ServerBootstrapAcceptor对新创建的SocketChannel做了4个事情:
- 配置pipline
- 配置option
- 设置attribute
- 进行register:
- 将这个新的SocketChannel给register到EventLoopGroup中的一个EventLoop中(使用Chooser)
- 调用获取的EventLoop的register方法,将这个新的SocketChannel给注册到selector中.
2. ChannelInitializer
我们总是使用 ChannelInitializer 去添加handler初始化pipline,但其实它自己本身也是一个handler,在channelRegistered和handlerAdd时,会被调用下面的initChannel方法(保证会被调用一次),在这个方法中,会回调我们写的initChannel方法,并且将自己(ChannelInitializer)从pipline中移除。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.add(ctx)) { try { initChannel((C) ctx.channel()); } catch (Throwable cause) { } finally { ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this) != null) { pipeline.remove(this); } } return true; } return false; }
|