Pipline的创建
经过前面几篇的分析之后,可以知道Pipline的创建是在创建Channel的时候被创建的,在AbstractChannel
的构造方法中,如下:
1 | protected AbstractChannel(Channel parent) { |
newChannelPipeline方法会返回一个DefaultPipline
1 | protected DefaultChannelPipeline newChannelPipeline() { |
下面来看看DefaultChannelPipline的具体内容:
1 | protected DefaultChannelPipeline(Channel channel) { |
可以看出pipline是一个含有头节点的双向队列(和AQS底层维护的队列类似)
头节点的存在使得添加/删除一个节点都变得方便。
ChannelHandlerContext
ChannelHandlerContext作为队列中的节点,AbstractChannelHandlerContext是对ChannelHandlerContext做了一个初步的实现,下面来看看ChannelHandlerContext主要继承的3个接口
AttributeMap:提供存放 attr 的功能
1 | public interface AttributeMap { |
Channel(In/Out)boundInvoker
处理网络消息的一个常规的流程就是:
等待客户端的消息到来 -> 处理消息 -> 向客户端发出响应
不难看出,这是一个”被动等待 -> 处理 -> 主动发送”的一个逻辑. 而Netty中, 被动等待消息到来这一层就被抽象成InBound
, 主动发送消息就是OutBound
,而处理消息这一层就是InBound
和OutBound
中重写的ChannelRead
方法了。 理解了上面的这段之后,再来看接口就很清晰了。
- ChannelInboundInvoker
大多数是被动等待某个事件的触发
1 | public interface ChannelInboundInvoker { |
- ChannelOutboundInvoker
大多数是主动发出的动作,比如bind, connect, write, flush等
ChannelHandlerContext
下面是ChannelHandlerContext的一些关键的方法。。。
1 | public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker { |
Pipline的添加/删除ChannelHandler
添加ChannelHandler
我们往往会通过重写ChannelInitializer的initChannel这个方法,来获取pipline,然后调用pipline的addLast方法来进行添加。 下面来说一下,这个addLast(ChannelHandler)方法做了哪些事情:
- 判断新添加的Handler能否添加进来:如果 没有标记Sharable注解 且 已经被添加过在其他的Pipline里,这种情况就是非法情况(复用Handler需要标记Sharable注解)
- 正式进行添加操作,将其添加到Tail节点的前面(这里就是一个双向链表的添加操作)
- 回调ChannelAdd(…)方法。
这里多说两句,我们通过重写ChannelInitializer的initChannel方法来添加ChannelHandler,但在上一个的篇章分析中我们知道,Channelinitilizer本身也是个ChannelHandler,在它的ChannelAdd方法中会调用initChannel方法,而在这个initChannel方法中会调用 我们重写的这个initChannel方法(这里用到了方法重载),并且把自己从pipline中移除。
1 |
|
删除ChannelHandler
即调用pipline的remove方法进行删除, 这一块的逻辑比较简单,就不贴代码了,它其实就是做了3步:
- 遍历pipline找到对应的ChannelHandlerContext节点
- 删除
- 回调ChannelRemoved方法
Pipline的事件传播
客户端和服务端通过Channel抽象进行连接,作为事件驱动的异步网络通信框架,Netty将在通信过程中的所以事件抽象成了三类:
- InBound事件
- OutBound事件
- 异常事件
这三类事件会以Tail->Head或Head->Tail的方向在Pipline中传播。
同时,还需要知道的是,Pipline 和 Pipline中的节点ChannelHandlerContext都继承了ChannelInboundInvoker
和 ChannelOutboundInvoker
这两个接口,但对于Pipline而言是从Head或Tail节点开始传播, 而对于ChannelHandlerContext而言是从 当前节点的下一个节点 开始传播。
InBound事件传播
InBound事件传播的方向是从队列的 Head -> Tail
的方向, 实际上本质都是ChannelHandlerContext来传播事件(Pipline虽然是从Head或Tail开始传播,但其实本质也是ChannelHandlerContext传播)。
下面来跟踪ChannelHandlerContext的InBound事件传播流程。
- InBound过程跟踪所使用到的Handler
1 | public class ChannelInBoundA extends ChannelInboundHandlerAdapter { |
1 | public class ChannelInBoundB extends ChannelInboundHandlerAdapter { |
1 | public class ChannelInBoundC extends ChannelInboundHandlerAdapter { |
从Pipline的传播方法进入,可以看到是从Head开始的
1
2
3
4
5
6
7
8
9
10public class DefaultChannelPipeline implements ChannelPipeline {
...
...
public final ChannelPipeline fireChannelRead(Object msg) {
// 这里的第一个参数,就是ChannelHandlerContext变量, 可以看到这里传的是head
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
}层层跟进后,最终在ChannelHandlerContext中会调用到下面这个方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
// 通过handler()获取到当前ChannelHandlerContext绑定的ChannelHandler,
// 然后调用事件方法(这里是channelRead事件) 而这个ChannelHandler
// 就是我们addLast中放入的Handler,调用的方法就是我们重写的方法(channlRead(...))
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}对于
Head
而言,channelRead方法中只有一行继续向后传播的代码1
2
3
4
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.fireChannelRead(msg); // 什么也不做,向后传播
}从ChannelHandlerContext进入事件传播方法(fireXxx),就进入了下面这个代码段
1
2
3
4
5
6
7
public ChannelHandlerContext fireChannelRead(final Object msg) {
// 与从Pipline进入fireXxx方法不同之处在于, 第一个参数不再是队列的Head节点
// 而是通过findContextInbound(...) 方法获取的一个ChannelHandlerContext节点
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
1 | // 这个方法的作用就是,获取下一个InBound类型的ChannelHandlerContext |
获取到下一个ChannelHandlerContext之后,后面的流程就都一样了,简单来说就是:
- 从Pipline开始传播 读事件 == 从Head开始传播 读事件
- 调用当前节点的channelRead方法
- 获取下一个InBound节点
- 调用当前节点(已经是下一个节点了)的channelRead方法
- 获取下一个InBound节点
…. - 获取到下一个InBound节点(此节点为Tail)
- 调用Tail节点的channelRead方法(按理说不应该调用到这个位置,如果真调用到这个位置说明这个事件没有被handler处理掉,因此会打印warn日志,并且释放掉资源。)
OutBound事件传播
其主要思想和InBound传播的思想类似, 不过主要有2个地方不一样:
- OutBound事件传播方向是从Tail -> Head (在addLast(Handler)的时候要注意顺序!)
- Pipline的fireXxx是从Tail节点开始,向Head方向传播, 当传播到Head节点的时候,会基于Unsafe对象来处理最终的出站操作。
异常事件传播
异常事件的传播也是与InBound和OutBound的类似,但不同的地方在于:
- 顺序是 触发异常的当前节点 -> Tail
- 不管是InBound节点还是OutBound节点,都会进入…(不像InBound传播只会进入InBound,OutBound只会进入OutBound)
1 |
|
异常事件传播的最佳实践
在添加ChannelHandler的时候,可以在最后添加一个Handler专门处理异常
1 | public class ExceptionChannelHandler extends ChannelInboundHandlerAdapter { |
1 | b.group(boss, worker) |
SimpleChannelInboundHandler
我们知道,在传递channelRead事件的时候,即使我们没有手动把资源释放(例如从池中获取的ByteBuf使用完后没释放),如果传递到Tail节点的话,其内部也会自动的帮我们把资源释放掉。。。 但是,如果我们既没有释放掉资源,并且这个事件也没有传递到Tail节点的话,那么就会发生资源泄漏问题。。。
而SimpleChannelInboundHandler就是对ChannelInboundHandlerAdapter的一层封装,解决了这一问题。
除了解决上面这一问题的同时,而SimpleChannelInboundHandler还提供了 基于泛型的方式来进入Handler的功能。(例如: 如果HandlerA的类型是SimpleChannelInboundHandler<byteBuf>
类型,那么在channelRead事件传播的时候,只有传递的数据类型为byteBuf类型,才能够进入这个Handler.. 这一功能用起来也十分方便。。)
下面来看一下SimpleChannelInboundHandler<I>
是怎么实现这两个功能的。
SimpleChannelInboundHandler<I>
, 它是对ChannelInboundHandlerAdapter
的包装:
1 | public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter { |