Celery 是一个专注于实时处理和任务调度分布式任务队列。通过 RabbitMQ、Redis、MongoDB 等消息代理,把任务发给执行任务的 Worker 以达到异步执行。

我写的那本《Python Web 开发实战》的样章就是 《使用 Celery》 ,建议看下面内容之前先读一下这篇文章。

接下来的内容假设你已经对 Celery 有了一定的了解。对 wechat-admin 项目来说,使用 Celery 要做如下事情:

  1. 更新项目数据库中的联系人、群聊和公众号等相关内容
  2. 监听 wxpy 进程,处理自动加群、接受消息、踢人以及各种插件功能等
  3. 自动重启上述的监听进程
  4. 发送新消息数量提醒

首先我们创建一个目录(wechat),专门用来存放 celery 任务相关的内容,目录下文件列表如下:

❯ tree wechat
wechat
├── __init__.py
├── celery.py  # 名为celery.py是主程序,启动的时候可以直接`celery -A wechat worker -l info -B`
├── celeryconfig.py  # 配置文件
└── tasks.py  # 存放任务逻辑

0 directories, 4 files

我们挨个看看

celeryconfig.py

看文件名字就知道了,这个是放配置的文件:

 cat celeryconfig.py
from config import REDIS_URL
BROKER_URL = REDIS_URL
CELERY_RESULT_BACKEND = REDIS_URL
CELERY_TASK_SERIALIZER = 'msgpack'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
CELERY_ACCEPT_CONTENT = ['json', 'msgpack']

指定消息代理和执行结果都使用 Redis,任务(消息)使用 msgpack 序列化,结果使用 json 序列化,任务结果保存时间 24 小时等

celery.py

主程序有点 Flask 的 app.py 的感觉:

 cat celery.py
from celery import Celery
from celery.signals import worker_ready

from models.redis import db, LISTENER_TASK_KEY

app = Celery('wechat', include=['wechat.tasks'])
app.config_from_object('wechat.celeryconfig')


@worker_ready.connect
def at_start(sender, **k):
    with sender.app.connection() as conn:  # noqa
        task_id = sender.app.send_task('wechat.tasks.listener')
        db.set(LISTENER_TASK_KEY, task_id)


if __name__ == '__main__':
    app.start()

这段代码有 2 点需要解释一下:

  1. 调用 send_task 会返回任务 id,存在 LISTENER_TASK_KEY 里面用于未来重启时直接通过这个任务 id
  2. 使用了 Celery 的信号系统,listener 这个异步任务需要在 worker 启动之后就运行,使用 worker_ready 这个信号就可以。

tasks.py

tasks.py 这个文件包含了很多业务逻辑,为了演示我省略部分代码。不过代码还是很长,所以我直接在对应行数的代码上加注释来解释了:

 cat tasks.py
from datetime import timedelta
from celery.task import periodic_task
from celery.task.control import revoke

from wechat.celery import app
from wxpy.exceptions import ResponseError
from itchat.signals import logged_out


def restart_listener(sender, **kw):
    # 重启tasks.listener这个任务
    task_id = r.get(LISTENER_TASK_KEY)
    if task_id:
        revoke(str(task_id, 'utf-8'))
    task_id = app.send_task('wechat.tasks.listener')
    r.set(LISTENER_TASK_KEY, task_id)


logged_out.connect(restart_listener)

from wxpy.signals import stopped
from libs.wx import get_bot
from views.api import json_api
from models.redis import db as r, LISTENER_TASK_KEY
from app import app as sse_api

stopped.connect(restart_listener)
bot = get_bot()


def _retrieve_data(update=False):
    _update_contact(bot, update)
    _update_group(bot, update)
    _update_mp(bot, update)


@app.task
def listener():
    # 不用全局的bot,因为在import listener的过程中会
    # 注册各种函数(处理自动加群、接受消息、踢人以及各种插件功能)
    from libs.listener import bot
    with json_api.app_context():
        bot.join()


@app.task
def retrieve_data():
    # 使用Flask应用中的方法都需要放在对应的上下文内
    with json_api.app_context():
        _retrieve_data(True)


@app.task
def update_contact(update=False):
    # 都是业务逻辑,就省略了,这样分开写是可以单独的更新一种类型的数据
    ...


@app.task
def update_group(update=False):
    ...


@app.task
def update_mp(update=False):
    ...


# periodic_task就是定时任务,表示周期性的执行某任务
@periodic_task(run_every=timedelta(seconds=60), time_limit=5)
def send_notify():
    # 发送新消息数量提醒
    ...

上一篇我说 SSE 的时候忘说了一点,就是更新消息提醒。在 Web 页面标记已读的时候,会 POST 到 /readall 接口,后端清空新通知数量。这是由于 SSE 的单向特点造成的,如果使用 socketio (WebSocket) 的话可以直接 emit 到后端,就不用 HTTP 这种方案了