Celery是一个专注于实时处理和任务调度分布式任务队列。通过RabbitMQ、Redis、MongoDB等消息代理,把任务发给执行任务的Worker以达到异步执行。

我写的那本《Python Web开发实战》的样章就是《使用Celery》,建议看下面内容之前先读一下这篇文章。

接下来的内容假设你已经对Celery有了一定的了解。对wechat-admin项目来说,使用Celery要做如下事情:

  1. 更新项目数据库中的联系人、群聊和公众号等相关内容
  2. 监听wxpy进程,处理自动加群、接受消息、踢人以及各种插件功能等
  3. 自动重启上述的监听进程
  4. 发送新消息数量提醒

首先我们创建一个目录(wechat),专门用来存放celery任务相关的内容,目录下文件列表如下:

1
2
3
4
5
6
7
8
❯ tree wechat
wechat
├── __init__.py
├── celery.py # 名为celery.py是主程序,启动的时候可以直接`celery -A wechat worker -l info -B`
├── celeryconfig.py # 配置文件
└── tasks.py # 存放任务逻辑
0 directories, 4 files

我们挨个看看

celeryconfig.py

看文件名字就知道了,这个是放配置的文件:

1
2
3
4
5
6
7
8
❯ cat celeryconfig.py
from config import REDIS_URL
BROKER_URL = REDIS_URL
CELERY_RESULT_BACKEND = REDIS_URL
CELERY_TASK_SERIALIZER = 'msgpack'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
CELERY_ACCEPT_CONTENT = ['json', 'msgpack']

指定消息代理和执行结果都使用Redis,任务(消息)使用msgpack序列化,结果使用json序列化,任务结果保存时间24小时等

celery.py

主程序有点Flask的app.py的感觉:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
❯ cat celery.py
from celery import Celery
from celery.signals import worker_ready
from models.redis import db, LISTENER_TASK_KEY
app = Celery('wechat', include=['wechat.tasks'])
app.config_from_object('wechat.celeryconfig')
@worker_ready.connect
def at_start(sender, **k):
with sender.app.connection() as conn: # noqa
task_id = sender.app.send_task('wechat.tasks.listener')
db.set(LISTENER_TASK_KEY, task_id)
if __name__ == '__main__':
app.start()

这段代码有2点需要解释一下:

  1. 调用send_task会返回任务id,存在LISTENER_TASK_KEY里面用于未来重启时直接通过这个任务id
  2. 使用了Celery的信号系统,listener这个异步任务需要在worker启动之后就运行,使用worker_ready这个信号就可以。

tasks.py

tasks.py这个文件包含了很多业务逻辑,为了演示我省略部分代码。不过代码还是很长,所以我直接在对应行数的代码上加注释来解释了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
❯ cat tasks.py
from datetime import timedelta
from celery.task import periodic_task
from celery.task.control import revoke
from wechat.celery import app
from wxpy.exceptions import ResponseError
from itchat.signals import logged_out
def restart_listener(sender, **kw):
# 重启tasks.listener这个任务
task_id = r.get(LISTENER_TASK_KEY)
if task_id:
revoke(str(task_id, 'utf-8'))
task_id = app.send_task('wechat.tasks.listener')
r.set(LISTENER_TASK_KEY, task_id)
logged_out.connect(restart_listener)
from wxpy.signals import stopped
from libs.wx import get_bot
from views.api import json_api
from models.redis import db as r, LISTENER_TASK_KEY
from app import app as sse_api
stopped.connect(restart_listener)
bot = get_bot()
def _retrieve_data(update=False):
_update_contact(bot, update)
_update_group(bot, update)
_update_mp(bot, update)
@app.task
def listener():
# 不用全局的bot,因为在import listener的过程中会
# 注册各种函数(处理自动加群、接受消息、踢人以及各种插件功能)
from libs.listener import bot
with json_api.app_context():
bot.join()
@app.task
def retrieve_data():
# 使用Flask应用中的方法都需要放在对应的上下文内
with json_api.app_context():
_retrieve_data(True)
@app.task
def update_contact(update=False):
# 都是业务逻辑,就省略了,这样分开写是可以单独的更新一种类型的数据
...
@app.task
def update_group(update=False):
...
@app.task
def update_mp(update=False):
...
# periodic_task就是定时任务,表示周期性的执行某任务
@periodic_task(run_every=timedelta(seconds=60), time_limit=5)
def send_notify():
# 发送新消息数量提醒
...

上一篇我说SSE的时候忘说了一点,就是更新消息提醒。在Web页面标记已读的时候,会POST到/readall接口,后端清空新通知数量。这是由于SSE的单向特点造成的,如果使用socketio(WebSocket)的话可以直接emit到后端,就不用HTTP这种方案了