看这篇文章前推荐阅读相关的如下文章:

  1. 知乎Live全文搜索之模型设计和爬虫实现
  2. 知乎Live全文搜索之使用Elasticsearch做聚合分析
  3. 知乎Live全文搜索之使用Elasticsearch做搜索建议
  4. 知乎Live全文搜索之让elasticsearch_dsl支持asyncio

知乎Live全文搜索之让elasticsearch_dsl支持asyncio 一文中,我把后端工作分成了4步,今天是完成爬虫和模型接口这2步,接口返回的数据会被微信小程序使用。

详细的列一下接口需求:

  1. 搜索。搜索符合输入的关键字的Live和用户,按照之前提到的各种策略排序,也支持通过status状态过滤「已结束」和「未结束」2种类型的Live。支持分页。
  2. 搜索建议。提供符合输入的关键字的Live的建议。
  3. 发现。把全部的Live按照之前提到的各种策略排序,可以通过各种字段排序,可以选择Live开始的时间范围(默认是全部)。
  4. 获取热门话题。
  5. 获取某话题详细信息及话题下的Live,支持分页、排序、时间范围。
  6. 获取全部用户,并且可以按照举办的Live数量、更新Live时间等条件排序。
  7. 获取单个用户信息。
  8. 根据各种策略排序,获取7天热门Live,非知乎排序。
  9. 根据各种策略排序,获取30天热门Live,非知乎排序。

添加Topic模型

由于4和5的需求,我添加了Topic这个模型,上篇文章说过SQLite不支持并发,所以替换成了MySQL,要把config里面的DB_URI改成如下格式:

1
DB_URI = 'mysql+pymysql://localhost/test?charset=utf8mb4'

其中test是库的名字,charset要用utf8mb4,因为有些用户信息甚至Live的标题里面包含emoji。MySQL的客户端用的是PyMySQL,需要在schema上指出来。

Topic类和之前的User格式差不多,只是不同的字段,限于篇幅就不列出来了。

实现用户相关方法

为了实现可以按照举办的Live数量、更新Live时间排序,我添加了2个字段,也改了字符集:

1
2
3
4
5
6
7
8
9
10
11
12
13
from config import SUGGEST_USER_LIMIT, PEOPLE_URL, LIVE_USER_URL
class User(Base):
__tablename__ = 'users'
__table_args__ = {
'mysql_engine': 'InnoDB',
'mysql_charset': 'utf8mb4'
}
...
live_count = Column(Integer, default=0)
updated_time = Column(DateTime, default=datetime.now)

接着添加一些必要的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class User(Base):
...
def incr_live_count(self):
self.live_count += 1
session.commit()
@property
def url(self):
return PEOPLE_URL.format(self.speaker_id)
@property
def lives_url(self):
return LIVE_USER_URL.format(self.speaker_id)
def to_dict(self):
d = {c.name: getattr(self, c.name, None)
for c in self.__table__.columns}
d.update({
'type': 'user',
'url': self.url,
'lives_url': self.lives_url
})
return d

我习惯给model添加一个to_dict方法,把需要的字段和值拼成一个dict返回。当然有些API实际不需要这么多的字段,在下一篇中我会介绍怎么处理schema的问题。

最后是3个接口方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class User(Base):
@classmethod
def add(cls, **kwargs):
speaker_id = kwargs.get('speaker_id', None)
r = None
if id is not None:
q = session.query(cls).filter_by(speaker_id=speaker_id)
r = q.first()
if r:
q.update(kwargs)
if r is None:
r = cls(**kwargs)
session.add(r)
try:
session.commit()
except:
session.rollback()
else:
return r
@classmethod
def suggest(cls, q, start=0, limit=SUGGEST_USER_LIMIT):
query = session.query(User)
users = query.filter(User.name.like('%{}%'.format(q))).offset(
start).limit(limit).all()
return [user.to_dict() for user in users]
@classmethod
def get_all(cls, order_by='id', start=0, limit=10, desc=False):
'''
:param order_by: One of ``'id'``, ``'live_count'`` or
``'updated_time'``
'''
query = session.query(User)
order_by = getattr(User, order_by)
if desc:
order_by = _desc(order_by)
users = query.order_by(order_by).offset(start).limit(limit).all()
return [user.to_dict() for user in users]

需要注意add方法,其实叫做add_or_update更合适,需要使用session一定要commit才能提交数据。

sqlalchemy没有自带的suggest功能,只能用Like来实现。get_all方法就是上面第6个需求接口。

完成Live模型字段

首先道歉,之前我理解的自定义analyzer的用法是错误的,下面的才是正确的姿势:

1
2
3
4
5
6
from elasticsearch_dsl.analysis import CustomAnalyzer
ik_analyzer = CustomAnalyzer(
'ik_analyzer', tokenizer='ik_max_word',
filter=['lowercase']
)

tokenizer字段是必选的,这里使用ik分词插件提供的ik_max_word。我还给Live添加了2个字段:

1
2
3
class Live(DocType):
cover = Text(index='not_analyzed') # 对应专栏头图(如果有)
zhuanlan_url = Text(index='not_analyzed') # 对应专栏地址

加上参数index='not_analyzed'是因为这2个字段不用于搜索和聚合,没必要分词,就当成数据库使用了。

也给Live添加一些属性和方法,方便最后用to_dict()生成需要的全部数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from .speaker import User, session
class Live(DocType):
@property
def id(self):
return self._id
@property
def speaker(self):
return session.query(User).get(self.speaker_id)
@property
def url(self):
return LIVE_URL.format(self.id)
class Meta:
index = 'live130'
def to_dict(self, include_extended=True):
d = super().to_dict()
if include_extended:
d.update({
'id': self._id,
'type': 'live',
'speaker': self.speaker.to_dict(),
'url': self.url
})
return d

其中speaker属性是常见的关联多个model的快捷方式,但是需要注意,竟然不要设计成A的model里面某个方法返回了B的model数据,B的model里面也返回了A的model的数据而造成只能进行方法内import。

super().to_dict()的原因是DocType内置了to_dict方法,随便提一下,而且接收include_meta参数,为True会包含index和doc_type的元数据。

把Live设计成异步的

这个是今天的重点,昨天说的「让elasticsearch_dsl支持asyncio」就是给今天做准备。换汤不换药,说白了就是在合适的地方添加async/await关键字,先看个add的:

1
2
3
4
5
6
7
8
9
10
class Live(DocType):
...
@classmethod
async def add(cls, **kwargs):
id = kwargs.pop('id', None)
if id is None:
return False
live = cls(meta={'id': int(id)}, **kwargs)
await live.save()
return live

现在我们挨个实现需求,首先是搜索接口,由于DocType包含了search方法,得换个名字了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Live(DocType):
...
async def _execute(cls, s, order_by=None):
# 可以选择字段的排序,前面加-表示desc,不加就是默认的asc
if order_by is not None:
s = s.sort(order_by)
lives = await s.execute() # 执行,要在这步之前拼好查询条件
return [live.to_dict() for live in lives]
@classmethod
def apply_weight(cls, s, start, limit):
return s.query(Q('function_score', functions=[gauss_sf, log_sf])).extra(
**{'from': start, 'size': limit})
@classmethod
async def ik_search(cls, query, status=None, start=0, limit=10):
s = cls.search()
# 多字段匹配要搜索的内容,SEARCH_FIELDS中不同字段权重不同
s = s.query('multi_match', query=query,
fields=SEARCH_FIELDS)
if status is not None: # 根据结束状态过滤
s = s.query('match', status=status)
# 搜索是带权重的,按照之前的设计做了时间衰减和归一化
s = cls.apply_weight(s, start, limit)
return await cls._execute(s)

就是根据需求,按照DSL的方式来拼。我添加了些注释,看不懂的话可以按照文章开始的链接去找找答案。

然后是发现接口,7/30天热门都是基于这个接口,只不过划定了时间:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class Live(DocType):
...
@classmethod
async def explore(cls, from_date=None, to_date=None, order_by=None,
start=0, limit=10, topic=None):
s = cls.search()
if topic is not None:
s = s.query(Q('term', topic_names=topic))
starts_at = {}
if from_date is not None:
starts_at['from'] = from_date
if to_date is not None:
starts_at['to'] = to_date
if starts_at:
s = s.query(Q('range', starts_at=starts_at))
if order_by is None:
s = cls.apply_weight(s, start, limit)
return await cls._execute(s, order_by)
@classmethod
async def get_hot_weekly(cls):
today = date.today()
return await cls.explore(from_date=today - timedelta(days=7),
to_date=today, limit=20)
@classmethod
async def get_hot_monthly(cls):
today = date.today()
return await cls.explore(from_date=today - timedelta(days=30),
to_date=today, limit=50)

注意,explore方法如果指定了排序方案,就不会添加时间衰减和归一化的处理了。

然后是获取用户举报的全部Live的方法:

1
2
3
4
5
6
7
class Live(DocType):
...
@classmethod
async def ik_search_by_speaker_id(cls, speaker_id, order_by='-starts_at'):
s = cls.search()
s = s.query(Q('bool', should=Q('match', speaker_id=speaker_id)))
return await cls._execute(s, order_by)

可以看到_execute方法抽象后被重复利用了。

再然后是suggest接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
class Live(DocType):
...
@classmethod
async def ik_suggest(cls, query, size=10):
s = cls.search()
s = s.suggest('live_suggestion', query, completion={
'field': 'live_suggest', 'fuzzy': {'fuzziness': 2}, 'size': size
})
suggestions = await s.execute_suggest()
matches = suggestions.live_suggestion[0].options
ids = [match._id for match in matches]
lives = await Live.mget(ids)
return [live.to_dict() for live in lives]

其中支持2个编辑距离的模糊搜索。这个实现的比较简单,没有考虑拼音,也没有考虑搜索用户。值得一提的是DocType提供了mget这个获取多个id的接口,请善用减少网络请求,也就是给ES后端减压。

第4个获得热门话题的需求是本项目唯一用到聚合功能的地方了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from .topic import Topic
class Live(DocType):
@classmethod
async def get_hot_topics(cls, size=50):
s = cls.search()
s.aggs.bucket('topics', A('terms', field='topics', size=size))
rs = await s.execute()
buckets = rs.aggregations.topics.buckets
topic_names = [r['key'] for r in buckets]
topics = session.query(Topic).filter(Topic.name.in_(topic_names)).all()
topics = sorted(topics, key=lambda t: topic_names.index(t.name))
return [topic.to_dict() for topic in topics]

每个Live都会打话题标签,越多的live打这个话题就说明它越热门。

最后要说的是init()方法:

1
2
async def init():
await Live.init()

原来import模块的时候直接就init了,现在由于异步化了,直接init没人所以要在loop中用,比如在爬虫中:

1
2
3
4
5
6
7
8
9
10
11
12
from models.live import init as live_init
if __name__ == '__main__':
loop = asyncio.get_event_loop()
crawler = Crawler()
loop.run_until_complete(live_init())
loop.run_until_complete(crawler.crawl())
print('Finished in {:.3f} secs'.format(crawler.t1 - crawler.t0))
crawler.close()
loop.close()
es.transport.close()

理解了嘛?

好了全部接口都完成了,但是大家有木有感觉,异步编程调试起来很麻烦,我来教一个好用的方法.

调试async程序

asyncio要求把需要协程化的函数都放进一个loop,通过run_until_complete方法让它执行完成。

但是现在非常不好玩:

1
2
3
4
5
6
7
8
9
10
11
12
13
In : from models import Live
In : live = Live.get(789840559912009728)
In : live
Out: <coroutine object DocType.get at 0x10a0d1fc0>
In : live.subject
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-4-8c237874146c> in <module>()
----> 1 live.subject
AttributeError: 'coroutine' object has no attribute 'subject'

异步化的函数(方法)用起来很不直观。一开始可以写个脚本把要调试的东西放进去用(test_es.py):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import asyncio
from elasticsearch_dsl.connections import connections
from models.live import Live, SEARCH_FIELDS, init as live_init
s = Live.search()
es = connections.get_connection(Live._doc_type.using)
async def print_info():
rs = await s.query('multi_match', query='python',
fields=SEARCH_FIELDS).execute()
print(rs)
loop = asyncio.get_event_loop()
loop.run_until_complete(live_init())
loop.run_until_complete(print_info())
loop.close()
es.transport.close()

这样也是可以调试的,很麻烦,对吧?

抽象一下,其实写个函数就好了:

1
2
3
4
5
6
7
import asyncio
def execute(coro):
loop = asyncio.get_event_loop()
rs = loop.run_until_complete(coro)
return rs

OK, 再用:

1
2
3
4
5
In : from models import Live, execute
In : live = Live.get(789840559912009728)
In : live = execute(live)
In : live.subject
Out: 'Python 工程师的入门和进阶'

这样就方便多了。