0%

Netty源码阅读笔记(二)NioEventLoop

NioEventLoopGroup和NioEventLoop的创建

NioEventLoop的创建是在创建EventLoopGroup中进行的,因此,先来快速过一遍EventLoopGroup的创建.

EventLoopGroup的创建中,主要是做了3件事:

  1. 创建ThreadPerTaskExcutor
  2. 创建EventLoop
  3. 创建Chooser
1
2
3
4
// 对于无参构造器,默认创建 2*cpu 个NioEventLoop
public NioEventLoopGroup() {
this(0);
}

经过多个构造器之后,到达MultithreadEventExecutorGroup, 这是NioEventLoopGroup的基类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
...
...
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
// 1. 创建Executor
if (executor == null) {
// 对于execute的每一个task,都会创建一个新的线程去执行它
// 并且创建的线程是使用同一个线程工厂来创建的,有着同一个命名格式:nioEventLoop-x-yy
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];

// 2. 创建EventLoop数组
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// 抛异常
} finally {
// 优雅的关闭
}
}
// 3. 获取chooser
chooser = chooserFactory.newChooser(children);
....
....

}
...
...
}

1. 创建Executor——ThreadPerTaskExecutor

ThreadPerTaskExecutor的作用就和它的名字一样,每提交一个任务都新创建一个线程去做。

ThreadPerTaskExecutor除了实现上面这个功能以外,还使用一个专用的ThreadFactory来去创建线程,这就保证ThreadPerTaskExecutor创建的所有线程都是以“nioEventLoopGroup-x-yy”的格式来命名的。

2. 创建EventLoop数组

这里就是根据传入的nThread参数,来创建一个长度为n的EventLoop数组,然后去逐个调用newChild(executor, args)方法去创建EventLoop实例。

做了3个重要的事: 初始化selector,初始化executor,初始化taskQueue

1
2
3
4
5
6
7
8
9
10
11
12
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
...
...

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
// 在这里创建NioEventLoop!
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
}

下面来看看NioEventLoop的构造方法:

初始化Selector

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
// 继续调用父类构造器
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);

// other code...

/* 给成员变量赋值 */
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();

// 重点!!赋值selector
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;

selectStrategy = strategy;
}

注意到,在这里selector被初始化了(就是被赋值了)

往下再跟进几层

1
2
3
4
5
6
7
8
9
10
11
12
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
// 重点!! 初始化executor
this.executor = ThreadExecutorMap.apply(executor, this);
// 重点!! 初始化taskQueue
this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

初始化Executor

需要注意的是,这里的executor是对ThreadPerTaskExecutor使用了装饰者模式,即外面又包装了一层,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// ThreadExecutorMap中

public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(executor, "executor");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Executor() {
@Override
public void execute(final Runnable command) {
executor.execute(apply(command, eventExecutor));
}
};
}

public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
// check null code ...
return new Runnable() {
@Override
public void run() {
// 多装饰的方法,看名字就知道意思了
setCurrentEventExecutor(eventExecutor); // 这里的eventExecutor就是NioEventLoop
try {
command.run();
} finally {
setCurrentEventExecutor(null);
}
}
};
}

初始化TaskQueue

这里的TaskQueue就是存放任务的队列,在外部线程想要执行netty任务的时候(就是非EventLoop的Thread想要执行netty任务的时候), 这个任务应该被EventLoop去执行(netty任务不应该被外部线程执行),从而保证串行无锁化,这时,就把这个task放到这个taskQueue中,之后会被NioEventLoop给执行掉(在其run方法中的runAllTasks里)

这里的TaskQueue是由MpscQueue实现的,因为这个taskQueue被使用的频率很高,所以对队列的实现要求也很高,之后有时间再好好看一下其实现原理。

1
2
3
public static <T> Queue<T> newMpscQueue() {
return Mpsc.newMpscQueue();
}
1
2
3
4
static <T> Queue<T> newMpscQueue() {
return USE_MPSC_CHUNKED_ARRAY_QUEUE ? new MpscUnboundedArrayQueue<T>(MPSC_CHUNK_SIZE)
: new MpscUnboundedAtomicArrayQueue<T>(MPSC_CHUNK_SIZE);
}

3. 创建Chooser

Chooser的作用就是,当有新的Channel建立的时候,选择应该将其放到哪一个EventLoop下面… 实质就是对EventLoop数组的下表的选择。netty是使用负载轮训的方式来做的,不过这里有一个优化,就是如果EventLoop数组的长度是2的整数次方的话,就能使用 len-1获取到mask,然后直接 & 当前游标就能做到 取模的操作, 位运算的速度会比较快。

这里我们可以看到,NioEventLoop直到创建结束都没有创建它所应该持有的那个唯一线程,这个操作实际上是放在NioEventLoop启动的时候做的

NioEventLoop的启动

实际上NioEventLoop是以懒加载的形式启动的,也就是在首次执行任务的时候,如果这个工作线程还没有被创建,NioEventLoop还没被启动,那么就会使用executor(ThreadPerTaskExecutor)来去创建,并且启动NioEventLoop。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// NioEventLoop的基类——SingleThreadEventExecutor
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
...
...
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
// 当前线程是否是EventLoop中的工作线程(因为初始调用时,工作线程为null,所以必为false)
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
// 重点!! 下面这个方法做的事是:如果这个工作线程还没有被创建,那么就是使用executor去创建
startThread();

// other code...

}
// other code...
}

}

在startThread进入几层后,到了真正干事的doStartThread方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 创建NioEventLoop的工作线程,并启动EventLoop
private void doStartThread() {
assert thread == null;
// executor就是那个ThreadPerTaskExecutor
// 在执行execute时就已经创建了线程
executor.execute(new Runnable() {
@Override
public void run() {
// 1. 将当前创建的线程置为当前EventLoop的工作线程
thread = Thread.currentThread();
// other code
try {
// 2. 启动EventLoop!!启动完之后就开始进行select了,下一部分分析它!!!
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
// print warn log...
} finally {
// other code...
}
}
});
}

NioEventLoop的run方法解析

本次解析主要看for(;;)里的3个关键方法:

  • select
  • 当有事件到来时,进行processSelectionKeys操作
  • 执行taskQueue中的任务,runAllTasks操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public final class NioEventLoop extends SingleThreadEventLoop {
...
...
@Override
//死循环监听、处理事件
protected void run() {
for (;;) {
try {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;

case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO

case SelectStrategy.SELECT:
// 1. 进行select操作
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
// other code ...
}

cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
// 2. 处理事件
processSelectedKeys();
} finally {
// Ensure we always run tasks.
// 3. 执行taskQueue里的任务
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
// 2. 处理事件
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
// 3. 执行taskQueue里的任务
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
// other code ...
}
}
}

1. select操作 (检测事件)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
// selectCnt记录轮询次数, 空轮询次数超过SELECTOR_AUTO_REBUILD_THRESHOLD(默认512)之后,
// 则重建selector
int selectCnt = 0;
// 记录当前事件
long currentTimeNanos = System.nanoTime();
// selectDeadLineNanos = 当前时间 + 距离最早的定时任务开始执行的时间
// 计算出select操作必须在哪个时间点之前被wakeUp (不然一直被阻塞的话,定时任务就没发被执行)
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime();
if (nextWakeupTime != normalizedDeadlineNanos) {
nextWakeupTime = normalizedDeadlineNanos;
}

for (;;) {
// 计算出当前select操作能阻塞的最久时间
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
// 超过最长等待时间:有定时task需要执行
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
//非阻塞,没有数据返回0
selector.selectNow();
selectCnt = 1;
}
break;
}

// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
// 确定当前确实没有任务需要去执行
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}

// 进行select操作, 下面select阻塞中,别人唤醒也可以可以的
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;

if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}

// 如果select没有触发超时返回,并且确实是监听到了新事件而不是空轮询,那么就一定会在上面的if中返回了
// 所以往下走的话,有2个情况:
// 1. select超时
// 2. 发生了空轮询

if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}


long time = System.nanoTime();
// select超时的情况(因为实际经过的时间确实是 >= 应该最大阻塞时间 )
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// 空轮询次数超过了 SELECTOR_AUTO_REBUILD_THRESHOLD(默认512)
// The code exists in an extra method to ensure the method is not too big to inline as this
// branch is not very likely to get hit very frequently.

// 重建selector
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}

currentTimeNanos = time;
}

if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}

上面的代码注释已经写的很详细,这里再简短的小结一下select的流程:

  1. 确保可以进行阻塞式select,即保证没有任务需要执行(包括定时任务和非定时任务, 定时任务和非定时任务存在2个不同的队列里,后面会说)
    1. 根据最早定时任务应执行的时间,来计算出select的阻塞最大超时时间是多久(即select阻塞操作必须保证不能过分影响定时任务的执行)
    2. 判断定时任务是否应该执行(最大超时时间<=0), 若是则执行一个selectNow()并返回(非阻塞select)
    3. 判断taskQueue中是否有任务需要执行,若有则执行一个selectNow()并返回(非阻塞select)
  2. 进行阻塞式select(timeout)
  3. 判断是否出现空轮询bug, 若出现, 则重建selector。 默认情况是空轮询超过512次,则触发重建selector操作。

空轮询Bug及出现原因

2. processSelectedKey操作(处理事件)

2.1 优化一:以数组代替Set的方式,存储SelectedKeys

当有新事件到来之后,就需要处理这些事件. 因为都是基于NIO来做的,所以这里在处理新事件的时候(SelectionKey),其实也是使用的NIO里的东西,只不过Netty又在其基础上做了优化,比如:SelectionKeys的存储方法,用数组来代替集合,使得add操作变成O(1)

为什么能用数组代替集合

因为selectedKeySet并不会用到remove,contains这种操作,而且高频操作主要是:add和遍历… 所以没有必要使用Set, 使用数组完全足够

如何更改NIO中的SelectedKeySet

首先要知道,在NIO中的SelectedKeySet属性的类型是Set类型,因此,只要继承Set接口,重新自己实现一个Set,然后内部的实现方式使用数组来实现,最终再将自己实现的Set类型以反射的形式注入到Selector中, 这样就能够做到在不更改Selector的情况下,使得SelectionKey的存储方式从Set变为数组,从而提升了效率。

Netty实际上就是这么做的,因为主要是对selector进行的更改(反射注入),而Selector的初始化是在NioEventLoop中,所以我们来看一下这段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
// other code ...
provider = selectorProvider;
// 在这里对selector初始化!!!!
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}

private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
// 1. 创建一个Selector
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}

// 如果不需要对KeySet进行优化,那么就直接返回创建的这个Selector
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}

// 获取SelectorImpl的类对象
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});

// 确保刚才创建的selector是SelectorImpl的实现类的对象!! (如果不是也不能进行优化,就直接返回
if (!(maybeSelectorImplClass instanceof Class) ||
// ensure the current selector implementation is what we can instrument.
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector);
}

final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;

// 2. 偷梁换柱!! 创建 "基于数组" 实现的Set —— SelectedSelectionKeySet
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

// other condition check
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
// ...
}

// setAccessible(true)
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}

// 3. 使用基于数组实现的Set 反射注入!!!
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
// other code ...
}

而上面的基于数组实现的Set,是下面这个样子的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

SelectionKey[] keys;
int size;

SelectedSelectionKeySet() {
// 提前初始化好一个长度为1024的数组,之后如果超过这个大小的话,会触发 increaseCapacity(),每次长度*2
keys = new SelectionKey[1024];
}

@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}

keys[size++] = o;
if (size == keys.length) {
increaseCapacity();
}

return true;
}

/**
* 用不到这个方法,所以直接返回false
*/
@Override
public boolean remove(Object o) {
return false;
}

/**
* 用不到这个方法,所以直接返回false
*/
@Override
public boolean contains(Object o) {
return false;
}

@Override
public int size() {
return size;
}

@Override
public Iterator<SelectionKey> iterator() {
return new Iterator<SelectionKey>() {
private int idx;

@Override
public boolean hasNext() {
return idx < size;
}

@Override
public SelectionKey next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return keys[idx++];
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}

void reset() {
reset(0);
}

void reset(int start) {
Arrays.fill(keys, start, size, null);
size = 0;
}

private void increaseCapacity() {
SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
System.arraycopy(keys, 0, newKeys, 0, size);
keys = newKeys;
}
}

2.2 processSelectedKeys

这里的优化主要就是:

  1. help GC
  2. 从SelectionKey的attachment中拿出Netty自己实现的Channel,然后后面的都是基于这个Channel来做处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
// 1. help GC
selectedKeys.keys[i] = null;

// 呼应于channel的register中的this: 例如:selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
// 2. SelectionKey中的attachment,存放的是Netty包装实现的Channel,这样的话后面都基于Netty的Channel来做了 (AbstractNioChannel)
final Object a = k.attachment();

if (a instanceof AbstractNioChannel) {
// 基于Netty的Channel来处理,里面其实和NIO类似,确定发生的事件之后,然后对其进行处理...
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}

if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);

selectAgain();
i = -1;
}
}
}

3. runAllTask操作

经过上面的分析后,我们知道,在Netty自己实现的select方法中,除了事件到来的情况以外,当出现“有定时/非定时任务”存在的时候,也会wakeUp起来… 出来之后会先处理seletedKey的情况,然后就是runAllTask的情况…

3.1 Netty的定时任务和非定时任务

  • 非定时任务保存在taskQueue中,在上面有提到过,为了保证异步串行无锁化,所有外部线程想执行netty任务的时候,都需要把需要执行的task给放到taskQueue中,让NioEventLoop去执行 (在执行任务的时候,会去判断,当前线程是否是EventLoop里的线程,即inEventLoop()方法)
  • 定时任务,即在将来的某个时间点开始执行的任务,这个任务保存在scheduleTaskQueue中,scheduleTaskQueue是一个优先队列, 其排序方式是按照时间排序,最早应该执行的任务放在堆顶,对于执行时间相同的Task,则按照任务存入的顺序来排序
    1
    2
    3
    4
    5
    6
    7
    8
    9
    PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
    if (scheduledTaskQueue == null) {
    scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
    SCHEDULED_FUTURE_TASK_COMPARATOR,
    // Use same initial capacity as java.util.PriorityQueue
    11);
    }
    return scheduledTaskQueue;
    }
    ScheduledFutureTask<?>的compareTo方法:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    @Override
    public int compareTo(Delayed o) {
    if (this == o) {
    return 0;
    }

    ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
    long d = deadlineNanos() - that.deadlineNanos();
    // deadlineNanos小的放在堆顶
    // 排序方式是越早执行的,越是在前面
    if (d < 0) {
    return -1;
    } else if (d > 0) {
    return 1;
    } else if (id < that.id) {
    // 若执行时刻相同,则按照任务入队顺序排序,即按照id排序
    return -1;
    } else {
    assert id != that.id;
    return 1;
    }
    }

3.2 两种任务的添加方式

  • 对于非定时任务,就使用NioEventLoop的execute(…)方法,在方法内部会有一个addTask方法,即将任务入队
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    @Override
    public void execute(Runnable task) {
    if (task == null) {
    throw new NullPointerException("task");
    }
    boolean inEventLoop = inEventLoop();
    // 任务入队
    addTask(task);
    if (!inEventLoop) {
    // 若NioEventLoop的线程还没有启动,则去启动
    startThread();
    // other code...
    }
    // other code ...
    }
  • 对于定时任务,实际上是将“添加定时任务”这一任务,当作一个非定时任务存到taskQueue中去做的, 这是因为非定时任务列队(scheduleTaskQueue)是一个线程不安全的容器,利用eventLoop串行无锁化的方式,来保证线程安全.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    if (inEventLoop()) {
    scheduledTaskQueue().add(task);
    } else {
    // 将"添加定时任务"这一任务放到taskQueue中去做
    executeScheduledRunnable(new Runnable() {
    @Override
    public void run() {
    scheduledTaskQueue().add(task);
    }
    }, true, task.deadlineNanos());
    }
    return task;
    }

3.3 两种任务的执行

这两种任务都是在runAllTask方法中被执行的,执行方式就是,先将scheduleTaskQueue中需要执行的Task放到taskQueue中,然后再执行taskQueue.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;

do {
// 转移:应该执行的scheduledTask 放入 taskQueue
fetchedAll = fetchFromScheduledTaskQueue(); // 返回的boolean代表是否停止向taskQueue中放任务, 当且仅当:taskQueue满了 或 没有scheduleTask需要放入到taskQueue中的时候,才返回true

// 依次执行taskQueue中的任务
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
afterRunningAllTasks();
return ranAtLeastOne;
}