本文只分析Python类实例的创建,不涉及类本身的构建(类本身的构建将会在接下来的文章中介绍)。

# example
class A(object):
    pass
obj_a = A()

上面这一段代码对应的opcode如下:

 0 LOAD_CONST               1 ('A')
 3 LOAD_GLOBAL              0 (object)
 6 BUILD_TUPLE              1
 9 LOAD_CONST               2 (<code object A at 0x104f367b0, file "<stdin>", line 2>)
12 MAKE_FUNCTION            0
15 CALL_FUNCTION            0
18 BUILD_CLASS
19 STORE_FAST               0 (A)

22 LOAD_FAST                0 (A)
25 CALL_FUNCTION            0
28 STORE_FAST               1 (obj_a)

说明几个关键的opcode,BUILD_CLASS用于创建一个名为A的Class,在LOAD_FAST的时候Python虚拟机加载A,然后执行CALL_FUNCTION

1.从CALL_FUNCTION机器码看起

下面是从CALL_FUNCTION开始的函数调用流程:

CALL_FUNCTION->call_function->do_call->PyObject_Call...

PyObject_Call从函数命名很容易理解,这个函数是对PyObject进行调用。

PyObject *
PyObject_Call(PyObject *func, PyObject *arg, PyObject *kw) {
    if ((call = func->ob_type->tp_call) != NULL) {
        ...
        result = (*call)(func, arg, kw);
        ...
    }
    ...

func在这里对应的就是类A,当然我们在创建obj_a的时候是没有参数的,因此arg和kw便没有任何东西。在if语句中获取func->ob_type->tp_call,func就是A,那么ob_type在这里是PyType_Type。

# in typeobject.c
PyTypeObject PyType_Type = {
    ....
    (ternaryfunc)type_call,                     /* tp_call */
    ....

就是上面这个实例,其对应的tp_call是type_call这个函数。

static PyObject *
type_call(PyTypeObject *type, PyObject *args, PyObject *kwds) {
    ....
    obj = type->tp_new(type, args, kwds);
    ....

type->tp_new中的type还是类A,tp_new则是其基类object的tpnew(原因很简单,我们在A的定义中并没有重载\_new__函数),object的真面目就是PyBaseObject_Type。

# in typeobject.c
PyTypeObject PyBaseObject_Type = { 
    ....
    object_new,                                 /* tp_new */
    ....

2.object_new都做了什么

static PyObject *
object_new(PyTypeObject *type, PyObject *args, PyObject *kwds) {
    ....
    return type->tp_alloc(type, 0);
}

object_new做的事情很简单,就是调用类A的tp_alloc,这里的tp_alloc的实现是PyType_GenericAlloc函数,我们在前文也提到过。

3.总结

到这里obj_a的创建流程基本走完,无非后面会调用tpnew(如果我们定义了\_init__就会调用我们的这个方法),但obj_a在内存中的形式已经确定,就是一个简单的PyObject。

typedef struct _object {
    PyObject_HEAD
} PyObject;

其ob_type对应的就是类A

为什么要写PyType_GenericAlloc函数,原因是本人想写一篇关于创建Class的源码解析的博文,发现内容比较多,因此就打算分开解析里面比较重要的内容。

PyType_GenericAlloc用来做什么,从函数命名可以看出GenericAlloc说明是用于一般情况的内存空间分配,PyType说明是针对PyTypeObject进行内存空间分配的。

1.关于GC

Python有自己的GC逻辑,这里不多讲,主要想说明Python会维护需要GC的各种PyObject,因此Python在PyObject的头部又加入了一个PyGC_Head。

# 内存分布

        |-------------|
        |  PyGC_Head  |
        |-------------|
        |  PyObject   |
        |             |
        |     ...     |

因此有一个函数_PyObject_GC_Malloc负责分配上述的内存空间。

PyObject *
_PyObject_GC_Malloc(size_t basicsize)
{
    PyObject *op;
    PyGC_Head *g;
    if (basicsize > PY_SSIZE_T_MAX - sizeof(PyGC_Head))
        return PyErr_NoMemory();
    g = (PyGC_Head *)PyObject_MALLOC(
        sizeof(PyGC_Head) + basicsize);
    ...
    op = FROM_GC(g);
    return op;
 }

2.PyType_GenericAlloc全景

PyObject *
PyType_GenericAlloc(PyTypeObject *type, Py_ssize_t nitems)
{
    PyObject *obj;
    const size_t size = _PyObject_VAR_SIZE(type, nitems+1);   [1]
    /* note that we need to add one, for the sentinel */

    if (PyType_IS_GC(type))
        obj = _PyObject_GC_Malloc(size);    [2]
    else
        obj = (PyObject *)PyObject_MALLOC(size);

    if (obj == NULL)
        return PyErr_NoMemory();

    memset(obj, '\0', size);

    if (type->tp_flags & Py_TPFLAGS_HEAPTYPE)
        Py_INCREF(type);

    if (type->tp_itemsize == 0)
        PyObject_INIT(obj, type);
    else
        (void) PyObject_INIT_VAR((PyVarObject *)obj, type, nitems);  [3]

    if (PyType_IS_GC(type))
        _PyObject_GC_TRACK(obj);
    return obj;
}

关注三点,[1]处,计算要创建的PyObject的大小(nitems为什么要加1需要再深入看一下)。

#define _PyObject_VAR_SIZE(typeobj, nitems)     \
(size_t)                                    \
( ( (typeobj)->tp_basicsize +               \
    (nitems)*(typeobj)->tp_itemsize +       \
    (SIZEOF_VOID_P - 1)                     \
  ) & ~(SIZEOF_VOID_P - 1)                  \
)

size的组成是tp_basicsize + nitems * tp_itemsize,后面加上SIZEOF_VOID_P - 1是为了内存对齐。

[2]处使用_PyObject_GC_Malloc进行真正的内存分配。

[3]处初始化PyObject,确切的说应该是PyVarObject,看一下具体初始化了哪些东西

  1. ob_size = nitems 初始化数量
  2. ob_type = type 初始化对应的PyObjectType
  3. ob_refcnt = 1 初始化引用计数为1
  4. 调用_Py_AddToAllObjects,将obj加入到refchain双向链表中

2015年的最后一天了,是该总结一下

宝贝女儿出生

这一年最重要的事情就是我的宝贝女儿出生了,在2015年12月29号,下午15点30分,宝贝女儿来到了这个世界,以后生活的中心也就开始围绕着她来转咯。

女儿的出生也意味着我的人生迈入了另一个阶段,我成为了一个父亲,预示着自己要更有责任感,要有耐心,做事要多为老婆和女儿考虑。

辛苦&纠结

这也是辛苦的一年,我们的游戏上线了,在上线的几个月工作也是非常的辛苦,但游戏和我们预期的也一样,最终结果也并不好,这里面也有很多因素,但现在不想去总结什么,但从个人角度来说,经历了一次从游戏开发初期到游戏上线的过程,也是一份难得的经历。

为何纠结,自己也很难说清楚。主要是觉得,在公司的这两年,自己的成长还是太慢了,究其原因,我个人主要觉得有如下几点:

  1. 不够专注,一开始想客户端服务端两手抓,起初感觉很好,但持续下去你会发现哪一点你都不够深入,接下来一年深入服务端研究,主要深入网络,Python两个方面。网络涉及的面比较广,先把业务方面的东西搞懂,然后根据业务需求了解网络底层,同时关注Go,Erlang这些分布式语言,他们的好处,他们涉及的领域;Python方面这一整年都在研究,但研究的还是不深入,主要是读代码的工具不行,只是在看书,印象不深刻,接下来借助Xcode进行Debug,再次深入解析Python
  2. 迷惘,游戏这个行业真的是自己喜欢的行业么,我希望去做一些更有意义的工作,而不是现在的螺丝钉。但如果让你带一个团队,你真的有能力去做这件事情么?话题又回到了对自己的准确认识上面了。要正确的评估自己目前的实力!其实目前自己历练还是太少,大局观都没有,比如怎样设计架构,怎样优化系统,都没有明确的概念。这是为什么?归根结底还是自己懒,游戏在开发阶段有一段时间的MangoDB系统调优,自己没有参与;到游戏上线时期,又有系统性能调优自己也没有参加,如果当时自己参与,或许可以从中学到更多。
  3. 关于团队,自己所在的团队就是一个正在成长的团队,你的leader也很年轻,从他身上学习的东西也很少,但并不是没有。在这种情况下,自己就要想法子从其他方面学习,会走很多弯路,甚至没有走过正确的路。

2016年要做什么

  1. 业务一定要更加熟练,其实无论是留下还是另寻出路,都要对当前的业务熟练,这样对自己和团队都是一种负责的表现
  2. 多多参与服务端的优化,多尝试考虑怎样让服务端的架构更合理,更健壮,更高效。这个过程也是开阔知识面的过程。
  3. 开阔思路,不要一味的去写代码,开阔眼界,多去学习其他方面的知识,不限于编程。
  4. 2016的新的技术栈:Redis,MangoDB,Erlang,Golang
  5. 心疼老婆、孝敬老妈,爱我的女儿

1.起因

异步模型的一个问题是导致代码分裂,要使用好多的callback才能把逻辑串联。协程恰恰可以解决这个问题,但使用Python的人都知道Python2.x是没有原生的协程支持的,yield关键字的支持也称不上完全的协程。好在Python有gevent呀,它可以解决Python协程的问题。
gevent模块依赖一个叫做greenlet的模块,greenlet是Stackless python的分拆(spin-off),或者说是其的一个子集,greenlet每一个”tasklet”需要自己主动的去进行切换(switch)。本文主要对greenlet进行介绍,并对其内部实现进行解剖。

阅读本文之前尽量熟悉greenlet的基本用法,本文不会对greenlet的用法做详细介绍。

2.greenlet的使用

from greenlet import greenlet

def test1():
    print 12
    gr2.switch()
    print 34

def test2():
    print 56
    gr1.switch()
    print 78

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

上面的代码将会输出12、56、34,唯独78将不会输出。

3.内部实现原理

3.1 顶层的greenlet

我们在import greenlet的时候会自动生成一个顶层的PyGreenlet实例,具体的实现如下:

static PyGreenlet* green_create_main(void)
{
    PyGreenlet* gmain;
    PyObject* dict = PyThreadState_GetDict();
    if (dict == NULL) {
        if (!PyErr_Occurred())
            PyErr_NoMemory();
        return NULL;
    }

    /* create the main greenlet for this thread */
    gmain = (PyGreenlet*) PyType_GenericAlloc(&PyGreenlet_Type, 0);
    if (gmain == NULL)
        return NULL;
    gmain->stack_start = (char*) 1;  【1】
    gmain->stack_stop = (char*) -1;  【2】
    gmain->run_info = dict;    【3】
    Py_INCREF(dict);
    return gmain;
}

我们看到stack_stop的地址被赋值为-1,其实就是一个非常大得地址,我们可以认为其他的greenlet的stack_stop都无法大于这个地址了。gmain->stack_start被赋值为1,这个可以不用去管,在greenlet切换的时候会将其赋值为esp的值。run_info是PyThreadState的Dict,用于存储一些信息。此时我们的栈空间很简单,如下:

ts_current
     |_______________|  ______ gmain->stack_stop(OxFFFFFFFF)
   |               |
   | main greenlet |
   |   in stack    |
   |               | 

目前我们只有一个GreentLet,并赋值给了ts_current。

3.2 创建第一个greenlet,并进行switch

现在我们将创建第一个greenlet,代码如下:

def test1():
    print 12
    gr2.switch()
    print 34

gr1 = greenlet(test1)
gr1.switch()

gr1的创建有两个参数,一个是gr1将要运行的实体,这里是test1;另一个是gr1的父greenlet,如果不填则为当前的greenlet,这里就是我们在import greenlet的时候创建的那个main greenlet。之后,开始进行两个greenlet之间的切换(switch)。

static PyObject* green_switch(
PyGreenlet* self,
PyObject* args,
PyObject* kwargs)
{
    Py_INCREF(args);
    Py_XINCREF(kwargs);
    return single_result(g_switch(self, args, kwargs));
}

入口就是上面的函数,并没有做什么处理,直接进入了g_switch方法。

static PyObject *
g_switch(PyGreenlet* target, PyObject* args, PyObject* kwargs)
{
    ...
    run_info = green_statedict(target);
    ...
    /* find the real target by ignoring dead greenlets,
           and if necessary starting a greenlet. */
    while (target) {
        if (PyGreenlet_ACTIVE(target)) {   // 此greenlet已经被激活了,就是说它在运行了
            ts_target = target;
            err = g_switchstack();
            break;
        }
        if (!PyGreenlet_STARTED(target)) {    // 此greenlet是一个新的greenlet,还没有开始运行
            void* dummymarker;
            ts_target = target;
            err = g_initialstub(&dummymarker);
            if (err == 1) {
                continue; /* retry the switch */
            }
            break;
        }
        target = target->parent;
    }
    ...
}

greenlet的切换分两个分支:

  1. target greenlet已经被激活,在运行中
  2. target greenlet未被激活

先从第二种情况说起,从上面的代码可以看出代码进入g_initialstub函数,参数是一个本地变量的地址&dummymarker,接下来我们会看到dummymarker的地址将会作为target greenlet的stack_stop使用。

static int GREENLET_NOINLINE(g_initialstub)(void* mark) 
{
    ...
    // 获取target greenlet的run函数,上下文切换后将会被调用
    run = PyObject_GetAttrString((PyObject*) self, "run");
    ...
    /*  这里有几点比较重要
        1. stack_stop被赋值为mark
        2. target greenlet的stack_prev被赋值为当前的greenlet
        3. target greenlet保存的PyThreadState信息置空,只留recursion_depth
     */
    self->stack_start = NULL;            // stack_start = Null
    self->stack_stop = (char*) mark;    // stack_stop = mark
    if (ts_current->stack_start == NULL) {
        /* ts_current is dying */
        self->stack_prev = ts_current->stack_prev;
    }
    else {
        self->stack_prev = ts_current;        // 指向上一个greenlet
    }
    self->top_frame = NULL;
    self->exc_type = NULL;
    self->exc_value = NULL;
    self->exc_traceback = NULL;
    self->recursion_depth = PyThreadState_GET()->recursion_depth; 

    /* restore arguments in case they are clobbered */
    ts_target = self;
    ts_passaround_args = args;
    ts_passaround_kwargs = kwargs;

    /* perform the initial switch */
    err = g_switchstack();   // 开始进行切换
    ...
    /* returns twice!
           The 1st time with err=1: we are in the new greenlet
           The 2nd time with err=0: back in the caller's greenlet
           将会被返回两次
           err = 1 说明在新的greenlet的上下文里面
           err = 0 说明我们回到了刚才调用switch的caller的上下文里面了
    */
    if (err == 1) {
        ...
        self->stack_start = (char*) 1;  // 这里的stack_start的赋值只是临时性的
        ...
        if (args == NULL) {
            /* pending exception */
            result = NULL;
        } else {
            /* call g.run(*args, **kwargs) */
            /*  运行我们在构造greenlet时注册的func
             */
            result = PyEval_CallObjectWithKeywords(
                run, args, kwargs);
            Py_DECREF(args);
            Py_XDECREF(kwargs);
        }
        Py_DECREF(run);
        result = g_handle_exit(result);
        /* jump back to parent */
        self->stack_start = NULL;  /* dead */
        // 此greenlet已经结束了,现在要切换到父greenlet运行
        for (parent = self->parent; parent != NULL; parent = parent->parent) {
            result = g_switch(parent, result, NULL);
            /* Return here means switch to parent failed,
              * in which case we throw *current* exception
              * to the next parent in chain.
              */
            assert(result == NULL);
        }
        // 这里应该不会被运行到
    }

接下来我们看一下g_switchstack都做了什么。g_switchstack主要做三件事:

  1. 首先保存当前的PyThreadState的内容到当前的greenlet
  2. 进行上下文切换
  3. 切换成功后,ts_current=target,ts_origin=last_greenlet,target相关的PyThreadState信息赋值给PyThreadState

static int g_switchstack(void)
{

err = slp_switch(); // slp_switch 是平台相关的函数

}

到目前为止,C的栈空间的格局如下:

            |           |
            |           |
            |___________|  <---- ts_current->stack_stop
            |           |
            |           |
dummymarker    |___________|  <---- ts_target->stack_stop
            |           |
            | will be   |
            | save ..   |
        ESP |___________|  <---- ts_current->stack_start
            |           |

slp_switch函数将会进行上下文切换,首先就是将上图显示的will be save的栈内容保存。具体的实现在slp_save_state方法内。

/*  
    此函数的内容不列出了,主要做了下面一件事情

    检查当前greenlet(ts_current)的每一个前置greenlet
    如果它的stack_stop小于target的stack_stop(也就是
    说target得栈会覆盖老的greenlet),那么我们就将这些
    老的greenlet的栈进行保存(g_save)
 */
static int GREENLET_NOINLINE(slp_save_state)(char* stackref)
{
    ... ...
} 

之后,如果当前的ts_target没有被激活,我们的上下文切换会立即返回1,具体的逻辑在SLP_SAVE_STATE的宏定义内。

#define SLP_SAVE_STATE(stackref, stsizediff)            \
    stackref += STACK_MAGIC;                        \
    if (slp_save_state((char*)stackref)) return -1; \
    if (!PyGreenlet_ACTIVE(ts_target)) return 1;    \
    stsizediff = ts_target->stack_start - (char*)stackref

如果ts_target已经被激活了,会计算stsizediff,即target的栈顶指针与当前esp的差值,并在平台相关代码内将esp偏移到对于的target->stack_start的位置。之后方法slp_restore_state 将会被调用,用于恢复ts_target的栈信息。

/*  此函数主要做下面两件操作

    1. 把target保存的栈数据拷贝到栈里面
    2. 对current向上回溯,找到第一个stack_stop > target->stack_stop的greenlet,并对target->stack_prev赋值
 */
static void GREENLET_NOINLINE(slp_restore_state)(void)
{
    ...
}

OK,其实greenlet的上下文切换都在这里了,唯一没有讲得就是slp_switch内部的汇编代码。每一个平台都不同,但主要是对esp的操作。

现在回到最初的函数g_switch,上面将到有两个分支,另一个分支是ts_target已经被激活了,逻辑上会直接调用g_switchstack函数,函数内的逻辑我们已经都讲到了。

reference:

  1. 说说这篇「我为什么从python转向go」- http://www.jianshu.com/p/xiQzpL
  2. Mac OS 10.9安装gevent - https://www.phodal.com/blog/mac-os-x-10-9-install-gevent-greenlet/
  3. GCC-Inline-Assembly-HOWTO - http://www.ibiblio.org/gferg/ldp/GCC-Inline-Assembly-HOWTO.html
  4. greenlet 实现原理 - https://code.google.com/p/libhjw/wiki/notes_on_greenlet
  5. 【Linux学习笔记】Linux C中内联汇编的语法格式及使用方法(Inline Assembly in Linux C)- http://blog.csdn.net/slvher/article/details/8864996

1.关于单元测试的思考

怎样对游戏进行单元测试是我在一直思考的问题,我们的游戏服务端是用Python写的,我们开发的测试方式确实很原始,服务端游戏逻辑写完后,都要借助于客户端链接上服务器,然后利用GM指令,或者其他蹩脚的方式才能测试服务端的逻辑是否正确,这个过程存在很多显而易见的弊端:

  1. 测试流程太过繁琐,并且测试点有可能不全面
  2. 没有规范的单元测试,自动化测试也很难实施
  3. 单元测试也是理解代码逻辑的一种方式,如果单元测试都没有,理解代码的渠道就少了一层

目前的困难:

  1. Python2.7 有自己的单元测试框架,可以好好的利用这个模块,怎样将此模块与游戏逻辑结合在一起也是要仔细考虑的问题
  2. 依赖于客户端的登录是因为一个模块的逻辑依赖于很多其他模块,使用客户端端登录,其他模块自然也就存在了,怎样摆脱客户端,完全在服务端进行单元测试也是一个难点
  3. 单元测试模块怎样构建,怎样可扩展

单元测试我也是最近开始研究,这篇博文不仅写给大家,也写给我自己。我也将会将自己对单元测试的理解写成一个系列,今天先来说说对Python Mock模块的理解。

2.Python Mock

Mock模块在Python3.3被加入到Python标准库中,如果Python2.x要使用Mock模块可以使用pip进行安装。

> sudo pip install mock

先罗列一下Mock模块的几个关键的内容:

  1. Mock类本身
  2. MagicMock
  3. patch修饰器

3.Mock

介绍Mock模块首先要说一下Mock类,Mock类的构造函数如下:

class Mock(spec=None, side_effect=None, return_value=DEFAULT, wraps=None, name=None, spec_set=None, unsafe=False, **kwargs)

个人比较关注如下几个参数:

spec

spec的参数可以是一个Python类。

>>> class Foo(object):
...     val_a = 1
...     def func_a(self):
...             print "call in func_a"
...
>>> m = Mock(spec = Foo)
>>> m.val_a
<Mock name='mock.val_a' id='4501041936'>
>>> m.func_a()
<Mock name='mock.func_a()' id='4501044176'>
>>> m.func_b()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "mock/mock.py", line 717, in __getattr__
raise AttributeError("Mock object has no attribute %r" % name)
AttributeError: Mock object has no attribute 'func_b'

Foo类有一个参数val_a和方法func_a,实例m调用val_afunc_a都没有问题,但是调用Foo没有的参数或者方法则会抛出AttributeError异常。

return_value

return_value定义mock被调用时的返回值。

>>> m = Mock(return_value = 1)
>>> m
<Mock id='4501997840'>
>>> m()
1

side_effect

side_effect和return_value正好相反,side_effect赋值后,mock的调用将会是side_effect的内容。side_effect可以是一个exception,也可以是一个可迭代的实例。

>>> li = [1,2,3]
>>> m = Mock(side_effect = li)
>>> m()
1
>>> m()
2
>>> m()
3
>>> m()
Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "mock/mock.py", line 1063, in __call__
        return _mock_self._mock_call(*args, **kwargs)
      File "mock/mock.py", line 1122, in _mock_call
        result = next(effect)
      File "mock/mock.py", line 127, in next
        return _next(obj)
StopIteration

Reference

  1. 使用 Python Mock 类进行单元测试 - http://www.oschina.net/translate/unit-testing-with-the-python-mock-class
  2. Python Mock Doc - https://docs.python.org/3/library/unittest.mock.html

起因

我们的游戏进程是用Python实现的,有时候为了调试一些游戏逻辑,我们不得不打印很多的log,并且有时候少打印了一些log还要补上这些log后重新走一遍游戏逻辑,甚是麻烦。
Python提供了pdb模块,是否可以随时调试一个Python进程的某段代码呢?

Python Pdb模块

我们先从Python的Pdb模块入手,如下是Pdb的构造函数。

class Pdb(bdb.Bdb, cmd.Cmd):

    def __init__(self, completekey='tab', stdin=None, stdout=None, skip=None):
        bdb.Bdb.__init__(self, skip=skip)
        cmd.Cmd.__init__(self, completekey, stdin, stdout)

Pdb的构造函数会传入stdinstdout,如果为None则是真标准输入输出。好了,我传入一个其他的文件描述符这不就可以将pdb的调试信息重定向了嘛,pdb模块完全可扩展,不需要改pdb了。

def runcall(*args, **kwds):
    return Pdb().runcall(*args, **kwds)

Pdb模块有runcall方法(其实runcall的实现是在bdb模块里面的),也好了,我们可以利用这个方法来实现debug一个函数了。

进程之间通信

我的目标是在同一台机器上实现两个进程之间的通信,服务端进程就是我们游戏的Game进程,客户端是一个Python程序。这里我使用了两个FIFO(有名管道)来实现了进程间的通信,参考了reference1。

#-*- coding:utf-8 -*-

"""\
    使用FIFO模拟一个双工管道用于两个进程之间的通信
"""

import os
import tempfile

__all__ = ["NamePipe"]

class NamePipe(object):

    def __init__(self, pid, is_client = True, mode = 0666):
        super(NamePipe, self).__init__()

        name = self._get_pipe_name(pid)
        self.in_name = name + ".in"
        self.out_name = name + ".out"

        try:
            os.mkfifo(self.in_name, mode)
            os.chmod(self.in_name, mode)
        except OSError: pass
        try : 
            os.mkfifo(self.out_name, mode)
            os.chmod(self.out_name, mode)
        except OSError: pass

        self.is_client = is_client

        if is_client:
            # client
            self.in_fd = open(self.in_name, "r")
            self.out_fd = open(self.out_name, "w")
        else:
            # server
            self.out_fd = open(self.in_name, "w")
            self.in_fd = open(self.out_name, "r")

    def write(self, msg):
        if self.is_open():
            self.out_fd.write("%d\n" % len(msg))
            self.out_fd.write(msg)
            self.out_fd.flush()
            return True
        else:
            return False

    def read(self):
        if self.is_open():
            sz = self.in_fd.readline()
            if sz:
                return self.in_fd.read(int(sz))
            else:
                return ""
        else:
            return None

    def flush(self):
        pass

    def readline(self):
        return self.read()

    def is_open(self):
        #return True
        return not (self.in_fd.closed or self.out_fd.closed)

    def close(self):
        """  只尝试unlink掉读端
        """
        if self.is_client:
            try: os.remove(self.in_name)
            except OSError: pass
        else:
            try: os.remove(self.out_name)
            except OSError: pass
        self.in_fd.close()
        self.out_fd.close()


    def _get_pipe_name(self, pid):
        return os.path.join(tempfile.gettempdir(), "pipe-%d" % pid)

Debug修饰器

针对要Debug得函数,我们可以使用修饰器进行修饰。

def remote_debug(func):
    def wrapper(*args, **kwargs):
        import pdb
        if _pipe:
            try:
                pdb.Pdb(stdin = _pipe, stdout = _pipe).runcall(func, *args, **kwargs)
            except IOError:
                _pipe.close()
                _pipe = None
        else:
            return func(*args, **kwargs)
    return wrapper    

建立通信

客户端通过SIGUSR2信号来通知服务端建立通信。

# server
_pipe = None

def remote_connect(sig, frame):
    if _pipe:
        _pipe.close()
    _pipe = NamePipe.NamePipe(os.getpid(), False)

def reg_listener():
    import signal
    signal.signal(signal.SIGUSR1, remote_connect)

-------------------------------------------------------

# clent
#-*- coding:utf-8 -*-

import signal
import os
import sys
import NamePipe
import pdb

Prefix = pdb.Pdb().prompt

pid = int(sys.argv[1])

os.kill(pid, signal.SIGUSR1)

pipe = NamePipe.NamePipe(pid, True)

while True:

    while True:
        txt = pipe.read()
        if txt:
            sys.stdout.write("%s" % txt)
            sys.stdout.flush()
            if txt.startswith(Prefix):
                break


    txt = raw_input("")
    pipe.write(txt)

reference:

  1. Debugging a running python process - http://code.activestate.com/recipes/576515/

从Python2.5开始

首先引用PEP342开篇的一段话

This PEP proposes some enhancements to the API and syntax of generators, to make them usable as simple coroutines.

在Python2.x,我们可以用Generator来实现Coroutine的功能。

def coroutine(func):
    def wrapper(*args, **kwargs):
        cr = func(*args, **kwargs)    
        try:
            cr.next()
        except StopIteration: pass
        return cr
    return wrapper

上面是一个coroutine的修饰器,如下是使用此修饰器的方法。

@coroutine
def grep(pattern):
     print "Looking for %s" % pattern
     while True:
     line = (yield)
     if pattern in line:
     print line,

 g = grep("hello")
 g.send("hello world")
 g.send("test hello")
 g.send("test world")

其实python在编译grep这段代码的时候,发现有yield关键字,func_code的co_flags会存在CO_GENERATOR标志位,因此在PyEval_EvalCodeEx执行的过程中会返回一个generator实例,而不是去执行这个函数的func_code。generator实例中会保存对应的PyFrameObject。

PyObject *
PyGen_New(PyFrameObject *f)
{
    PyGenObject *gen = PyObject_GC_New(PyGenObject, &PyGen_Type);
    ... ...
    gen->gi_frame = f;
    Py_INCREF(f->f_code);
    gen->gi_code = (PyObject *)(f->f_code);
    gen->gi_running = 0;
    gen->gi_weakreflist = NULL;
    _PyObject_GC_TRACK(gen);
    return (PyObject *)gen;
}

在generator的send方法中会继续运行此时保存的PyFrameObject。

static PyObject *
gen_send_ex(PyGenObject *gen, PyObject *arg, int exc)
{
    PyThreadState *tstate = PyThreadState_GET();
    PyFrameObject *f = gen->gi_frame;
    PyObject *result;
    ... ...
    result = PyEval_EvalFrameEx(f, exc);
    ... ...
    return result
}

Python3.3的yield from

Python3.3针对Generator加入了新的语法yield from,个人感觉新的语法可以让某些功能的实现更新简洁(其实就是少写点代码啊,,,),但真正本质上并没有改善Generator。这篇文章对yield from语法做了一定的解释。

Python3.5 async def

Python3.5引入了async def语法,定义一个coroutine。

import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

当async def函数里面调用await的时候,这个coroutine将会被挂起(Suspended)。但看Python3.5的源码后发现,coroutine的内部实现和generator的内部实现基本相同。

# genobject.c
PyTypeObject PyCoro_Type = {
    PyVarObject_HEAD_INIT(&PyType_Type, 0)
    "coroutine",                                /* tp_name */
    ...
    coro_methods,                               /* tp_methods */
    ...
};

上面是coroutine的PyTypeObject的定义,其实一开始的示例代码中print_sum函数返回的正是一个coroutine实例(是不是跟generator很像呢?!)。我们在看一下coro_methods的定义。

static PyMethodDef coro_methods[] = {
    {"send",(PyCFunction)_PyGen_Send, METH_O, coro_send_doc},
    {"throw",(PyCFunction)gen_throw, METH_VARARGS, coro_throw_doc},
    {"close",(PyCFunction)gen_close, METH_NOARGS, coro_close_doc},
    {NULL, NULL}        /* Sentinel */
};

send方法的实现就是_PyGen_Send,和generator的实现一模一样,await后面可以紧跟一个新的coroutine实例,这个功能则归功于Python3.3开始出现的yield from语法。

references:

  1. PEP342 - https://www.python.org/dev/peps/pep-0342/
  2. PEP380 - https://docs.python.org/3/whatsnew/3.3.html#pep-380
  3. A Curious Course on Coroutines and Concurrency - http://www.dabeaz.com/coroutines/Coroutines.pdf

怎样实现一个Timer(计时器)。

1. libuv Timer

之前的文章里面介绍过libuv的Timer使用,在libuv中Timer超时的判断是在主循环中实现的。

# in core.c

int uv_run(uv_loop_t* loop, uv_run_mode mode) {
    ... ...

      while (r != 0 && loop->stop_flag == 0) {
        uv__update_time(loop);    【1】
        uv__run_timers(loop);    【2】
        ... ...

        timeout = 0;
        if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
              timeout = uv_backend_timeout(loop);   【3】

        uv__io_poll(loop, timeout);   
        ... ...

  }

这里我只列出和Timer相关的一些逻辑。【1】处在每次循环的开始处,会重新获得当前时间,并将其赋值给loop->time。【2】处处理到期的Timer。【3】处会计算即将到期的Timer离现在时间还有多少,此时间将会作为poll的超时时间。

2. Libtask

Libtask里面没有完全意义上的Timer,这里分析的是taskdelay方法,通过taskdelay方法可以实现一个别样的Timer。

void
timertask(void* v) {
    taskdelay((int)v);  // 延迟v毫秒
    ... ...  // do what you will do
}

taskdelay不是真正意义上的Timer,但它的实现却和Timer的实现思路很相似,因此向拿出来分析一下。先看一下taskdelay的内部逻辑。

uint
taskdelay(uint ms)
{
    uvlong when, now;
    Task *t;

    if(!startedfdtask){
        startedfdtask = 1;
        // fdtask只创建一次
        taskcreate(fdtask, 0, 32768);    【1】
    }

    now = nsec();
    when = now+(uvlong)ms*1000000;
    // 找到第一个终止时间大于when的task
    for(t=sleeping.head; t!=nil && t->alarmtime < when; t=t->next)
        ;

    // 将当前task插入队列        【2】
    if(t){
        taskrunning->prev = t->prev;
        taskrunning->next = t;
    }else{
        taskrunning->prev = sleeping.tail;
        taskrunning->next = nil;
    }

    t = taskrunning;
    t->alarmtime = when;
    if(t->prev)
        t->prev->next = t;
    else
        sleeping.head = t;
    if(t->next)
        t->next->prev = t;
    else
        sleeping.tail = t;

    // 这是做什么?
    if(!t->system && sleepingcounted++ == 0)
        taskcount++;
    taskswitch();    【3return (nsec() - now)/1000000;
}

fdtask后面会介绍,其实是调度的主循环的task,如果fdtask没有创建,首先进行创建。之后我们会根据when将其插入sleeping队列中,sleeping使用列表来维护,最小堆当然是更好的选择。之后,在【3】处我们进行taskswitch,切换到fdtask。

void
fdtask(void *v)
{
    ... ...    
    for(;;){
        /* let everyone else run */
        // 让其他的Task先运行一下下
        while(taskyield() > 0)        【1】
            ;
        ... ...
        // 计算poll的超时时间
        if((t=sleeping.head) == nil)
            ms = -1;  // 如果sleeping队列为空,poll没有超时时间
        else{
            /* sleep at most 5s */
            now = nsec();
            if(now >= t->alarmtime)
                ms = 0;        // 如果已经有超时timer,poll立马返回
            else if(now+5*1000*1000*1000LL >= t->alarmtime)
                ms = (t->alarmtime - now)/1000000;
            else
                ms = 5000;
        }
        // ms 很重要,超时后有些时间到期的任务就可以触发起来了
        if(poll(pollfd, npollfd, ms) < 0){
            ... ...
        }

        ... ...

        now = nsec();
        // 满足sleep的任务可以触发了
        while((t=sleeping.head) && now >= t->alarmtime){
            deltask(&sleeping, t);
            if(!t->system && --sleepingcounted == 0)
                taskcount--;
            taskready(t);
        }
    }
}

一开始使用taskyield让其他在就绪列表的任务都提前运行一下。之后计算poll的超时时间。在最后将超时的timer对应的Task加入就绪列表。是不是和前面处理的Timer逻辑十分相似!

3.mudou

陈硕写的《Linux多线程服务端编程》一书中也提到了一种Timer的实现方法,感觉也是很巧妙的。
他使用timerfd_create(2)/timerfd_settime(2)/timerfdgettime(2),如下是陈硕对使用timerfd*的解释:

timerfd_create(2)把时间变成了一个文件描述符,该“文件”在定时器超时的那一刻变得可读,这样就能很方便地融入select(2)/poll(2)框架中,用统一的方式来处理IO事件和超时事件,这也正式Reactor模式的长处。

int createTimerfd()
{
      int timerfd = ::timerfd_create(CLOCK_MONOTONIC,
                             TFD_NONBLOCK | TFD_CLOEXEC);
      if (timerfd < 0)
      {
        LOG_SYSFATAL << "Failed in timerfd_create";
      }
      return timerfd;
}

通过timerfd_create创建一个文件描述符,将其加入到poll中,通过timerfd_settime方法来修改时间,但需要注意的是这个接口只适用于Linux系统下,并不通用。

4.总结

可以看出Timer的实现并不是很复杂,框架会利用poll(epoll/kqueue)来实现一个主tick,计算最先超时的Timer的时间作为poll的超时时间,当poll超时的时候会处理Timer队列的每一个计时器的内容。