
Netty Basic Components

I. Preface

After building a few small projects on top of Netty, I increasingly felt the need to read its source code; using it without understanding the internals never felt entirely solid. So I plan to spend a little time each day reading the source and keeping study notes.

II. Netty Basic Components

Java's built-in NIO is an implementation of the multiplexed Reactor model, but it is not particularly convenient to use. Netty therefore wraps and optimizes NIO, adding many features NIO lacks; it is much easier to use and also performs very well. Several of Netty's basic components are themselves wrappers around existing NIO concepts (such as Channel and ByteBuf). Below is a brief overview of Netty's main components; later posts will dig into the source code of each one.

1. Channel

Netty's Channel is an abstraction over a socket connection. It is implemented on top of Java NIO, so the actual work underneath is still done by the JDK's NIO.
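
To make this concrete, here is a minimal client-side sketch that obtains a Netty Channel; the address 127.0.0.1:8888 is just an assumed example and the empty ChannelInitializer is a placeholder.

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class ChannelDemo {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)   // NioSocketChannel wraps a JDK SocketChannel underneath
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            // handlers would be added to ch.pipeline() here
                        }
                    });
            // connect() returns a ChannelFuture; the Channel is the abstraction over the socket connection
            ChannelFuture future = bootstrap.connect("127.0.0.1", 8888).sync();
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}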

2. ByteBuf

The most important difference between classic I/O and NIO is how data is packaged and transferred: I/O processes data as a stream, while NIO processes data in blocks.

NIO has a similar component, ByteBuffer, but it is awkward to use: switching between read and write modes requires flip() and clear() calls. Netty's ByteBuf improves on it, offering a much richer and more convenient API, and it also reduces the number of data copies through derived (view) buffers and direct memory, so its performance is excellent as well.
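
A small sketch of the difference: the JDK's ByteBuffer has a single position pointer and must be flip()ped between writing and reading, while Netty's ByteBuf keeps separate reader and writer indexes (the buffer sizes and contents here are arbitrary).

import java.nio.ByteBuffer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;

public class BufferDemo {
    public static void main(String[] args) {
        // JDK ByteBuffer: one position pointer, so flip() is needed before reading
        ByteBuffer jdkBuffer = ByteBuffer.allocate(16);
        jdkBuffer.put("netty".getBytes(CharsetUtil.UTF_8));
        jdkBuffer.flip();                        // switch from write mode to read mode
        byte[] jdkOut = new byte[jdkBuffer.remaining()];
        jdkBuffer.get(jdkOut);

        // Netty ByteBuf: independent readerIndex / writerIndex, no flip() required
        ByteBuf nettyBuffer = Unpooled.buffer(16);
        nettyBuffer.writeBytes("netty".getBytes(CharsetUtil.UTF_8));
        byte[] nettyOut = new byte[nettyBuffer.readableBytes()];
        nettyBuffer.readBytes(nettyOut);
        nettyBuffer.release();                   // ByteBuf is reference-counted, release when done
    }
}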

3. Pipeline

(figure: pipeline)

A Channel contains a ChannelPipeline, and the ChannelPipeline in turn maintains a doubly linked list made up of ChannelHandlerContext nodes.
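
For example, handlers are typically added to the pipeline inside a ChannelInitializer; the StringDecoder/StringEncoder pair and the EchoHandler below are just illustrative choices of mine (EchoHandler is sketched in the ChannelHandler section).

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class EchoChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        // each addLast() wraps the handler in a ChannelHandlerContext and links it into the list
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast("echo", new EchoHandler());
    }
}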

4. ChannelHandler

ChannelHandlers are the main members of the Pipeline. They do the actual processing work, such as splitting frames, encoding/decoding, and the concrete business logic.
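
As a sketch, here is a minimal inbound handler that simply echoes whatever it receives back to the client (the class name EchoHandler is my own):

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class EchoHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // write the message back; outbound handlers (e.g. StringEncoder) run on the way out
        ctx.writeAndFlush(msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}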

5. NioEventLoop

Once the selector has produced the SelectionKeys, who actually runs the logic for the events that fired? In Netty this is the job of NioEventLoop, which internally holds a Thread object. Here is the declaration of NioEventLoop:

// Extending SingleThreadEventLoop shows that a NioEventLoop is single-threaded, which guarantees thread safety for the Channels bound to the same NioEventLoop
public final class NioEventLoop extends SingleThreadEventLoop {
    ...
}
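
Because every Channel is registered with exactly one NioEventLoop, code that needs to touch a Channel from another thread is usually handed off to that Channel's event loop. A small sketch, assuming a Channel reference obtained elsewhere:

// assuming 'channel' is an io.netty.channel.Channel obtained elsewhere
if (channel.eventLoop().inEventLoop()) {
    // already running on the Channel's own thread, safe to touch it directly
    channel.writeAndFlush("ping");
} else {
    // otherwise submit the work to the Channel's single-threaded event loop
    channel.eventLoop().execute(() -> channel.writeAndFlush("ping"));
}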

NIO server example

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NIOServer {

    public static void main(String[] args) throws IOException {

        Selector selector = Selector.open();

        // Obtain a ServerSocketChannel
        ServerSocketChannel ssChannel = ServerSocketChannel.open();
        // With multiplexing, the selector polls all the channels it watches. If a channel were in
        // blocking mode the selector would get stuck on it, and events on the other channels would
        // never be noticed. Every monitored channel must therefore be set to non-blocking.
        ssChannel.configureBlocking(false);

        // Register the channel with the selector and declare the events we are interested in
        ssChannel.register(selector, SelectionKey.OP_ACCEPT);

        // Bind to a local address and start listening on it
        ServerSocket serverSocket = ssChannel.socket();
        InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8888);
        serverSocket.bind(address);

        while (true) {
            // Blocks here while no events are pending; returns as soon as an event arrives
            selector.select();
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = keys.iterator();

            while (keyIterator.hasNext()) {

                SelectionKey key = keyIterator.next();

                if (key.isAcceptable()) {

                    ServerSocketChannel ssChannel1 = (ServerSocketChannel) key.channel();

                    // The server creates one SocketChannel per new connection.
                    // The Accept event has already fired, so this accept() will not block.
                    SocketChannel sChannel = ssChannel1.accept();

                    // Likewise, set the new client channel to non-blocking and register it with the selector
                    sChannel.configureBlocking(false);

                    // This new connection is used to read data from the client
                    sChannel.register(selector, SelectionKey.OP_READ);

                } else if (key.isReadable()) {

                    SocketChannel sChannel = (SocketChannel) key.channel();
                    System.out.println(readDataFromSocketChannel(sChannel));
                    sChannel.close();
                }

                keyIterator.remove();
            }
        }
    }

    private static String readDataFromSocketChannel(SocketChannel sChannel) throws IOException {

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        StringBuilder data = new StringBuilder();

        // Keep reading until the client closes the connection (read returns -1)
        while (true) {

            buffer.clear();
            int n = sChannel.read(buffer);
            if (n == -1) {
                break;
            }
            buffer.flip();
            int limit = buffer.limit();
            char[] dst = new char[limit];
            for (int i = 0; i < limit; i++) {
                dst[i] = (char) buffer.get(i);
            }
            data.append(dst);
            buffer.clear();
        }
        return data.toString();
    }
}

The comments above cover the details. When an Accept event fires, the ServerSocketChannel calls accept() to create a new SocketChannel connected to the client, and then keeps looping to listen for further Accept events. Netty follows essentially the same logic, as we will see next.
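
To try the server above, any plain blocking client will do; for instance this throwaway snippet (the address and message are arbitrary assumptions):

import java.io.IOException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class NIOClientDemo {
    public static void main(String[] args) throws IOException {
        try (Socket socket = new Socket("127.0.0.1", 8888)) {
            socket.getOutputStream().write("hello nio".getBytes(StandardCharsets.UTF_8));
            socket.getOutputStream().flush();
        }
        // closing the socket makes the server's read return -1, so it prints "hello nio" and closes the channel
    }
}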


How Netty implements the Selector on top of NIO

Let's look at NioEventLoop's run() method to see how Netty drives the Selector.

// Extending SingleThreadEventLoop shows that a NioEventLoop is single-threaded, which guarantees thread safety for the Channels bound to the same NioEventLoop
public final class NioEventLoop extends SingleThreadEventLoop {
    // others ....
    // ....
    protected void run() {
        for (;;) {
            try {
                try {
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT:
                        // 1. The select happens here; it is built on the NIO selector
                        select(wakenUp.getAndSet(false));

                        // 'wakenUp.compareAndSet(false, true)' is always evaluated
                        // before calling 'selector.wakeup()' to reduce the wake-up
                        // overhead. (Selector.wakeup() is an expensive operation.)
                        //
                        // However, there is a race condition in this approach.
                        // The race condition is triggered when 'wakenUp' is set to
                        // true too early.
                        //
                        // 'wakenUp' is set to true too early if:
                        // 1) Selector is waken up between 'wakenUp.set(false)' and
                        //    'selector.select(...)'. (BAD)
                        // 2) Selector is waken up between 'selector.select(...)' and
                        //    'if (wakenUp.get()) { ... }'. (OK)
                        //
                        // In the first case, 'wakenUp' is set to true and the
                        // following 'selector.select(...)' will wake up immediately.
                        // Until 'wakenUp' is set to false again in the next round,
                        // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                        // any attempt to wake up the Selector will fail, too, causing
                        // the following 'selector.select(...)' call to block
                        // unnecessarily.
                        //
                        // To fix this problem, we wake up the selector again if wakenUp
                        // is true immediately after selector.select(...).
                        // It is inefficient in that it wakes up the selector for both
                        // the first case (BAD - wake-up required) and the second case
                        // (OK - no wake-up required).

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    handleLoopException(e);
                    continue;
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        // 2. Process the selected keys
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        // 2. Process the selected keys
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
    // others...
    // ...
}

The select(…) call above ultimately just performs a select on the JDK NIO selector.
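
In broad strokes, the body of select(...) boils down to calling the JDK selector with a timeout and breaking out when keys are ready, a wakeup was requested, or tasks have been queued. The following is my simplified sketch, not the actual Netty source:

// a rough, simplified sketch of what select(...) does; not the real Netty code
private void select(boolean oldWakenUp) throws IOException {
    for (;;) {
        long timeoutMillis = 1000;                          // the real code derives this from the next scheduled-task deadline
        int selectedKeys = selector.select(timeoutMillis);  // delegates to the JDK NIO Selector
        if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks()) {
            break;                                          // something to do: I/O events, a wakeup, or queued tasks
        }
        // the real code also counts premature returns here to detect the JDK epoll bug and rebuild the selector
    }
}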

Next is the processing of the selected keys:

private void processSelectedKeys() {
    if (selectedKeys != null) {
        // Avoids the JDK's selector.selectedKeys(): slightly better performance (1%-2%)
        // and less garbage to collect
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}
private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        // null out entry in the array to allow to have it GC'ed once the Channel close
        selectedKeys.keys[i] = null;

        // Matches the 'this' passed in the channel's register call,
        // e.g. selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
    }
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    // others...
    int readyOps = k.readyOps();
    // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
    // the NIO JDK channel implementation may throw a NotYetConnectedException.
    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
        // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
        // See https://github.com/netty/netty/issues/924
        int ops = k.interestOps();
        ops &= ~SelectionKey.OP_CONNECT;
        k.interestOps(ops);

        unsafe.finishConnect();
    }

    // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
        // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
        ch.unsafe().forceFlush();
    }

    // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
    // to a spin loop
    // Handles read events (including disconnects) and newly accepted connections
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read(); // at the lowest level the channel's unsafe does the actual work
    }

    // others ...
    // ....
}

Having read through this, it becomes clear that Netty's Selector is built entirely on top of NIO; Netty simply adds many features and optimizations on top of it.