我曾经在一些公众场合说过心中的优秀 Python 开发者。Flask 和 Requests 的作者就不说了,21 世纪最缺的就是 idea,他们不仅有而且还都用非常优美的方式做出来了。另外我还提到了 Celery 作者 Ask Solem,并不是因为 Celery 很有名它的主要作者就优秀了,我对 ask 的欣赏,完全是看 Celery 及其相关依赖的源代码的时候产生的。

有多年后台开发的工程师想必清楚,Celery 本身涉及到的技术点其实在业界应用是很广泛的。Celery 能这么流行,我们先排除没有进行技术深入下的盲从,和它诞生的非常早以外,我认为这和项目的内部设计的非常好也是有关的。

接下来的几篇文章我将分析 Celery 使用的 Kombu 库中的一些设计实现让大家对这个优秀项目更了解,并从中学习可扩展开发的实践。

Kombu 是什么?

当一个项目变得越来越复杂,就要考虑只保留核心,并把其他部分分拆到不同的项目中以便减少未来的维护和开发的成本。Flask、IPython 都是这样做的。

Kombu 是一个把消息传递封装成统一接口的库。 Celery 一开始先支持的 RabbitMQ,也就是使用 AMQ 协议。由于要支持越来越多的消息代理,但是这些消息代理是不支持 AMQ 协议的,需要一个东西把所有的消息代理的处理方式统一起来,甚至可以理解为把它们「伪装成支持 AMQ 协议」。Kombu 的最初的实现叫做 carrot, 后来经过重构才成了 Kombu。

registry

registry 也就是「注册」,有按需加入的意思,在 Python 标准库和一些优秀开源项目中都有应用。我们先看个 django 的 场景 ,为了减少篇幅我没有列出 CheckRegistry 类中其他方法:

### source code start
from itertools import chain


class CheckRegistry:

    def __init__(self):
        self.registered_checks = []
        self.deployment_checks = []

    def register(self, check=None, *tags, **kwargs):
        kwargs.setdefault('deploy', False)

        def inner(check):
            check.tags = tags
            if kwargs['deploy']:
                if check not in self.deployment_checks:
                    self.deployment_checks.append(check)
            elif check not in self.registered_checks:
                self.registered_checks.append(check)
            return check

        if callable(check):
            return inner(check)
        else:
            if check:
                tags += (check, )
            return inner

    def tag_exists(self, tag, include_deployment_checks=False):
        return tag in self.tags_available(include_deployment_checks)

    def tags_available(self, deployment_checks=False):
        return set(chain(*[check.tags for check in self.get_checks(deployment_checks) if hasattr(check, 'tags')]))

    def get_checks(self, include_deployment_checks=False):
        checks = list(self.registered_checks)
        if include_deployment_checks:
            checks.extend(self.deployment_checks)
        return checks


registry = CheckRegistry()
register = registry.register
tag_exists = registry.tag_exists

### source code end
@register('mytag', 'another_tag')
def my_check(apps, **kwargs):
    pass


print tag_exists('another_tag')
print tag_exists('not_exists_tag')

可以看到每次用 registry.register 都能动态的添加新的 tag,最后还用register = registry.register这样的方式列了个别名。执行结果如下:

❯ python django_example.py
True
False

kombu 库包含对消息的序列化和反序列化工作的实现,可以同时支持多种序列化方案,如 pickle、json、yaml 和 msgpack。假如你从前没有写过这样可扩展的项目,可能想的是每种的方案的 loads 和 dumps 都封装一遍,然后用一个大的 if/elif/else 来控制最后的序列化如何执行。

那么在 kombu 里面是怎么用的呢?我简化下它的 实现

import codecs
from collections import namedtuple

codec = namedtuple('codec', ('content_type', 'content_encoding', 'encoder'))

class SerializerNotInstalled(Exception):
    pass


class SerializerRegistry(object):
    def __init__(self):
        self._encoders = {}
        self._decoders = {}
        self._default_encode = None
        self._default_content_type = None
        self._default_content_encoding = None

    def register(self, name, encoder, decoder, content_type,
                 content_encoding='utf-8'):
        if encoder:
            self._encoders[name] = codec(
                content_type, content_encoding, encoder,
            )
        if decoder:
            self._decoders[content_type] = decoder

    def _set_default_serializer(self, name):
        try:
            (self._default_content_type, self._default_content_encoding,
             self._default_encode) = self._encoders[name]
        except KeyError:
            raise SerializerNotInstalled(
                'No encoder installed for {0}'.format(name))

    def dumps(self, data, serializer=None):
        if serializer and not self._encoders.get(serializer):
            raise SerializerNotInstalled(
                'No encoder installed for {0}'.format(serializer))

        if not serializer and isinstance(data, unicode):
            payload = data.encode('utf-8')
            return 'text/plain', 'utf-8', payload

        if serializer:
            content_type, content_encoding, encoder = \
                self._encoders[serializer]
        else:
            encoder = self._default_encode
            content_type = self._default_content_type
            content_encoding = self._default_content_encoding

        payload = encoder(data)
        return content_type, content_encoding, payload

    def loads(self, data, content_type, content_encoding):
        content_type = (content_type if content_type
                        else 'application/data')
        content_encoding = (content_encoding or 'utf-8').lower()

        if data:
            decode = self._decoders.get(content_type)
            if decode:
                return decode(data)
        return data


registry = SerializerRegistry()
dumps = registry.dumps
loads = registry.loads
register = registry.register

其实 kombu 还实现了 unregister 限于篇幅我就不展开了。现在我们想添加 yaml 的支持,只需要加这样一个函数:

def register_yaml():
    try:
        import yaml
        registry.register('yaml', yaml.safe_dump, yaml.safe_load,
                          content_type='application/x-yaml',
                          content_encoding='utf-8')
    except ImportError:

        def not_available(*args, **kwargs):
            """Raise SerializerNotInstalled.
            Used in case a client receives a yaml message, but yaml
            isn't installed.
            """
            raise SerializerNotInstalled(
                'No decoder installed for YAML. Install the PyYAML library')
        registry.register('yaml', None, not_available, 'application/x-yaml')


register_yaml()

这样就支持 yaml 了。如果希望默认使用 yaml 来序列化,可以执行:

registry._set_default_serializer('yaml')

是不是非常好扩展,如果哪天我希望去掉对 pickle (安全问题),就可以直接注释对应的函数就好了。写个小例子试验下:

yaml_data = """\
float: 3.1415926500000002
int: 10
list: [george, jerry, elaine, cosmo]
string: The quick brown fox jumps over the lazy dog
unicode: "Th\\xE9 quick brown fox jumps over th\\xE9 lazy dog"
"""

content_type, content_encoding, payload = dumps(yaml_data, serializer='yaml')
print content_type, content_encoding

assert loads(payload, content_type=content_type, content_encoding=content_encoding) == yaml_data

运行的结果就是:

❯ python kombu_example.py
application/x-yaml utf-8

entrypoint

在我的书里面介绍过如果使用标准库自带的 pkg_resources.iter_entry_points 实现一个简单的插件系统。这在 kombu 上面也有应用,在序列化实现模块的最后加了这么几句:

from pkg_resources import iter_entry_points

for ep in iter_entry_points('kombu.serializers'):
    args = ep.load()
    register(ep.name, *args)

这是什么东西呢?pkg_resources 是一个用于包发现和资源访问的模块,我们可以实现不同的 kombu 扩展,如果在这个扩展项目的 setup.py 里面设置对应的 entry_points,在安装之后,运行上述代码的时候就会自动找到这些扩展,并注册进来。这就是一个扩展系统。Flake8 就是最好的这个扩展玩法的范例。

kombu 的扩展不多,我选择 kombu-fernet-serializers 来进行介绍。首先看一下它的 setup.py 文件:

...
    entry_points={
        'kombu.serializers': [
            'fernet_json = kombu_fernet.serializers.json:register_args',
            'fernet_yaml = kombu_fernet.serializers.yaml:register_args',
            'fernet_pickle = kombu_fernet.serializers.pickle:register_args',
            'fernet_msgpack = kombu_fernet.serializers.msgpack:register_args',
        ]
    }
...

注意到了吧,这个 entry 点就是 kombu.serializers,安装之后就多了 4 个序列化方案,我们看一下 fernet_json 的实现:

import anyjson as _json

from . import fernet_encode, fernet_decode

MIMETYPE = 'application/x-fernet-json'

register_args = (
    fernet_encode(_json.dumps),
    fernet_decode(_json.loads),
    MIMETYPE,
    'utf-8',
)

而 fernet_yaml 也被放进了模块的方式,其实和在函数内殊途同归:

from kombu.exceptions import SerializerNotInstalled

from . import fernet_encode, fernet_decode

try:
    import yaml
except ImportError:
    def not_available(*args, **kwargs):
        """In case a client receives a yaml message, but yaml
        isn't installed."""
        raise SerializerNotInstalled(
            'No decoder installed for YAML. Install the PyYAML library')

    yaml_encoder = not_available
    yaml_decoder = None
else:
    yaml_encoder = yaml.safe_dump
    yaml_decoder = yaml.safe_load

MIMETYPE = 'application/x-fernet-yaml'

register_args = (
    fernet_encode(yaml_encoder),
    fernet_decode(yaml_decoder) if yaml_decoder else None,
    MIMETYPE,
    'utf-8',
)

事实上,我们并不需要了解 fernet_encode 和 fernet_decode 是如何对消息做对称加密的,只是感受下这样添加扩展的方式是不是很优雅呢?