本文讲一下kqueue的基本使用以及在libuv中uv__io_poll函数的kqueue部分的实现
1.kqueue简介
int kqueue(void)
生成一个内核事件队列,并返回队列的描述符。
struct kevent
struct kevent {
uintptr_t ident; /* 事件 ID */
short filter; /* 事件过滤器 */
u_short flags; /* 行为标识 */
u_int fflags; /* 过滤器标识值 */
intptr_t data; /* 过滤器数据 */
void *udata; /* 应用透传数据 */
};
ident
一般设置为文件描述符
filter
内核检查ident上注册的filter的状态,状态发生变化就通知应用程序,Tcp相关的filter如下:
- EVFILT_READ:Socket描述符处于监听状态,当有链接进来的时候,data表示就绪队列的链接的数量;对于其他的描述符,当数据可接受的时候返回,并且data是数据可读的数量
- EVFILT_WRITE:当写入缓冲区可写的时候返回,data表示写入缓冲区的大小。
flags
- EV_ADD:在kqueue中增加一个event
- EV_DELETE:在kqueue中删除一个event
fflags
filter相关的flag
data
filter相关的数据
udata
用户自定义的数据
EV_SET 宏定义
EV_SET(&kev, ident, filter, flags, fflags, data, udata);
EV_SET用于初始化struct kevent
kevent方法
int
kevent(int kq, const struct kevent *changelist, int nchanges, struct kevent *eventlist, int nevents, const struct timespec *timeout);
kevent函数用于注册和删除事件,并返回就绪事件或者错误事件。
- kq:kqueue方法返回的描述符
- changelist:注册或者删除的事件列表
- nchanges:changelist数量
- eventlist:满足条件的事件通知数组
- nevents:eventlist数量
- timeout:超时时间,timeout是nullptr,kevent无限期等待,否则超时返回
2.libuv怎样使用kqueue
loop初始化
# in loop.c
int uv_loop_init(uv_loop_t* loop) {
...
err = uv__platform_loop_init(loop);
...
}
# in darwin.c
int uv__platform_loop_init(uv_loop_t* loop) {
loop->cf_state = NULL;
if (uv__kqueue_init(loop))
return -errno;
return 0;
}
# in kqueue.c
int uv__kqueue_init(uv_loop_t* loop) {
loop->backend_fd = kqueue();
...
}
本文在MacOSX上进行调试程序,所以平台相关的代码是基于darwin的。看到在loop进行初始化的时候,会对backend_fd参数赋值为kqueue()。
struct kevent 初始化
# in kqueue.c
while (!QUEUE_EMPTY(&loop->watcher_queue)) {
// 【1】从watcher_queue中取出uv__io_t
q = QUEUE_HEAD(&loop->watcher_queue);
...
w = QUEUE_DATA(q, uv__io_t, watcher_queue);
...
if ((w->events & UV__POLLIN) == 0 && (w->pevents & UV__POLLIN) != 0) {
filter = EVFILT_READ;
fflags = 0;
op = EV_ADD;
// 【2】初始化kevent,filter=EVFILT_READ,flags=EV_ADD
EV_SET(events + nevents, w->fd, filter, op, fflags, 0, 0);
if (++nevents == ARRAY_SIZE(events)) {
// 【3】通过kevent函数注册时间
if (kevent(loop->backend_fd, events, nevents, NULL, 0, NULL))
abort();
nevents = 0;
}
}
if ((w->events & UV__POLLOUT) == 0 && (w->pevents & UV__POLLOUT) != 0) {
// 【4】初始化kevent,filter=EVFILT_WRITE
EV_SET(events + nevents, w->fd, EVFILT_WRITE, EV_ADD, 0, 0, 0);
if (++nevents == ARRAY_SIZE(events)) {
if (kevent(loop->backend_fd, events, nevents, NULL, 0, NULL))
abort();
nevents = 0;
}
}
w->events = w->pevents;
}
libuv中的对struct kevent还是蛮清晰的,对于POLLIN类型,加入EVFILT_READ类型的kevent(见【2】),对于POLLOUT类型,加入EVFILT_WRITE类型的kevent(见【4】)。
数据监听
# in kqueue.c
nfds = kevent(loop->backend_fd,
events,
nevents,
events,
ARRAY_SIZE(events),
timeout == -1 ? NULL : &spec);
...
for (i = 0; i < nfds; i++) {
...
w = loop->watchers[fd];
...
//【1】 处理读数据
if (ev->filter == EVFILT_READ) {
if (w->pevents & UV__POLLIN) {
revents |= UV__POLLIN;
w->rcount = ev->data;
} else {
...
}
}
//【2】 处理写状态
if (ev->filter == EVFILT_WRITE) {
if (w->pevents & UV__POLLOUT) {
revents |= UV__POLLOUT;
w->wcount = ev->data;
} else {
...
}
}
//【3】处理错误
if (ev->flags & EV_ERROR)
revents |= UV__POLLERR;
if (revents == 0)
continue;
//【4】回调函数
w->cb(loop, w, revents);
...
}
这里处理满足条件的事件,分别包括读数据、写数据、错误状态。最后回调callback。
3.参考资料
- 使用 kqueue 在 FreeBSD 上开发高性能应用服务器 - http://www.ibm.com/developerworks/cn/aix/library/1105_huangrg_kqueue/index.html