0%

Netty源码阅读笔记(四)Pipline

Pipline的创建

经过前面几篇的分析之后,可以知道Pipline的创建是在创建Channel的时候被创建的,在AbstractChannel的构造方法中,如下:

1
2
3
4
5
6
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}

newChannelPipeline方法会返回一个DefaultPipline

1
2
3
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}

下面来看看DefaultChannelPipline的具体内容:

1
2
3
4
5
6
7
8
9
10
11
12
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);

// 在初始的时候创建tail,head这两个节点
tail = new TailContext(this);
head = new HeadContext(this);

head.next = tail;
tail.prev = head;
}

可以看出pipline是一个含有头节点的双向队列(和AQS底层维护的队列类似)

头节点的存在使得添加/删除一个节点都变得方便。

ChannelHandlerContext

ChannelHandlerContext

ChannelHandlerContext作为队列中的节点,AbstractChannelHandlerContext是对ChannelHandlerContext做了一个初步的实现,下面来看看ChannelHandlerContext主要继承的3个接口

AttributeMap:提供存放 attr 的功能

1
2
3
4
public interface AttributeMap {
<T> Attribute<T> attr(AttributeKey<T> key);
<T> boolean hasAttr(AttributeKey<T> key);
}

Channel(In/Out)boundInvoker

InOut

处理网络消息的一个常规的流程就是:

等待客户端的消息到来 -> 处理消息 -> 向客户端发出响应

不难看出,这是一个”被动等待 -> 处理 -> 主动发送”的一个逻辑. 而Netty中, 被动等待消息到来这一层就被抽象成InBound, 主动发送消息就是OutBound,而处理消息这一层就是InBoundOutBound中重写的ChannelRead方法了。 理解了上面的这段之后,再来看接口就很清晰了。

  1. ChannelInboundInvoker

大多数是被动等待某个事件的触发

1
2
3
4
5
6
7
8
9
10
11
public interface ChannelInboundInvoker {
ChannelInboundInvoker fireChannelRegistered();
ChannelInboundInvoker fireChannelUnregistered();
ChannelInboundInvoker fireChannelActive();
ChannelInboundInvoker fireChannelInactive();
ChannelInboundInvoker fireExceptionCaught(Throwable cause);
ChannelInboundInvoker fireUserEventTriggered(Object event);
ChannelInboundInvoker fireChannelRead(Object msg);
ChannelInboundInvoker fireChannelReadComplete();
ChannelInboundInvoker fireChannelWritabilityChanged();
}
  1. ChannelOutboundInvoker

大多数是主动发出的动作,比如bind, connect, write, flush等

Out

ChannelHandlerContext

下面是ChannelHandlerContext的一些关键的方法。。。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {

/**
* Return the {@link Channel} which is bound to the {@link ChannelHandlerContext}.
*/
Channel channel();

/**
* Returns the {@link EventExecutor} which is used to execute an arbitrary task. 例如可以是一个NioEventLoop
*/
EventExecutor executor();

/**
* The unique name of the {@link ChannelHandlerContext}.The name was used when then {@link ChannelHandler}
* was added to the {@link ChannelPipeline}. This name can also be used to access the registered
* {@link ChannelHandler} from the {@link ChannelPipeline}.
*/
String name();

/**
* The {@link ChannelHandler} that is bound this {@link ChannelHandlerContext}.
*/
ChannelHandler handler();

/**
* Return {@code true} if the {@link ChannelHandler} which belongs to this context was removed
* from the {@link ChannelPipeline}. Note that this method is only meant to be called from with in the
* {@link EventLoop}.
*/
boolean isRemoved();

/**
* Return the assigned {@link ChannelPipeline}
*/
ChannelPipeline pipeline();

/**
* Return the assigned {@link ByteBufAllocator} which will be used to allocate {@link ByteBuf}s.
*/
ByteBufAllocator alloc();

// other func...
}

Pipline的添加/删除ChannelHandler

添加ChannelHandler

我们往往会通过重写ChannelInitializer的initChannel这个方法,来获取pipline,然后调用pipline的addLast方法来进行添加。 下面来说一下,这个addLast(ChannelHandler)方法做了哪些事情:

  1. 判断新添加的Handler能否添加进来:如果 没有标记Sharable注解 且 已经被添加过在其他的Pipline里,这种情况就是非法情况(复用Handler需要标记Sharable注解)
  2. 正式进行添加操作,将其添加到Tail节点的前面(这里就是一个双向链表的添加操作)
  3. 回调ChannelAdd(…)方法。

这里多说两句,我们通过重写ChannelInitializer的initChannel方法来添加ChannelHandler,但在上一个的篇章分析中我们知道,Channelinitilizer本身也是个ChannelHandler,在它的ChannelAdd方法中会调用initChannel方法,而在这个initChannel方法中会调用 我们重写的这个initChannel方法(这里用到了方法重载),并且把自己从pipline中移除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 1. 检查合法性(即是否没有标记Sharable并且被重复放到Pipline中)
checkMultiplicity(handler);

// 2. 基于ChannelHandler创建ChannelHandlerContext
newCtx = newContext(group, filterName(name, handler), handler);

// 3. 向队列中添加
addLast0(newCtx);

// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}

EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
// 4. 回调HandlerAdd方法!
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}

删除ChannelHandler

即调用pipline的remove方法进行删除, 这一块的逻辑比较简单,就不贴代码了,它其实就是做了3步:

  1. 遍历pipline找到对应的ChannelHandlerContext节点
  2. 删除
  3. 回调ChannelRemoved方法

Pipline的事件传播

客户端和服务端通过Channel抽象进行连接,作为事件驱动的异步网络通信框架,Netty将在通信过程中的所以事件抽象成了三类:

  • InBound事件
  • OutBound事件
  • 异常事件

这三类事件会以Tail->Head或Head->Tail的方向在Pipline中传播。

同时,还需要知道的是,Pipline 和 Pipline中的节点ChannelHandlerContext都继承了ChannelInboundInvokerChannelOutboundInvoker 这两个接口,但对于Pipline而言是从Head或Tail节点开始传播, 而对于ChannelHandlerContext而言是从 当前节点的下一个节点 开始传播。

InBound事件传播

InBound事件传播的方向是从队列的 Head -> Tail的方向, 实际上本质都是ChannelHandlerContext来传播事件(Pipline虽然是从Head或Tail开始传播,但其实本质也是ChannelHandlerContext传播)。

下面来跟踪ChannelHandlerContext的InBound事件传播流程。

  1. InBound过程跟踪所使用到的Handler
1
2
3
4
5
6
7
public class ChannelInBoundA extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("ChannelInBound_A : " + msg);
ctx.fireChannelRead(msg); // 向后传播
}
}
1
2
3
4
5
6
7
8
9
10
11
12
public class ChannelInBoundB extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("ChannelInBound_B : " + msg);
ctx.fireChannelRead(msg); // 向后传播
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.pipeline().fireChannelRead("hello, world"); // 调用pipline的传播方法,使其从Head节点开始向Tail方向传播
}
}
1
2
3
4
5
6
7
public class ChannelInBoundC extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("ChannelInBound_C : " + msg);
ctx.fireChannelRead(msg); // 向后传播
}
}
  1. 从Pipline的传播方法进入,可以看到是从Head开始的

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public class DefaultChannelPipeline implements ChannelPipeline {
    ...
    ...
    @Override
    public final ChannelPipeline fireChannelRead(Object msg) {
    // 这里的第一个参数,就是ChannelHandlerContext变量, 可以看到这里传的是head
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
    }
    }
  2. 层层跟进后,最终在ChannelHandlerContext中会调用到下面这个方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
    try {
    // 通过handler()获取到当前ChannelHandlerContext绑定的ChannelHandler,
    // 然后调用事件方法(这里是channelRead事件) 而这个ChannelHandler
    // 就是我们addLast中放入的Handler,调用的方法就是我们重写的方法(channlRead(...))
    ((ChannelInboundHandler) handler()).channelRead(this, msg);
    } catch (Throwable t) {
    notifyHandlerException(t);
    }
    } else {
    fireChannelRead(msg);
    }
    }
  3. 对于Head而言,channelRead方法中只有一行继续向后传播的代码

    1
    2
    3
    4
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ctx.fireChannelRead(msg); // 什么也不做,向后传播
    }
  4. 从ChannelHandlerContext进入事件传播方法(fireXxx),就进入了下面这个代码段

    1
    2
    3
    4
    5
    6
    7
    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
    // 与从Pipline进入fireXxx方法不同之处在于, 第一个参数不再是队列的Head节点
    // 而是通过findContextInbound(...) 方法获取的一个ChannelHandlerContext节点
    invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
    return this;
    }
1
2
3
4
5
6
7
8
// 这个方法的作用就是,获取下一个InBound类型的ChannelHandlerContext
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while ((ctx.executionMask & mask) == 0);
return ctx;
}

获取到下一个ChannelHandlerContext之后,后面的流程就都一样了,简单来说就是:

  • 从Pipline开始传播 读事件 == 从Head开始传播 读事件
  • 调用当前节点的channelRead方法
  • 获取下一个InBound节点
  • 调用当前节点(已经是下一个节点了)的channelRead方法
  • 获取下一个InBound节点
    ….
  • 获取到下一个InBound节点(此节点为Tail)
  • 调用Tail节点的channelRead方法(按理说不应该调用到这个位置,如果真调用到这个位置说明这个事件没有被handler处理掉,因此会打印warn日志,并且释放掉资源。)

OutBound事件传播

其主要思想和InBound传播的思想类似, 不过主要有2个地方不一样:

  1. OutBound事件传播方向是从Tail -> Head (在addLast(Handler)的时候要注意顺序!)
  2. Pipline的fireXxx是从Tail节点开始,向Head方向传播, 当传播到Head节点的时候,会基于Unsafe对象来处理最终的出站操作。

异常事件传播

异常事件的传播也是与InBound和OutBound的类似,但不同的地方在于:

  • 顺序是 触发异常的当前节点 -> Tail
  • 不管是InBound节点还是OutBound节点,都会进入…(不像InBound传播只会进入InBound,OutBound只会进入OutBound)
1
2
3
4
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}

异常事件传播的最佳实践

在添加ChannelHandler的时候,可以在最后添加一个Handler专门处理异常

1
2
3
4
5
6
7
8
9
10
public class ExceptionChannelHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if ( cause instanceof RuntimeException ){
// process
}else if ( ... ){
...
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
b.group(boss, worker)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ChannelInBoundA());
pipeline.addLast(new ChannelInBoundB());
pipeline.addLast(new ChannelInBoundC());
pipeline.addLast(new ChannelOutBoundA());
pipeline.addLast(new ChannelOutBoundB());
pipeline.addLast(new ChannelOutBoundC());
// 在最后添加一个专门处理各类异常的Handler
pipeline.addLast(new ExceptionChannelHandler());
}
})

SimpleChannelInboundHandler

我们知道,在传递channelRead事件的时候,即使我们没有手动把资源释放(例如从池中获取的ByteBuf使用完后没释放),如果传递到Tail节点的话,其内部也会自动的帮我们把资源释放掉。。。 但是,如果我们既没有释放掉资源,并且这个事件也没有传递到Tail节点的话,那么就会发生资源泄漏问题。。。

而SimpleChannelInboundHandler就是对ChannelInboundHandlerAdapter的一层封装,解决了这一问题。

除了解决上面这一问题的同时,而SimpleChannelInboundHandler还提供了 基于泛型的方式来进入Handler的功能。(例如: 如果HandlerA的类型是SimpleChannelInboundHandler<byteBuf>类型,那么在channelRead事件传播的时候,只有传递的数据类型为byteBuf类型,才能够进入这个Handler.. 这一功能用起来也十分方便。。)

下面来看一下SimpleChannelInboundHandler<I>是怎么实现这两个功能的。

SimpleChannelInboundHandler<I> , 它是对ChannelInboundHandlerAdapter的包装:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {

...
private final TypeParameterMatcher matcher;
...

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
// 用于判断这个msg是否是 <I> 这个范型类型的, 如果是,则返回true,调用channelRead0
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
channelRead0(ctx, imsg); //模版模式
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
// 在finally中,只要调用完channelRead0之后没有释放资源,都会进行资源的释放
if (autoRelease && release) {
ReferenceCountUtil.release(msg);
}
}
}

protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception;

}