Python gevent (1) 之 greenlet模块

1.起因

异步模型的一个问题是导致代码分裂,要使用好多的callback才能把逻辑串联。协程恰恰可以解决这个问题,但使用Python的人都知道Python2.x是没有原生的协程支持的,yield关键字的支持也称不上完全的协程。好在Python有gevent呀,它可以解决Python协程的问题。
gevent模块依赖一个叫做greenlet的模块,greenlet是Stackless python的分拆(spin-off),或者说是其的一个子集,greenlet每一个”tasklet”需要自己主动的去进行切换(switch)。本文主要对greenlet进行介绍,并对其内部实现进行解剖。

阅读本文之前尽量熟悉greenlet的基本用法,本文不会对greenlet的用法做详细介绍。

2.greenlet的使用

from greenlet import greenlet

def test1():
    print 12
    gr2.switch()
    print 34

def test2():
    print 56
    gr1.switch()
    print 78

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

上面的代码将会输出12、56、34,唯独78将不会输出。

3.内部实现原理

3.1 顶层的greenlet

我们在import greenlet的时候会自动生成一个顶层的PyGreenlet实例,具体的实现如下:

static PyGreenlet* green_create_main(void)
{
    PyGreenlet* gmain;
    PyObject* dict = PyThreadState_GetDict();
    if (dict == NULL) {
        if (!PyErr_Occurred())
            PyErr_NoMemory();
        return NULL;
    }

    /* create the main greenlet for this thread */
    gmain = (PyGreenlet*) PyType_GenericAlloc(&PyGreenlet_Type, 0);
    if (gmain == NULL)
        return NULL;
    gmain->stack_start = (char*) 1;  【1】
    gmain->stack_stop = (char*) -1;  【2】
    gmain->run_info = dict;    【3】
    Py_INCREF(dict);
    return gmain;
}

我们看到stack_stop的地址被赋值为-1,其实就是一个非常大得地址,我们可以认为其他的greenlet的stack_stop都无法大于这个地址了。gmain->stack_start被赋值为1,这个可以不用去管,在greenlet切换的时候会将其赋值为esp的值。run_info是PyThreadState的Dict,用于存储一些信息。此时我们的栈空间很简单,如下:

ts_current
     |_______________|  ______ gmain->stack_stop(OxFFFFFFFF)
   |               |
   | main greenlet |
   |   in stack    |
   |               | 

目前我们只有一个GreentLet,并赋值给了ts_current。

3.2 创建第一个greenlet,并进行switch

现在我们将创建第一个greenlet,代码如下:

def test1():
    print 12
    gr2.switch()
    print 34

gr1 = greenlet(test1)
gr1.switch()

gr1的创建有两个参数,一个是gr1将要运行的实体,这里是test1;另一个是gr1的父greenlet,如果不填则为当前的greenlet,这里就是我们在import greenlet的时候创建的那个main greenlet。之后,开始进行两个greenlet之间的切换(switch)。

static PyObject* green_switch(
PyGreenlet* self,
PyObject* args,
PyObject* kwargs)
{
    Py_INCREF(args);
    Py_XINCREF(kwargs);
    return single_result(g_switch(self, args, kwargs));
}

入口就是上面的函数,并没有做什么处理,直接进入了g_switch方法。

static PyObject *
g_switch(PyGreenlet* target, PyObject* args, PyObject* kwargs)
{
    ...
    run_info = green_statedict(target);
    ...
    /* find the real target by ignoring dead greenlets,
           and if necessary starting a greenlet. */
    while (target) {
        if (PyGreenlet_ACTIVE(target)) {   // 此greenlet已经被激活了,就是说它在运行了
            ts_target = target;
            err = g_switchstack();
            break;
        }
        if (!PyGreenlet_STARTED(target)) {    // 此greenlet是一个新的greenlet,还没有开始运行
            void* dummymarker;
            ts_target = target;
            err = g_initialstub(&dummymarker);
            if (err == 1) {
                continue; /* retry the switch */
            }
            break;
        }
        target = target->parent;
    }
    ...
}

greenlet的切换分两个分支:

  1. target greenlet已经被激活,在运行中
  2. target greenlet未被激活

先从第二种情况说起,从上面的代码可以看出代码进入g_initialstub函数,参数是一个本地变量的地址&dummymarker,接下来我们会看到dummymarker的地址将会作为target greenlet的stack_stop使用。

static int GREENLET_NOINLINE(g_initialstub)(void* mark) 
{
    ...
    // 获取target greenlet的run函数,上下文切换后将会被调用
    run = PyObject_GetAttrString((PyObject*) self, "run");
    ...
    /*  这里有几点比较重要
        1. stack_stop被赋值为mark
        2. target greenlet的stack_prev被赋值为当前的greenlet
        3. target greenlet保存的PyThreadState信息置空,只留recursion_depth
     */
    self->stack_start = NULL;            // stack_start = Null
    self->stack_stop = (char*) mark;    // stack_stop = mark
    if (ts_current->stack_start == NULL) {
        /* ts_current is dying */
        self->stack_prev = ts_current->stack_prev;
    }
    else {
        self->stack_prev = ts_current;        // 指向上一个greenlet
    }
    self->top_frame = NULL;
    self->exc_type = NULL;
    self->exc_value = NULL;
    self->exc_traceback = NULL;
    self->recursion_depth = PyThreadState_GET()->recursion_depth; 

    /* restore arguments in case they are clobbered */
    ts_target = self;
    ts_passaround_args = args;
    ts_passaround_kwargs = kwargs;

    /* perform the initial switch */
    err = g_switchstack();   // 开始进行切换
    ...
    /* returns twice!
           The 1st time with err=1: we are in the new greenlet
           The 2nd time with err=0: back in the caller's greenlet
           将会被返回两次
           err = 1 说明在新的greenlet的上下文里面
           err = 0 说明我们回到了刚才调用switch的caller的上下文里面了
    */
    if (err == 1) {
        ...
        self->stack_start = (char*) 1;  // 这里的stack_start的赋值只是临时性的
        ...
        if (args == NULL) {
            /* pending exception */
            result = NULL;
        } else {
            /* call g.run(*args, **kwargs) */
            /*  运行我们在构造greenlet时注册的func
             */
            result = PyEval_CallObjectWithKeywords(
                run, args, kwargs);
            Py_DECREF(args);
            Py_XDECREF(kwargs);
        }
        Py_DECREF(run);
        result = g_handle_exit(result);
        /* jump back to parent */
        self->stack_start = NULL;  /* dead */
        // 此greenlet已经结束了,现在要切换到父greenlet运行
        for (parent = self->parent; parent != NULL; parent = parent->parent) {
            result = g_switch(parent, result, NULL);
            /* Return here means switch to parent failed,
              * in which case we throw *current* exception
              * to the next parent in chain.
              */
            assert(result == NULL);
        }
        // 这里应该不会被运行到
    }

接下来我们看一下g_switchstack都做了什么。g_switchstack主要做三件事:

  1. 首先保存当前的PyThreadState的内容到当前的greenlet
  2. 进行上下文切换
  3. 切换成功后,ts_current=target,ts_origin=last_greenlet,target相关的PyThreadState信息赋值给PyThreadState

static int g_switchstack(void)
{

err = slp_switch(); // slp_switch 是平台相关的函数

}

到目前为止,C的栈空间的格局如下:

            |           |
            |           |
            |___________|  <---- ts_current->stack_stop
            |           |
            |           |
dummymarker    |___________|  <---- ts_target->stack_stop
            |           |
            | will be   |
            | save ..   |
        ESP |___________|  <---- ts_current->stack_start
            |           |

slp_switch函数将会进行上下文切换,首先就是将上图显示的will be save的栈内容保存。具体的实现在slp_save_state方法内。

/*  
    此函数的内容不列出了,主要做了下面一件事情

    检查当前greenlet(ts_current)的每一个前置greenlet
    如果它的stack_stop小于target的stack_stop(也就是
    说target得栈会覆盖老的greenlet),那么我们就将这些
    老的greenlet的栈进行保存(g_save)
 */
static int GREENLET_NOINLINE(slp_save_state)(char* stackref)
{
    ... ...
} 

之后,如果当前的ts_target没有被激活,我们的上下文切换会立即返回1,具体的逻辑在SLP_SAVE_STATE的宏定义内。

#define SLP_SAVE_STATE(stackref, stsizediff)            \
    stackref += STACK_MAGIC;                        \
    if (slp_save_state((char*)stackref)) return -1; \
    if (!PyGreenlet_ACTIVE(ts_target)) return 1;    \
    stsizediff = ts_target->stack_start - (char*)stackref

如果ts_target已经被激活了,会计算stsizediff,即target的栈顶指针与当前esp的差值,并在平台相关代码内将esp偏移到对于的target->stack_start的位置。之后方法slp_restore_state 将会被调用,用于恢复ts_target的栈信息。

/*  此函数主要做下面两件操作

    1. 把target保存的栈数据拷贝到栈里面
    2. 对current向上回溯,找到第一个stack_stop > target->stack_stop的greenlet,并对target->stack_prev赋值
 */
static void GREENLET_NOINLINE(slp_restore_state)(void)
{
    ...
}

OK,其实greenlet的上下文切换都在这里了,唯一没有讲得就是slp_switch内部的汇编代码。每一个平台都不同,但主要是对esp的操作。

现在回到最初的函数g_switch,上面将到有两个分支,另一个分支是ts_target已经被激活了,逻辑上会直接调用g_switchstack函数,函数内的逻辑我们已经都讲到了。

reference:

  1. 说说这篇「我为什么从python转向go」- http://www.jianshu.com/p/xiQzpL
  2. Mac OS 10.9安装gevent - https://www.phodal.com/blog/mac-os-x-10-9-install-gevent-greenlet/
  3. GCC-Inline-Assembly-HOWTO - http://www.ibiblio.org/gferg/ldp/GCC-Inline-Assembly-HOWTO.html
  4. greenlet 实现原理 - https://code.google.com/p/libhjw/wiki/notes_on_greenlet
  5. 【Linux学习笔记】Linux C中内联汇编的语法格式及使用方法(Inline Assembly in Linux C)- http://blog.csdn.net/slvher/article/details/8864996