晒一下知乎Live团队送的新年礼物:

感谢。所以我将送上一系列知乎Live主题的文章

在我的知乎Live里面,我留下了调查问卷,其中一项是可以反馈自己感兴趣的Python相关技术。意料之内,最受大家欢迎的就是爬虫。所以最近开始写了一些和爬虫有关的内容。不过虽然现在还是1月,但是我发誓这是本年最后一次写爬虫文章了(要写吐了)。

模型设计

做知乎Live的全文搜索,首先要抓取到全部Live信息。我尝试了下,只要是登录状态就可以使用知乎提供的API获取。

接口分为2种:

  1. https://api.zhihu.com/lives/ongoing?purchasable=0&limit=10&offset=10 (未结束)
  2. https://api.zhihu.com/lives/ended?purchasable=0&limit=10&offset=10 (已结束)

每个Live包含非常多的字段,还包含了一些预留字段。我们简单分析下并实现对应的模型。

主讲人模型

主讲人字段如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
speaker: {
member: {
name: "Flood Sung",
url: "https://api.zhihu.com/people/23deec836a24f295500a6d740011359c",
type: "people",
user_type: "people",
headline: "Deep Learning/Reinforcement Learning/Robotics/",
avatar_url: "https://pic3.zhimg.com/73a71f47d66e280735a6c786131bdfe2_s.jpg",
gender: 1,
url_token: "flood-sung",
id: "23deec836a24f295500a6d740011359c"
},
bio: "Deep Learning/Reinforcement Learning/Robotics/",
description: "我是 Flood Sung ..."
}

主讲人部分我单独存储,不直接放进elasticsearch的原因有2个:

  1. 全文搜索和主讲人有关的字段只有主讲人的名字和描述,其他内容没有意义
  2. 主讲人可能会有多个Live,重复存储浪费空间

所以我还是按着惯例,存在主流数据库中,由于目前Live总体才2k多,数据量太小,且更新机会不多(爬虫抓取是离线的)直接放进了SQLite就好了。

为了未来在数据量变大需要改变存储的数据库时可以灵活切换,我使用提供对象关系映射(ORM)的SQLAlchemy来写模型(models/speaker.py):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from sqlalchemy import Column, String, Integer, create_engine, SmallInteger
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
DB_URI = 'sqlite:///user.db'
Base = declarative_base()
engine = create_engine(DB_URI)
Base.metadata.bind = engine
Session = sessionmaker(bind=engine)
session = Session()
class User(Base):
__tablename__ = 'live_user'
id = Column(Integer, unique=True, primary_key=True, autoincrement=True)
speaker_id = Column(String(40), index=True, unique=True)
name = Column(String(40), index=True, nullable=False)
gender = Column(SmallInteger, default=2)
headline = Column(String(200))
avatar_url = Column(String(100), nullable=False)
bio = Column(String(200))
description = Column(String())
@classmethod
def add(cls, **kwargs):
speaker_id = kwargs.get('speaker_id', None)
if id is not None:
r = session.query(cls).filter_by(speaker_id=speaker_id).first()
if r:
return r
try:
r = cls(**kwargs)
session.add(r)
session.commit()
except:
session.rollback()
raise
else:
return r
Base.metadata.create_all()

speaker_id就是上面API中的speaker[‘member’][‘id’],但是由于它是一个字符串,和常见的主键使用整数不一样(虽然也可以),所以多加一个字段存储。另外额外的写了一个add方法,相当于创建之前通过speaker_id先检查下库中是否存在对应条目。

Live模型

Live字段比较多,为了未来字段格式改变造成文本不可用,得列一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
{
"liked": false,
"conv_id": "58747b778d6d81006c40cee7",
"topics": [
{
"url": "https://api.zhihu.com/topics/20038840",
"avatar_url": "https://pic3.zhimg.com/50/d9e63efc57a9f07378ae9d5416ecf85a_s.png",
"type": "topic",
"id": "20038840",
"name": "阿尔法围棋(AlphaGo)"
}
],
"seats": {
"taken": 286,
"max": 500
},
"duration": 5400,
"id": "802155571712253952",
"subject": "从 AlphaGo 看人工智能前沿技术",
"feedback_score": 4.5,
"fee": {
"amount": 1900,
"unit": "RMB"
},
"purchasable": true,
"has_feedback": false,
"note": "版权声明:...",
"source": "admin",
"cospeakers": [ ],
"speaker": { },
"income": null,
"role": "visitor",
"in_promotion": false,
"badge": {
"avatar_url": "https://pic1.zhimg.com/8e3b9024d62aa293d8ba7235701a9a08_r.png",
"id": 0,
"name": "普通票"
},
"status": "ended",
"ends_in": 0,
"description": "我是 Flood Sung ...",
"speaker_message_count": 127,
"tags": [
{
"score": 0,
"name": "互联网",
"short_name": "互联网",
"available_num": 191,
"created_at": 1469691129,
"id": 105,
"live_num": 195
}
],
"is_muted": false,
"liked_num": 547,
"alert": "从看到听...",
"can_speak": false,
"artwork": "",
"is_public": 1,
"ends_at": 1484920522,
"outline": "* AlphaGo 为什么能在围棋上取得如此重大的突破?...",
"is_anonymous": false,
"created_at": 1484028791,
"related_members": [ ],
"product_list": [ ],
"starts_at": 1484913600,
"is_admin": false
}

这些字段通过名字能比较清晰的了解用途。接着我们考虑用什么elasticsearch的Python客户端。官方提供了elasticsearch-py 这个低级别客户端。

最开始我使用了elasticsearch-py,所谓低级别,就是各种操作未做或者少做封装,比如搜索用到的参数就要手动拼成一个字典。相信如果你之前了解或者用过elasticsearch,就会知道它的搜索参数多的令人发指。如果业务需求比较简单,elasticsearch-py还是可满足的。随着需求变多变复杂,你会发现拼这样一个多键值、多层嵌套字典的逻辑变得越来越不可维护,且写错了不容易发现,如果你对业务和elasticsearch不熟悉,非常容易掉坑儿。

那有没有什么更好写搜索和模型的方式嘛?

官方提供了基于elasticsearch-py的高级别客户端elasticsearch-dsl-py。DSL是领域专用语言(Domain Specific Language)的简称,也就是专门针对某一特定问题的计算机语言,特点是「求专不求全」。elasticsearch-dsl-py就是针对elasticsearch的特定语言。

它允许我们用一种非常可维护的方法来组织字典:

1
2
3
4
In : from elasticsearch_dsl.query import Q
In : Q('multi_match', subject='python').to_dict()
Out: {'multi_match': {'subject': 'python'}}

允许用一种函数式的方法来做查询:

1
2
3
4
5
6
7
In : from elasticsearch import Elasticsearch
In : from elasticsearch_dsl import Search, Q
In : s = Search(using=client, index='live')
In : s = s.query('match', subject='python').query(~Q('match', description='量化'))
In : s.execute()
Out: <Response: [<Hit(live/live/789840559912009728): {'subject': 'Python 工程师的入门和进阶', 'feedback_score': 4.5, 'stat...}>]>

上述例子表示从live这个索引(类似数据库中的Database)中找到subject字典包含python,但是description字段不包含量化的Live。

当然这个DSL也支持用类代表一个doc_type(类似数据库中的Table),实现ORM的效果。我们就用它来写Live模型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from elasticsearch_dsl import DocType, Date, Integer, Text, Float, Boolean
from elasticsearch_dsl.connections import connections
from elasticsearch_dsl.query import SF, Q
from config import SEARCH_FIELDS
from .speaker import User, session
connections.create_connection(hosts=['localhost'])
class Live(DocType):
id = Integer()
speaker_id = Integer()
feedback_score = Float() # 评分
topic_names = Text(analyzer='ik_max_word') # 话题标签名字
seats_taken = Integer() # 参与人数
subject = Text(analyzer='ik_max_word') # 标题
amount = Float() # 价格(RMB)
description = Text(analyzer='ik_max_word')
status = Boolean() # public(True)/ended(False)
starts_at = Date()
outline = Text(analyzer='ik_max_word') # Live内容
speaker_message_count = Integer()
tag_names = Text(analyzer='ik_max_word')
liked_num = Integer()
class Meta:
index = 'live'
@classmethod
def add(cls, **kwargs):
id = kwargs.pop('id', None)
if id is None:
return False
live = cls(meta={'id': id}, **kwargs)
live.save()
return live

爬虫

在之前我已经分享了一个功能比较齐备的基于aiphttp的爬虫实现,还分享过如何使用API登录知乎并获得token。今天用之前的积累,实现这个爬虫。由于之前已经展示用完整代码,今天只介绍如何让它们「串」起来。

首先是初始化部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
...
from models import User, Live, session
from client import ZhihuClient
from utils import flatten_live_dict
from config import SPEAKER_KEYS, LIVE_KEYS
LIVE_API_URL = 'https://api.zhihu.com/lives/{type}?purchasable=0&limit=10&offset={offset}' # noqa
LIVE_TYPE = frozenset(['ongoing', 'ended'])
class Crawler:
def __init__(self, max_redirect=10, max_tries=4,
max_tasks=10, *, loop=None):
...
for t in LIVE_TYPE:
for offset in range(self.max_tasks):
self.add_url(LIVE_API_URL.format(type=t, offset=offset * 10))
client = ZhihuClient()
self.headers = {}
client.auth(self)

初始化的时候通过add_url添加2种API、共2倍max_tasks个url。由于aiohttp的auth参数不支持原来requests的ZhihuOAuth,所以添加一个值为空字典的self.headers方便生成包含知乎API请求需要的headers。

知乎API设计的不错,返回的结果中包含了上一页和下一页的地址和是否结束:

1
2
3
4
5
paging: {
is_end: false,
next: "https://api.zhihu.com/lives/ended?purchasable=0&limit=10&offset=20",
previous: "https://api.zhihu.com/lives/ended?purchasable=0&limit=10&offset=0"
}

所以parse_link的结果也就是只是一个url了。我把存储爬取下来的Live和主讲人信息的方法也放进了这个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
async def parse_link(self, response):
rs = await response.json()
if response.status == 200:
for live in rs['data']:
speaker = live.pop('speaker')
speaker_id = speaker['member']['id']
user = User.add(speaker_id=speaker_id,
**flatten_live_dict(speaker, SPEAKER_KEYS))
live_dict = flatten_live_dict(live, LIVE_KEYS)
if live_dict['id'] == LAST_INSERT_ID:
self._stopped = True
return
live_dict['speaker_id'] = user.id
live_dict['topic_names'] = ' '.join(
[t['name'] for t in live_dict.pop('topics')])
live_dict['seats_taken'] = live_dict.pop('seats')['taken']
live_dict['amount'] = live_dict.pop('fee')['amount'] / 100
live_dict['status'] = live_dict['status'] == 'public'
live_dict['tag_names'] = ' '.join(
set(sum([(t['name'], t['short_name'])
for t in live_dict.pop('tags')], ())))
live_dict['starts_at'] = datetime.fromtimestamp(
live_dict['starts_at'])
Live.add(**live_dict)
paging = rs['paging']
if not paging['is_end']:
next_url = paging['next']
return paging['next']
else:
print('HTTP status_code is {}'.format(response.status))

其中flatten_live_dict是从嵌套的字典里面把需要的键值抽出来:

1
2
3
4
5
6
7
8
9
10
11
def flatten_live_dict(d, keys=[]):
def items():
for key, value in d.items():
if key in keys:
yield key, value
elif isinstance(value, dict):
for subkey, subvalue in flatten_live_dict(value, keys).items():
if subkey != 'id' and subkey in keys:
yield subkey, subvalue
return dict(items())

这个函数对id做了特殊处理,是因为在flatten过程中,有多个id会被无情替换,所以额外处理了。

其中参数keys分2种:

1
2
3
4
5
LIVE_KEYS = ['id', 'feedback_score', 'seats', 'subject', 'fee',
'description', 'status', 'starts_at', 'outline',
'speaker_message_count', 'liked_num', 'tags', 'topics']
SEARCH_FIELDS = ['subject', 'description', 'outline', 'tag_names',
'topic_names']

这样就完成了一个爬虫。我没有添加爬虫的断点续爬(发现之前爬过停止抓取)功能。这是因为Live一般在结束后都还是可以有人参与,去评价。反正总量也就是一分钟抓取完而已。如果想要添加这种减少重复抓取的功能,最简单的方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
...
search = Live.search()
if search.count():
LAST_INSERT_ID = search.sort('starts_at').execute()[0]._id
else:
LAST_INSERT_ID = 0
class Crawler:
def __init__(self, max_redirect=10, max_tries=4,
max_tasks=10, *, loop=None):
...
self._stopped = False
async def parse_link(self, response):
rs = await response.json()
if response.status == 200:
if live_dict['id'] == LAST_INSERT_ID:
self._stopped = True
return
...
def add_url(self, url, max_redirect=None):
if max_redirect is None:
max_redirect = self.max_redirect
if url not in self.seen_urls and not self._stopped:
self.seen_urls.add(url)
self.q.put_nowait((url, max_redirect))