// Express the select deadline relative to initialNanoTime(), then store it in
// nextWakeupTime only when it differs from the value already held there.
long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime(); if (nextWakeupTime != normalizedDeadlineNanos) { nextWakeupTime = normalizedDeadlineNanos; }
for (;;) { // 计算出当前select操作能阻塞的最久时间 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; // 超过最长等待时间:有定时task需要执行 if (timeoutMillis <= 0) { if (selectCnt == 0) { //非阻塞,没有数据返回0 selector.selectNow(); selectCnt = 1; } break; }
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call // Selector#wakeup. So we need to check task queue again before executing select operation. // If we don't, the task might be pended until select operation was timed out. // It might be pended until idle timeout if IdleStateHandler existed in pipeline. // 确定当前确实没有任务需要去执行 if (hasTasks() && wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1; break; }
// 进行select操作, 下面select阻塞中,别人唤醒也可以可以的 int selectedKeys = selector.select(timeoutMillis); selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { // - Selected something, // - waken up by user, or // - the task queue has a pending task. // - a scheduled task is ready for processing break; }
if (Thread.interrupted()) { // Thread was interrupted so reset selected keys and break so we not run into a busy loop. // As this is most likely a bug in the handler of the user or it's client library we will // also log it. // // See https://github.com/netty/netty/issues/2426 if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } selectCnt = 1; break; }
long time = System.nanoTime(); // select超时的情况(因为实际经过的时间确实是 >= 应该最大阻塞时间 ) if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { // timeoutMillis elapsed without anything selected. selectCnt = 1; } elseif (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // 空轮询次数超过了 SELECTOR_AUTO_REBUILD_THRESHOLD(默认512) // The code exists in an extra method to ensure the method is not too big to inline as this // branch is not very likely to get hit very frequently.
// 确保刚才创建的selector是SelectorImpl的实现类的对象!! (如果不是也不能进行优化,就直接返回 if (!(maybeSelectorImplClass instanceof Class) || // ensure the current selector implementation is what we can instrument. !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) { if (maybeSelectorImplClass instanceof Throwable) { Throwable t = (Throwable) maybeSelectorImplClass; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t); } returnnew SelectorTuple(unwrappedSelector); }
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass; // 2. 偷梁换柱!! 创建 "基于数组" 实现的Set —— SelectedSelectionKeySet final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run(){ try { Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
// other condition check if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) { // ... }
// setAccessible(true) Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true); if (cause != null) { return cause; } cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true); if (cause != null) { return cause; }
privatevoidprocessSelectedKeysOptimized(){ for (int i = 0; i < selectedKeys.size; ++i) { final SelectionKey k = selectedKeys.keys[i]; // null out entry in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 // 1. help GC selectedKeys.keys[i] = null;
// 呼应于channel的register中的this: 例如:selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); // 2. SelectionKey中的attachment,存放的是Netty包装实现的Channel,这样的话后面都基于Netty的Channel来做了 (AbstractNioChannel) final Object a = k.attachment();
if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.reset(i + 1);
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() { if (scheduledTaskQueue == null) { scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>( SCHEDULED_FUTURE_TASK_COMPARATOR, // Use same initial capacity as java.util.PriorityQueue 11); } return scheduledTaskQueue; }
// 依次执行taskQueue中的任务 if (runAllTasksFrom(taskQueue)) { ranAtLeastOne = true; } } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.