Selector
1. socket 的 I/O 编程模型
socket 在 c 中的使用 select / poll / epoll
select
1 | int select(int nfds, fd_set *readfds, fd_set *writefds, |
select() and pselect() allow a program to monitor multiple file descriptors, waiting until one or more of the file descriptors become “ready” for some class of I/O operation (e.g., input possible). A file descriptor is considered ready if it is possible to perform the corresponding I/O operation (e.g., read(2)) without blocking.
Three independent sets of file descriptors are watched.
On exit, the sets are modified in place to indicate which file descriptors actually changed status. Each of the three file descriptor sets may be specified as NULL if no file descriptors are to be watched for the corresponding class of events.
On success, select() and pselect() return the number of file descriptors contained in the three returned descriptor sets (that is, the total number of bits that are set in readfds, writefds, exceptfds) which may be zero if the timeout expires before anything interesting happens. On error, -1 is returned, and errno is set appropriately; the sets and timeout become undefined, so do not rely on their contents after an error.
FD_ZERO() clears a set. FD_SET() and FD_CLR() respectively add and remove a given file descriptor from a set. FD_ISSET() tests to see if a file descriptor is part of the set;
poll
1 | int poll(struct pollfd *fds, nfds_t nfds, int timeout); |
poll() performs a similar task to select(2): it waits for one of a set of file descriptors to become ready to perform I/O.
fds: The set of file descriptors to be monitored is specified in the fds argument, which is an array of structures of pollfd
nfds: specify the number of items in the fds array
timeout: specifies the minimum number of milliseconds that poll() will block.
poll 调用后执行的效果
On success, a positive number is returned; this is the number of structures which have nonzero revents fields (in other words, those descriptors with events or errors reported). A value of 0 indicates that the call timed out and no file descriptors were ready. On error, -1 is returned, and errno is set appropriately.
其中 pollfd 结构体中的 revents
, The field revents is an output parameter, filled by the kernel with the events that actually occurred.
epoll
The epoll API performs a similar task to poll(2): monitoring multiple file descriptors to see if I/O is possible on any of them.
Three system calls are provided to set up and control an epoll set: epoll_create(2), epoll_ctl(2), epoll_wait(2).
An epoll set is connected to a file descriptor created by epoll_create(2). Interest for certain file descriptors is then registered via epoll_ctl(2). Finally, the actual wait is started by epoll_wait(2).
epoll_create
1 | int epoll_create(int size); |
epoll_create() creates an epoll(7) instance. Since Linux 2.6.8, the size argument is ignored, but must be greater than zero;
epoll_create() returns a file descriptor referring to the new epoll instance. This file descriptor is used for all the subsequent calls to the epoll interface. When no longer required, the file descriptor returned by epoll_create() should be closed by using close(2). When all file descriptors referring to an epoll instance have been closed, the kernel destroys the instance and releases the associated resources for reuse.
epoll_ctl
This system call performs control operations on the epoll(7) instance referred to by the file descriptor epfd. It requests that the operation op be performed for the target file descriptor fd.
1 | int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); |
epfd
the epoll(7) instance referred to by the file descriptor epfd.
op
the operation op be performed for the target file descriptor fd
EPOLL_CTL_ADD
Register the target file descriptor fd on the epoll instance referred to by the file descriptor epfd and associate the event event with the internal file linked to fd.
EPOLL_CTL_MOD
Change the event event associated with the target file descriptor fd.
EPOLL_CTL_DEL
Remove (deregister) the target file descriptor fd from the epoll instance referred to by epfd. The event is ignored and can be NUL.
fd
执行上述操作的目标 file descriptor
event
The event argument describes the object linked to the file descriptor fd.
这个参数是一个 epoll_event 结构体的对象
其中的 events 表示关注的事件类型,可以是以下值:
EPOLLIN
The associated file is available for read(2) operations.
EPOLLOUT
The associated file is available for write(2) operations.
epoll_wait
函数定义
1 | int epoll_wait(int epfd, struct epoll_event *events, |
函数行为
The epoll_wait() system call waits for events on the epoll(7) instance referred to by the file descriptor epfd.
函数参数
epfd
the epoll(7) instance referred to by the file descriptor epfd
events
The memory area pointed to by events will contain the events that will be available for the caller.
maxevents
Up to maxevents are returned by epoll_wait(). The maxevents argument must be greater than zero.
timeout
The timeout argument specifies the minimum number of milliseconds that epoll_wait() will block.
函数返回值
When successful, epoll_wait() returns the number of file descriptors ready for the requested I/O, or zero if no file descriptor became ready during the requested timeout milliseconds. When an error occurs, epoll_wait() returns -1 and errno is set appropriately.
2. jdk 中实现的 select
2.1 SelectableChannel
A channel that can be multiplexed via a Selector.
SelectableChannel 是一种可以被 Selector 使用多路复用的 Channel
In order to be used with a selector, an instance of this class must first be registered via the register method. This method returns a new SelectionKey object that represents the channel’s registration with the selector.
A channel must be placed into non-blocking mode before being registered with a selector, and may not be returned to blocking mode until it has been deregistered.
调用 register
返回一个 SelectionKey
这个接口的功能:
注册 channel 到一个 selector
调用
register
将当前 channel 注册到 selector一个 channel 可以注册到多个 selector
一个 channel 可以注册到多个完全不同的 selector,同时对于同一个 selector 多次调用
register
只是改变当前已经注册的 SelectionKey 的相关属性。channel 取消注册
channel 关闭时,默认将从 selector 取消注册。(调用其 cancel 方法)
由于 SelectableChannel 可以注册到多个 selector 上,所有这个接口的实现中持有一个SelectionKey 的数组,用来维护,所有已经注册到当前Channel上的 selector。
1 | // AbstractSelectableChannel |
由于不同的 SelectableChannel 可能关注的操作不同,所以这个类的 validOps
方法由子类来实现。只有具体的子类才知道,自己关注什么操作。
同时 AbstractSelectableChannel
重写了 implCloseChannel
:
1 | protected final void implCloseChannel() throws IOException { |
keyFor
方法用来,查询 channel 在 selector 上的注册关系 SelectionKey
2.2 SelectionKey
A token representing the registration of a SelectableChannel with a Selector.
代表 Channel 和 Selector 的注册关系
所以 SelectionKey 持有具有注册关系的 <Channel, Selector> 对。
其 channel
方法返回注册关系中的 SelectableChannel 对象
其 selector
方法返回注册关系中的 Selector 对象
2.2.1 断开注册关系
SelectionKey 的 cancel
方法可以断开这种注册关系
2.2.2 注册关系的属性
attachment
表示当前 key 对象,附加的一个参数。
可以使用
attach
方法将一个对象附加到这个 key 上,同时这个方法将返回之前已经附加过的对象。使用
attachment
返回当前附加的对象。这个属性存在的意义:
register
方法的调用(SelectionKey对象的创建)和 SelectionKey的使用通常不在一个线程中,所以可以通过这个对象,实现,跨线程传递参数。interestOps
表示这个 key 所关注的操作。
使用
interestOps
方法可以进行设置,注意这个设置是完全覆盖,而不是和已经存在的值进行异或。
2.2.3 用来标识在 Selector 上注册的某种操作,已经准备就绪
1 | class SelectionKeyImpl extends AbstractSelectionKey { |
2.3 Selector
A multiplexor of SelectableChannel objects.
多路复用器 for SelectableChannel 对象
select 方法的实现
1 | abstract class SelectorImpl extends AbstractSelector { |
2.3.1 linux 平台下的实现
SunOS
DevPollSelectorImpl
Linux & Linux kernels >= 2.6
EPollSelectorImpl
Linux
PollSelectorImpl
2.3.2 windows 平台下的实现
WindowsSelectorImpl
register 的实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59// SelectorImpl
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops,
Object attachment)
{
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
// 创建 SelectionKey
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
// 添加附件
k.attach(attachment);
synchronized (publicKeys) {
// 调用子类的实现,进行注册。
implRegister(k);
}
// 设置 interestOps
k.interestOps(ops);
return k;
}
private final int INIT_CAP = 8;
private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
// WindowsSelectorImpl
protected void implRegister(SelectionKeyImpl ski) {
synchronized (closeLock) {
if (pollWrapper == null)
throw new ClosedSelectorException();
growIfNeeded();
// 将 SelectionKey 存储到 Selector 相关的结构中
// 以数组的形式存储 SelectionKey
channelArray[totalChannels] = ski;
// 设置 SelectionKey 在 channelArray 数组中的索引
ski.setIndex(totalChannels);
// 可以通过 fdMap.get(fd); 查找到 fd 对应的 SelectionKey 对象,
// 用在当 select 操作完成之后,需要对 fd 所对应的 SelectionKey 进行设置时。
fdMap.put(ski);
// The set of keys registered with this Selector
// keys 存储所有在 Selector 上注册的 SelectionKey
// Selector.keys 方法将返回这个集合
keys.add(ski);
// totalChannels 表示 ski 在 channelArray 中的索引位置
// 将注册的Socket的 fd 存储到 pollWrapper 中的 pollArray 中
// pollArray 的结构如下:
// +---+---+---+---+---+---+---+---+---+---+---+---+
// | | fd e | fd e | fd e | fd e | fd e |
// +---+---+---+---+---+---+---+---+---+---+---+---+
// 其中 fd 占4个字节,表示 socket 的文件描述符
// e 占4个字节,表示 fd 此时的关注的事件
pollWrapper.addEntry(totalChannels, ski);
totalChannels++;
}
}doSelect
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42protected int doSelect(long timeout) throws IOException {
if (channelArray == null)
throw new ClosedSelectorException();
this.timeout = timeout; // set selector timeout
processDeregisterQueue();
if (interruptTriggered) {
resetWakeupSocket();
return 0;
}
// Calculate number of helper threads needed for poll. If necessary
// threads are created here and start waiting on startLock
adjustThreadsCount();
finishLock.reset(); // reset finishLock
// Wakeup helper threads, waiting on startLock, so they start polling.
// Redundant threads will exit here after wakeup.
startLock.startThreads();
// do polling in the main thread. Main thread is responsible for
// first MAX_SELECTABLE_FDS entries in pollArray.
try {
begin();
try {
// poll 方法最终调用 系统底层的 select 方法
subSelector.poll();
} catch (IOException e) {
finishLock.setException(e); // Save this exception
}
// Main thread is out of poll(). Wakeup others and wait for them
if (threads.size() > 0)
finishLock.waitForHelperThreads();
} finally {
end();
}
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
finishLock.checkForException();
processDeregisterQueue();
// 更新 selectedKeys, 则 Selector.selectedKeys 返回这些值
int updated = updateSelectedKeys();
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
resetWakeupSocket();
return updated;
}
updateSelectedKeys 方法的使用主要有两个:
更新 selectedKeys
SelectorImpl.selectedKeys 中存储着已经可以进行操作的 SelectionKey,这里的操作是指 read, write, connection 等操作。
也就是说 selectedKeys 集合里面的 SelectionKey 所指向的 Socket 可以进行上述操作,而不会被 block.
设置 SelectionKey 中的兴趣集
上面更新的 selectedKeys 集合,可以通过 Selecor.selectedKeys 方法获得。但是对于 selectedKeys 中的每一个 SelectionKey, 还需要知道它此时是可以进行什么操作,所以需要对其进行设置。
也就是设置 SelectionKeyImpl.readyOps 字段。
这样 SelectionKey 的 isWritable, isReadable, isConnectable,isAcceptable 就可以依据 readyOps 进行判断了。
核心实现在 SocketChannelImpl.translateReadyOps 方法中。
- subSelector.poll() 的实现
1 | private int processSelectedKeys(long updateCount) { |
1 | typedef struct { |