PollSelectorImpl
linux 2.6以下 及 unix 系统的 Selector 是由 PollSelectorImpl 类来实现的。
创建一个 PollSelectorImpl 的时候,创建了什么
创建一个 PollSelectorImpl 的时候,就是创建了一个 PollArrayWrapper 对象。用来存储 pollfd.
register
PollArray 的结构
1 2 3 4 5 6 7 8 9 10 11 12 13
| struct pollfd { int fd; short events; short revents; };
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+ |fd e| | | | | | | | | | | +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|
PollSelectorImpl 类持有一个 PollArray 数组的实例。其默认大小是10. 这个数组的第一个元素是管道的句柄。
1 2 3 4 5 6 7 8 9 10 11 12
|
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
pollWrapper = new PollArrayWrapper(INIT_CAP);
pollWrapper.initInterrupt(fd0, fd1);
|
register 一个 SocketChannel 的时候,发生了什么
其实就是将 SocketChannel 保存到上面的结构中
select
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| protected int doSelect(long timeout) throws IOException { if (channelArray == null) throw new ClosedSelectorException(); processDeregisterQueue(); try { begin(); pollWrapper.poll(totalChannels, 0, timeout); } finally { end(); } processDeregisterQueue(); int numKeysUpdated = updateSelectedKeys(); if (pollWrapper.getReventOps(0) != 0) { pollWrapper.putReventOps(0, 0); synchronized (interruptLock) { IOUtil.drain(fd0); interruptTriggered = false; } } return numKeysUpdated; }
protected int updateSelectedKeys() { int numKeysUpdated = 0; for (int i=channelOffset; i<totalChannels; i++) { int rOps = pollWrapper.getReventOps(i); if (rOps != 0) { SelectionKeyImpl sk = channelArray[i]; pollWrapper.putReventOps(i, 0); if (selectedKeys.contains(sk)) { if (sk.channel.translateAndSetReadyOps(rOps, sk)) { numKeysUpdated++; } } else { sk.channel.translateAndSetReadyOps(rOps, sk); if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) { selectedKeys.add(sk); numKeysUpdated++; } } } } return numKeysUpdated; }
|
调用 Selector 的 select 方法的时候,发生了什么
Selector.select 方法最终调用系统的 poll 方法,进行等待状态。
当其返回时,主要做下面3件事:
Copy the information in the pollfd structs
into the opss of the corresponding Channels.
设置好 SelectionKey 的状态变量,然后这几个查询方法就可以使用了:isAcceptable,isConnectable,isReadable,isWritable,readyOps
通过这些方法,就可以对 SelectionKey 关联的 channel 进行读写操作了。
Add the ready keys to the ready queue.
这个 ready set 由 Selector.selectedKeys 方法,可以获得
统计channel的状态发生变化的个数
这个个数由 select 方法返回。
Selctor 的唤醒 wakeup
wakeup
将导致 Selector 方法的 select 方法直接返回。
因为 select 操作会在其所关注的 fd 上发生感兴趣的事件的时候返回。所以可以在调用 select 的时候,向其注册一个pipe, 需要唤醒这个 selector 操作的时候,就可以向这个 pipe 中写入数据。此时pipe就会被唤醒。
所以在 pollWrapper 的第一个元素默认是 pipe.
1 2 3 4 5 6 7 8 9 10 11 12 13
| public Selector wakeup() { synchronized (interruptLock) { if (!interruptTriggered) { pollWrapper.interrupt(); interruptTriggered = true; } } return this; }
|
Selctor 的关闭 close
下面分析 Selector 的关闭
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| private AtomicBoolean selectorOpen = new AtomicBoolean(true); public final void close() throws IOException { boolean open = selectorOpen.getAndSet(false); if (!open) return; implCloseSelector(); }
protected void implClose() throws IOException { synchronized (closeLock) { if (closed) return; closed = true; for(int i=channelOffset; i<totalChannels; i++) { SelectionKeyImpl ski = channelArray[i]; assert(ski.getIndex() != -1); ski.setIndex(-1); deregister(ski); SelectableChannel selch = channelArray[i].channel(); if (!selch.isOpen() && !selch.isRegistered()) ((SelChImpl)selch).kill(); } implCloseInterrupt(); pollWrapper.free(); pollWrapper = null; selectedKeys = null; channelArray = null; totalChannels = 0; } }
protected void implCloseInterrupt() throws IOException { synchronized (interruptLock) { interruptTriggered = true; } FileDispatcherImpl.closeIntFD(fd0); FileDispatcherImpl.closeIntFD(fd1); fd0 = -1; fd1 = -1; pollWrapper.release(0); } `
|
注册在 Selector 上的 Channel 也可以主动关闭:
Channel 在关闭的时候,会在将其所注册的所有 Selector 上执行 cancel 操作。将 Selector 中与待关闭的 Channel 相关的数据进行清除。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| protected final void implCloseChannel() throws IOException { implCloseSelectableChannel(); synchronized (keyLock) { int count = (keys == null) ? 0 : keys.length; for (int i = 0; i < count; i++) { SelectionKey k = keys[i]; if (k != null) k.cancel(); } } }
void cancel(SelectionKey k) { synchronized (cancelledKeys) { cancelledKeys.add(k); } }
void processDeregisterQueue() throws IOException { Set cks = cancelledKeys(); synchronized (cks) { if (!cks.isEmpty()) { Iterator i = cks.iterator(); while (i.hasNext()) { SelectionKeyImpl ski = (SelectionKeyImpl)i.next(); try { implDereg(ski); } catch (SocketException se) { IOException ioe = new IOException( "Error deregistering key"); ioe.initCause(se); throw ioe; } finally { i.remove(); } } } } }
|
EPollSelectorImpl
在 linux 2.6 以后的平台上,JDK 提供的 Selector 就由 EPollSelectorImpl 来实现。EPollSelectorImpl 则使用系统底层的 epoll.
EPollSelectorImpl 的实现和 PollSelectorImpl 实现类似。其底层主要使用 epoll 机制来实现。
- epoll_create
- epoll_ctl
- epoll_wait
参考
- nio Selector 阻塞 唤醒 原理