在上一篇项目设计中,我说到了SSE(Server-Sent Events)是为了实现单方向的消息推送,今天介绍下实际的使用。

我直接用了现成的Flask-SSE,其实SSE实现的原理比较简单:

  1. 借用Redis的发布/订阅模式创建一个方法,方法内会调用pubsub.listen监听新的发布数据。
  2. 使用Flask提供的stream_with_context,不断的从上面的方法中获取数据。

使用起来分2部分

前端

在前端页面添加一个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
function eventSourceListener() {
let source = new EventSource(`${API_URL}/stream`);
let self = this;
source.addEventListener('login', function(event) {
let data = JSON.parse(event.data);
if (data.type == 'scan_qr_code') {
self.uuid = data.uuid;
self.qrCode = `data:image/png;base64,${data.extra}`;
} else if (data.type == 'confirm_login') {
self.sub_title = 'Scan successful';
self.sub_desc = 'Confirm login on mobile WeChat';
self.qrCode = data.extra;
} else if (data.type == 'logged_in') {
sessionStorage.setItem('user', JSON.stringify(data.user));
self.$router.push({ path: '/main' });
} else if (data.type == 'logged_out') {
sessionStorage.removeItem('user');
self.$router.push('/login');
}
}, false);
source.addEventListener('notification', function(event) {
let data = JSON.parse(event.data);
self.notificationCount = data.count;
}, false);
source.addEventListener('error', function(event) {
console.log("Failed to connect to event stream");
}, false);
}

这段代码放在一个自定义的Vue的插件里面,这样在所有页面上都要自动包含这部分代码了。source.addEventListener用来添加事件监听,它监听了3种类型的消息:

  1. login 登录,也就是在页面反映当前微信的登录状态(等待扫码/扫码完成等待确认/确认完成)。不同的消息会执行不同的操作,页面也会立刻渲染出最新的结果。
  2. notification 消息提醒,会有一个异步任务定期检查新入库的消息,有新的消息就是发布出来通知新消息数。
  3. error 内置的错误消息,当然这个加不加倒还好

另外在登陆后执行sessionStorage.setItem('user', JSON.stringify(data.user));会设置浏览器的session,下次自动登录后右侧就显示头像了,这样能减少后端的负担,退出时removeItem方法再删掉。

后端

后端包含2部分,第一部分是用Flask实现上面说的${API_URL}/stream这个接口,这是一个长连接,消息就是从这里推送出去的。由于第一部分是阻塞的,我们需要异步的方式往这个阻塞进程里面推送数据,也就是开头说的,利用Redis的发布/订阅模式发布消息。比如通知调用起来是这样的

1
2
3
4
from app import app as sse_api
with sse_api.app_context():
sse.publish({'count': count}, type='notification')

登陆过程要复杂一些,我之前说过在我fork的ItChat和wxpy分支里面添加了信号的支持,这个信号是需要「注册」的,也就是在import之前就要注册,效果要类似这样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from itchat.signals import scan_qr_code, confirm_login, logged_out
def publish(uuid, **kw):
from app import app
with app.app_context():
params = {'uuid': uuid, 'extra': kw.pop('extra', None),
'type': kw.pop('type', None)}
params.update(kw)
sse.publish(params, type='login')
scan_qr_code.connect(publish)
confirm_login.connect(publish)
logged_out.connect(publish)
from wxpy import * # noqa

这里用了信号的connect方法。举个logged_out的例子,在ItChat里面,首先定义这个信号

1
2
3
4
from blinker import Namespace
_signals = Namespace()
logged_out = _signals.signal('logged-out')

需要在对应发信号的地方调用send方法

1
logged_out.send(self.uuid, type='logged_out')

另外有个坑儿,首次打开Web页面的是一个铺满div的gif图片,一开始设想的是在下载二维码图片之后,通过修改img的src属性指到这个图片,实际开发中发现,这个二维码图片被会更新不及时,会使用缓存的就图片所以发送信号的时候不使用图片HTTP地址,而是Data URLs,这就需要把图片内容编码一下:

1
2
encoded = base64.b64encode(qrStorage.getvalue()).decode('ascii')
scan_qr_code.send(self.uuid, extra=encoded, type='scan_qr_code')

结语

这样借助Redis和Celery就实现了SSE的使用,下一节我将介绍Celery的使用。