一起玩玩libuv(五)—— kqueue以及uv__io_poll

本文讲一下kqueue的基本使用以及在libuv中uv__io_poll函数的kqueue部分的实现

1.kqueue简介

int kqueue(void)

生成一个内核事件队列,并返回队列的描述符。

struct kevent

struct kevent { 
    uintptr_t ident;        /* 事件 ID */ 
    short     filter;       /* 事件过滤器 */ 
    u_short   flags;        /* 行为标识 */ 
    u_int     fflags;       /* 过滤器标识值 */ 
    intptr_t  data;         /* 过滤器数据 */ 
    void      *udata;       /* 应用透传数据 */ 
 }; 
ident

一般设置为文件描述符

filter

内核检查ident上注册的filter的状态,状态发生变化就通知应用程序,Tcp相关的filter如下:

  • EVFILT_READ:Socket描述符处于监听状态,当有链接进来的时候,data表示就绪队列的链接的数量;对于其他的描述符,当数据可接受的时候返回,并且data是数据可读的数量
  • EVFILT_WRITE:当写入缓冲区可写的时候返回,data表示写入缓冲区的大小。
flags
  • EV_ADD:在kqueue中增加一个event
  • EV_DELETE:在kqueue中删除一个event
fflags

filter相关的flag

data

filter相关的数据

udata

用户自定义的数据

EV_SET 宏定义

EV_SET(&kev, ident, filter, flags, fflags, data, udata);

EV_SET用于初始化struct kevent

kevent方法

int
kevent(int kq, const struct kevent *changelist, int nchanges, struct kevent *eventlist, int nevents, const struct timespec *timeout);

kevent函数用于注册和删除事件,并返回就绪事件或者错误事件。

  • kq:kqueue方法返回的描述符
  • changelist:注册或者删除的事件列表
  • nchanges:changelist数量
  • eventlist:满足条件的事件通知数组
  • nevents:eventlist数量
  • timeout:超时时间,timeout是nullptr,kevent无限期等待,否则超时返回

2.libuv怎样使用kqueue

loop初始化

# in loop.c
int uv_loop_init(uv_loop_t* loop) {
    ...
    err = uv__platform_loop_init(loop);
    ...
}

# in darwin.c 
int uv__platform_loop_init(uv_loop_t* loop) {
      loop->cf_state = NULL;

      if (uv__kqueue_init(loop))
        return -errno;

      return 0;
}

# in kqueue.c
int uv__kqueue_init(uv_loop_t* loop) {
      loop->backend_fd = kqueue();
      ...
  }

本文在MacOSX上进行调试程序,所以平台相关的代码是基于darwin的。看到在loop进行初始化的时候,会对backend_fd参数赋值为kqueue()。

struct kevent 初始化

# in kqueue.c

while (!QUEUE_EMPTY(&loop->watcher_queue)) {
    // 【1】从watcher_queue中取出uv__io_t
    q = QUEUE_HEAD(&loop->watcher_queue);
    ...
    w = QUEUE_DATA(q, uv__io_t, watcher_queue);
    ...
    if ((w->events & UV__POLLIN) == 0 && (w->pevents & UV__POLLIN) != 0) {
      filter = EVFILT_READ;
      fflags = 0;
      op = EV_ADD;
      // 【2】初始化kevent,filter=EVFILT_READ,flags=EV_ADD
      EV_SET(events + nevents, w->fd, filter, op, fflags, 0, 0);

      if (++nevents == ARRAY_SIZE(events)) {
        // 【3】通过kevent函数注册时间
        if (kevent(loop->backend_fd, events, nevents, NULL, 0, NULL))
          abort();
        nevents = 0;
      }
    }

    if ((w->events & UV__POLLOUT) == 0 && (w->pevents & UV__POLLOUT) != 0) {
      // 【4】初始化kevent,filter=EVFILT_WRITE
      EV_SET(events + nevents, w->fd, EVFILT_WRITE, EV_ADD, 0, 0, 0);

      if (++nevents == ARRAY_SIZE(events)) {
        if (kevent(loop->backend_fd, events, nevents, NULL, 0, NULL))
          abort();
        nevents = 0;
      }
    }

    w->events = w->pevents;
}

libuv中的对struct kevent还是蛮清晰的,对于POLLIN类型,加入EVFILT_READ类型的kevent(见【2】),对于POLLOUT类型,加入EVFILT_WRITE类型的kevent(见【4】)。

数据监听

# in kqueue.c

nfds = kevent(loop->backend_fd,
              events,
              nevents,
              events,
              ARRAY_SIZE(events),
              timeout == -1 ? NULL : &spec);
...
for (i = 0; i < nfds; i++) {
  ...
  w = loop->watchers[fd];
  ...
  //【1】 处理读数据
  if (ev->filter == EVFILT_READ) {
    if (w->pevents & UV__POLLIN) {
      revents |= UV__POLLIN;
      w->rcount = ev->data;
    } else {
      ...
    }
  }
  //【2】 处理写状态
  if (ev->filter == EVFILT_WRITE) {
    if (w->pevents & UV__POLLOUT) {
      revents |= UV__POLLOUT;
      w->wcount = ev->data;
    } else {
      ...
    }
  }
  //【3】处理错误
  if (ev->flags & EV_ERROR)
    revents |= UV__POLLERR;

  if (revents == 0)
    continue;

  //【4】回调函数
  w->cb(loop, w, revents);
  ...
}

这里处理满足条件的事件,分别包括读数据、写数据、错误状态。最后回调callback。

3.参考资料

  1. 使用 kqueue 在 FreeBSD 上开发高性能应用服务器 - http://www.ibm.com/developerworks/cn/aix/library/1105_huangrg_kqueue/index.html