学习东西一定要给自己找个相对复杂到能比较深入了解要学习的内容的项目。「知乎Live全文搜索」是我用来学习elasticsearch、elasticsearch_dsl、asyncio、sanic和微信小程序项目。我想做时知乎还不提供Live的搜索,现在虽然提供了一个入口,但是感觉搜索的效果我觉得并不好。

前面在公众号我已经发了很多相关的asyncio、elasticsearch_dsl和sanic的内容,前面那些全部是给从今天开始的内容做铺垫。下面是这样文章的快速链接,如果你没看过或者忘记了可以回顾一下:

  1. 使用Python进行并发编程-asyncio篇(一)
  2. 使用Python进行并发编程-asyncio篇(二)
  3. 使用Python进行并发编程-asyncio篇(三)
  4. 使用API登录知乎并获得token
  5. 知乎Live全文搜索之模型设计和爬虫实现

现在列一下需求清单,并挨个完成。这里插一句,需求是否清晰明确是任务按期完成的最重要的条件之一,所以细化需求能力是每一个工程师都要熟悉和掌握的,这要求工程师对业务和所需要的技术很熟悉。

现在我先大致描绘要做的4件事情:

  • 继续完善爬虫。主要是以下2点:

    1. 之前分享过的爬虫的协程带来并发速度的提升优势还没有完全发挥出来。
    2. 之前的爬虫是纯信息抓取,没有考虑数据的可视化(比如没有考虑Live背景图,用户头像,Live所属话题信息等),需要增加字段并重新抓取。
  • 让elasticsearch_dsl支持asyncio,虽然官方提供了elasticsearch-py-async这个包,但是没有对elasticsearch_dsl的异步封装,而且由于我们将要使用sanic这个使用了第三方uvloop的Web框架,不能通过asyncio.get_event_loop的方式获取到loop而给elasticsearch_dsl用,需要处理。

  • 完成微信小程序需要的数据model接口。
  • 实现一个RESTful的sanic API功能,让elasticsearch和sqlalchemy的返回内容能非常方便的通用处理。

当然也并不是能预先清晰制定好100%需求,因为会有一些外部不可控的因素或者不小心踩了个深坑儿。比如一开始我只是抓知乎的图片链接,但是后来发现在微信小程序开发工具中这些图片资源是不能访问的(可能是没有Referfer),所以后来又改成把图片都下载到本地。另外一个是当时存储用户用了SQLite,现在要存话题数据,发现SQLite不支持并发,这对asyncio来说是不合理的选择,所以后来换成了MySQL。看,这就是选择sqlallchemy的好处,我完全不用改model,只是把DB_URI改一下就好了。

本文是上面的第二项,也就是让elasticsearch_dsl支持asyncio。我并且尽量拆分任务成可以团队多成员可以同时协作完成,也就是不用等着A做完B才能开始去做。我继续细节需求, 并挨个完成

让sanic暴露loop给elasticsearch_dsl用

通过看sanic和elasticsearch_dsl的源码,我用sanic提供的before_start事件就可以完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from sanic import Sanic
from elasticsearch_dsl.connections import connections
app = Sanic(__name__)
def set_loop(sanic, loop):
conns = connections._conns
for c in conns:
conns[c].transport.loop = loop
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8300,
before_start=[set_loop], workers=4, debug=True)

elasticsearch创建链接后会保存在connections._conns,在Sanic启动后把这些链接中用的loop替换成sanic创建那个就可以了。

让elasticsearch-py-async支持Sanic

我fork了elasticsearch-py-async,虽然它已经是支持asyncio的了,但是还不支持外部loop,我的修改具体的可以看这里。说白了也是为了成替换sanic创建的那个loop。

elasticsearch-py-async加载的时候loop已经创建了,所以需要hack一下,让main_loop使用我设置的那个:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from elasticsearch import Transport
class AsyncTransport(Transport):
...
@asyncio.coroutine
def main_loop(self, method, url, params, body, ignore=(), timeout=None):
for attempt in range(self.max_retries + 1):
connection = self.get_connection()
connection.loop = self.loop
try:
status, headers, data = yield from connection.perform_request(
method, url, params, body, ignore=ignore, timeout=timeout)
except TransportError as e:
if method == 'HEAD' and e.status_code == 404:
return False
...

@asyncio.coroutineyield from是老式用法了,我新写的代码都统一都改成Python 3.5增加的async/await关键字了。

其中我加了一句connection.loop = self.loop,这样之前做的conns[c].transport.loop = loop就有意义的。另外一个问题是aiohttp.session的用法,会造成在类初始化的时候就生成了self.session被未来使用,现在需要换成我们的loop,就需要让self.session在用时才生成,而且类初始化时候就创建session是一种不好的用法,我在多个地方见过,包含之前提到的500lines项目中的爬虫也是这样用的,本来是这样用的:

1
2
3
4
5
6
7
8
class Crawler:
def __init__(self, roots,
exclude=None, strict=True, # What to crawl.
max_redirect=10, max_tries=4, # Per-url limits.
max_tasks=10, *, loop=None):
...
self.session = aiohttp.ClientSession(loop=self.loop)
...

现在aiohttp会抛Creating a client session outside of coroutine的警告,也就是还没有开始协程就创建了。好的用法应该是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Crawler:
def __init__(self, roots,
exclude=None, strict=True, # What to crawl.
max_redirect=10, max_tries=4, # Per-url limits.
max_tasks=10, *, loop=None):
...
self._session = None
@property
def session(self):
if self._session is None:
self.session = aiohttp.ClientSession(loop=self.loop)
self._session = session
return self._session

这种编程方式使用非常广泛,体会一下。

让elasticsearch-dsl-py支持Sanic

前面说到elasticsearch-dsl-py底层用的是elasticsearch-py,并不支持异步化。所以我还是fork了一份,目前除了为支持Sanic,还修了一个我认为的BUG,已经提了PR还没有被处理, 有兴趣的同学可以移步Fix IllegalOperation when use custom analyzer

我发现有的同学很惧怕Python 3内置的协程方案asyncio。我想原因主要是2点:

  1. 对async/await这种异步编程方式不习惯
  2. asyncio的生态还不够丰富,非常有可能你要自己去封装还不支持的客户端

人都是习惯在自己的舒适区,但其实可能没那么难习惯。这个就像成天用python写代码,突然要写个shell脚本或者就像没做过组件式开发第一次用React一样的不适应。这个时候需要你忍着不习惯,坚持的做1-2个项目,等你了解了,熟悉了,就会发现新的编程方式其实也就是那么回事,但是不去尝试和深入永远也学不会。

大家没事可以翻翻aio-libs这个汇集了多个常用工具的asyncio的封装库,其实不复杂,把一个同步的程序改成asyncio的,我总结就是一句话:

给内部有异步操作的函数添加async关键字,在有网络请求和I/O操作并且希望协程化的地方添加await关键字

对于elasticsearch-dsl-py的修改,差不多就是一种模式。举个get方法的例子。原来是这样写的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@add_metaclass(DocTypeMeta)
class DocType(ObjectBase):
...
@classmethod
def get(cls, id, using=None, index=None, **kwargs):
es = connections.get_connection(using or cls._doc_type.using)
doc = es.get(
index=index or cls._doc_type.index,
doc_type=cls._doc_type.name,
id=id,
**kwargs
)
if not doc['found']:
return None
return cls.from_es(doc)

现在改成这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@add_metaclass(DocTypeMeta)
class DocType(ObjectBase):
...
@classmethod
async def get(cls, id, using=None, index=None, **kwargs):
es = connections.get_connection(using or cls._doc_type.using)
doc = await es.get(
index=index or cls._doc_type.index,
doc_type=cls._doc_type.name,
id=id,
**kwargs
)
if not doc['found']:
return None
return cls.from_es(doc)

所以不是每一步都需要加await,只有es.get是有网络请求的,其他的地方加了也没有意义。

另外的一个地方是把初始化Elasticsearch对象的地方改成elasticsearch-py-async的封装后的类,原来是:

1
2
3
4
5
6
7
from elasticsearch import Elasticsearch
class Connections(object):
def create_connection(self, alias='default', **kwargs):
kwargs.setdefault('serializer', serializer)
conn = self._conns[alias] = Elasticsearch(**kwargs)
return conn

改成:

1
2
3
4
5
6
7
from elasticsearch_async import AsyncElasticsearch
class Connections(object):
def create_connection(self, alias='default', **kwargs):
kwargs.setdefault('serializer', serializer)
conn = self._conns[alias] = AsyncElasticsearch(**kwargs)
return conn

明天我将分享完善后的爬虫,以及第三项「完成微信小程序需要的数据model接口」。