linux 2.6以下 及 unix 系统的 Selector 是由 PollSelectorImpl 类来实现的。
创建一个 PollSelectorImpl 的时候,创建了什么
创建一个 PollSelectorImpl 的时候,就是创建了一个 PollArrayWrapper 对象。用来存储 pollfd.
PollArray 的结构
1 2 3 4 5 6 7 8 9 10 11 12 13
| struct pollfd { int fd; short events; short revents; };
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+ |fd e| | | | | | | | | | | +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
PollSelectorImpl 类持有一个 PollArray 数组的实例。其默认大小是10. 这个数组的第一个元素是管道的句柄。
1 2 3 4 5 6 7 8 9 10 11 12
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
pollWrapper = new PollArrayWrapper(INIT_CAP);
pollWrapper.initInterrupt(fd0, fd1);
register 一个 SocketChannel 的时候,发生了什么
其实就是将 SocketChannel 保存到上面的结构中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| protected int doSelect(long timeout) throws IOException { if (channelArray == null) throw new ClosedSelectorException(); processDeregisterQueue(); try { begin(); pollWrapper.poll(totalChannels, 0, timeout); } finally { end(); } processDeregisterQueue(); int numKeysUpdated = updateSelectedKeys(); if (pollWrapper.getReventOps(0) != 0) { pollWrapper.putReventOps(0, 0); synchronized (interruptLock) { IOUtil.drain(fd0); interruptTriggered = false; } } return numKeysUpdated; }
protected int updateSelectedKeys() { int numKeysUpdated = 0; for (int i=channelOffset; i<totalChannels; i++) { int rOps = pollWrapper.getReventOps(i); if (rOps != 0) { SelectionKeyImpl sk = channelArray[i]; pollWrapper.putReventOps(i, 0); if (selectedKeys.contains(sk)) { if (sk.channel.translateAndSetReadyOps(rOps, sk)) { numKeysUpdated++; } } else { sk.channel.translateAndSetReadyOps(rOps, sk); if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) { selectedKeys.add(sk); numKeysUpdated++; } } } } return numKeysUpdated; }
调用 Selector 的 select 方法的时候,发生了什么
Selector.select 方法最终调用系统的 poll 方法,进行等待状态。
Copy the information in the pollfd structs
into the opss of the corresponding Channels.
设置好 SelectionKey 的状态变量,然后这几个查询方法就可以使用了:isAcceptable,isConnectable,isReadable,isWritable,readyOps
通过这些方法,就可以对 SelectionKey 关联的 channel 进行读写操作了。
Add the ready keys to the ready queue.
这个 ready set 由 Selector.selectedKeys 方法,可以获得
这个个数由 select 方法返回。
Selctor 的唤醒 wakeup
将导致 Selector 方法的 select 方法直接返回。
因为 select 操作会在其所关注的 fd 上发生感兴趣的事件的时候返回。所以可以在调用 select 的时候,向其注册一个pipe, 需要唤醒这个 selector 操作的时候,就可以向这个 pipe 中写入数据。此时pipe就会被唤醒。
所以在 pollWrapper 的第一个元素默认是 pipe.
1 2 3 4 5 6 7 8 9 10 11 12 13
| public Selector wakeup() { synchronized (interruptLock) { if (!interruptTriggered) { pollWrapper.interrupt(); interruptTriggered = true; } } return this; }
Selctor 的关闭 close
下面分析 Selector 的关闭
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| private AtomicBoolean selectorOpen = new AtomicBoolean(true); public final void close() throws IOException { boolean open = selectorOpen.getAndSet(false); if (!open) return; implCloseSelector(); }
protected void implClose() throws IOException { synchronized (closeLock) { if (closed) return; closed = true; for(int i=channelOffset; i<totalChannels; i++) { SelectionKeyImpl ski = channelArray[i]; assert(ski.getIndex() != -1); ski.setIndex(-1); deregister(ski); SelectableChannel selch = channelArray[i].channel(); if (!selch.isOpen() && !selch.isRegistered()) ((SelChImpl)selch).kill(); } implCloseInterrupt(); pollWrapper.free(); pollWrapper = null; selectedKeys = null; channelArray = null; totalChannels = 0; } }
protected void implCloseInterrupt() throws IOException { synchronized (interruptLock) { interruptTriggered = true; } FileDispatcherImpl.closeIntFD(fd0); FileDispatcherImpl.closeIntFD(fd1); fd0 = -1; fd1 = -1; pollWrapper.release(0); } `
注册在 Selector 上的 Channel 也可以主动关闭:
Channel 在关闭的时候,会在将其所注册的所有 Selector 上执行 cancel 操作。将 Selector 中与待关闭的 Channel 相关的数据进行清除。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| protected final void implCloseChannel() throws IOException { implCloseSelectableChannel(); synchronized (keyLock) { int count = (keys == null) ? 0 : keys.length; for (int i = 0; i < count; i++) { SelectionKey k = keys[i]; if (k != null) k.cancel(); } } }
void cancel(SelectionKey k) { synchronized (cancelledKeys) { cancelledKeys.add(k); } }
void processDeregisterQueue() throws IOException { Set cks = cancelledKeys(); synchronized (cks) { if (!cks.isEmpty()) { Iterator i = cks.iterator(); while (i.hasNext()) { SelectionKeyImpl ski = (SelectionKeyImpl)i.next(); try { implDereg(ski); } catch (SocketException se) { IOException ioe = new IOException( "Error deregistering key"); ioe.initCause(se); throw ioe; } finally { i.remove(); } } } } }
在 linux 2.6 以后的平台上,JDK 提供的 Selector 就由 EPollSelectorImpl 来实现。EPollSelectorImpl 则使用系统底层的 epoll.
EPollSelectorImpl 的实现和 PollSelectorImpl 实现类似。其底层主要使用 epoll 机制来实现。
- epoll_create
- epoll_ctl
- epoll_wait
- nio Selector 阻塞 唤醒 原理