晒一下知乎Live团队送的新年礼物:

感谢。所以我将送上一系列知乎Live主题的文章

在我的知乎Live里面,我留下了调查问卷,其中一项是可以反馈自己感兴趣的Python相关技术。意料之内,最受大家欢迎的就是爬虫。所以最近开始写了一些和爬虫有关的内容。不过虽然现在还是1月,但是我发誓这是本年最后一次写爬虫文章了(要写吐了)。

模型设计

做知乎Live的全文搜索,首先要抓取到全部Live信息。我尝试了下,只要是登录状态就可以使用知乎提供的API获取。

接口分为2种:

  1. https://api.zhihu.com/lives/ongoing?purchasable=0&limit=10&offset=10 (未结束)
  2. https://api.zhihu.com/lives/ended?purchasable=0&limit=10&offset=10 (已结束)

每个Live包含非常多的字段,还包含了一些预留字段。我们简单分析下并实现对应的模型。

主讲人模型

主讲人字段如下:

speaker: {
    member: {
        name: "Flood Sung",
        url: "https://api.zhihu.com/people/23deec836a24f295500a6d740011359c",
        type: "people",
        user_type: "people",
        headline: "Deep Learning/Reinforcement Learning/Robotics/",
        avatar_url: "https://pic3.zhimg.com/73a71f47d66e280735a6c786131bdfe2_s.jpg",
        gender: 1,
        url_token: "flood-sung",
        id: "23deec836a24f295500a6d740011359c"
    },
    bio: "Deep Learning/Reinforcement Learning/Robotics/",
    description: "我是 Flood Sung ..."
}

主讲人部分我单独存储,不直接放进elasticsearch的原因有2个:

  1. 全文搜索和主讲人有关的字段只有主讲人的名字和描述,其他内容没有意义
  2. 主讲人可能会有多个Live,重复存储浪费空间

所以我还是按着惯例,存在主流数据库中,由于目前Live总体才2k多,数据量太小,且更新机会不多(爬虫抓取是离线的)直接放进了SQLite就好了。

为了未来在数据量变大需要改变存储的数据库时可以灵活切换,我使用提供对象关系映射(ORM)的SQLAlchemy来写模型(models/speaker.py):

from sqlalchemy import Column, String, Integer, create_engine, SmallInteger
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

DB_URI = 'sqlite:///user.db'
Base = declarative_base()
engine = create_engine(DB_URI)
Base.metadata.bind = engine
Session = sessionmaker(bind=engine)
session = Session()


class User(Base):
    __tablename__ = 'live_user'

    id = Column(Integer, unique=True, primary_key=True, autoincrement=True)
    speaker_id = Column(String(40), index=True, unique=True)
    name = Column(String(40), index=True, nullable=False)
    gender = Column(SmallInteger, default=2)
    headline = Column(String(200))
    avatar_url = Column(String(100), nullable=False)
    bio = Column(String(200))
    description = Column(String())

    @classmethod
    def add(cls, **kwargs):
        speaker_id = kwargs.get('speaker_id', None)
        if id is not None:
            r = session.query(cls).filter_by(speaker_id=speaker_id).first()
            if r:
                return r
        try:
            r = cls(**kwargs)
            session.add(r)
            session.commit()
        except:
            session.rollback()
            raise
        else:
            return r

Base.metadata.create_all()

speaker_id就是上面API中的speaker['member']['id'],但是由于它是一个字符串,和常见的主键使用整数不一样(虽然也可以),所以多加一个字段存储。另外额外的写了一个add方法,相当于创建之前通过speaker_id先检查下库中是否存在对应条目。

Live模型

Live字段比较多,为了未来字段格式改变造成文本不可用,得列一下:

{
    "liked": false,
    "conv_id": "58747b778d6d81006c40cee7",
    "topics": [
        {
            "url": "https://api.zhihu.com/topics/20038840",
            "avatar_url": "https://pic3.zhimg.com/50/d9e63efc57a9f07378ae9d5416ecf85a_s.png",
            "type": "topic",
            "id": "20038840",
            "name": "阿尔法围棋(AlphaGo)"
        }
    ],
    "seats": {
        "taken": 286,
        "max": 500
    },
    "duration": 5400,
    "id": "802155571712253952",
    "subject": "从 AlphaGo 看人工智能前沿技术",
    "feedback_score": 4.5,
    "fee": {
        "amount": 1900,
        "unit": "RMB"
    },
    "purchasable": true,
    "has_feedback": false,
    "note": "版权声明:...",
    "source": "admin",
    "cospeakers": [ ],
    "speaker": { },
    "income": null,
    "role": "visitor",
    "in_promotion": false,
    "badge": {
        "avatar_url": "https://pic1.zhimg.com/8e3b9024d62aa293d8ba7235701a9a08_r.png",
        "id": 0,
        "name": "普通票"
    },
    "status": "ended",
    "ends_in": 0,
    "description": "我是 Flood Sung ...",
    "speaker_message_count": 127,
    "tags": [
        {
            "score": 0,
            "name": "互联网",
            "short_name": "互联网",
            "available_num": 191,
            "created_at": 1469691129,
            "id": 105,
            "live_num": 195
        }
    ],
    "is_muted": false,
    "liked_num": 547,
    "alert": "从看到听...",
    "can_speak": false,
    "artwork": "",
    "is_public": 1,
    "ends_at": 1484920522,
    "outline": "* AlphaGo 为什么能在围棋上取得如此重大的突破?...",
    "is_anonymous": false,
    "created_at": 1484028791,
    "related_members": [ ],
    "product_list": [ ],
    "starts_at": 1484913600,
    "is_admin": false
}

这些字段通过名字能比较清晰的了解用途。接着我们考虑用什么elasticsearch的Python客户端。官方提供了elasticsearch-py 这个低级别客户端。

最开始我使用了elasticsearch-py,所谓低级别,就是各种操作未做或者少做封装,比如搜索用到的参数就要手动拼成一个字典。相信如果你之前了解或者用过elasticsearch,就会知道它的搜索参数多的令人发指。如果业务需求比较简单,elasticsearch-py还是可满足的。随着需求变多变复杂,你会发现拼这样一个多键值、多层嵌套字典的逻辑变得越来越不可维护,且写错了不容易发现,如果你对业务和elasticsearch不熟悉,非常容易掉坑儿。

那有没有什么更好写搜索和模型的方式嘛?

官方提供了基于elasticsearch-py的高级别客户端elasticsearch-dsl-py。DSL是领域专用语言(Domain Specific Language)的简称,也就是专门针对某一特定问题的计算机语言,特点是「求专不求全」。elasticsearch-dsl-py就是针对elasticsearch的特定语言。

它允许我们用一种非常可维护的方法来组织字典:

In : from elasticsearch_dsl.query import Q

In : Q('multi_match', subject='python').to_dict()
Out: {'multi_match': {'subject': 'python'}}

允许用一种函数式的方法来做查询:

In : from elasticsearch import Elasticsearch
In : from elasticsearch_dsl import Search, Q

In : s = Search(using=client, index='live')
In : s = s.query('match', subject='python').query(~Q('match', description='量化'))
In : s.execute()
Out: <Response: [<Hit(live/live/789840559912009728): {'subject': 'Python 工程师的入门和进阶', 'feedback_score': 4.5, 'stat...}>]>

上述例子表示从live这个索引(类似数据库中的Database)中找到subject字典包含python,但是description字段不包含量化的Live。

当然这个DSL也支持用类代表一个doc_type(类似数据库中的Table),实现ORM的效果。我们就用它来写Live模型:

from elasticsearch_dsl import DocType, Date, Integer, Text, Float, Boolean
from elasticsearch_dsl.connections import connections
from elasticsearch_dsl.query import SF, Q

from config import SEARCH_FIELDS
from .speaker import User, session

connections.create_connection(hosts=['localhost'])


class Live(DocType):
    id = Integer()
    speaker_id = Integer()
    feedback_score = Float() # 评分
    topic_names = Text(analyzer='ik_max_word')  # 话题标签名字
    seats_taken = Integer()  # 参与人数
    subject = Text(analyzer='ik_max_word')  # 标题
    amount = Float()  # 价格(RMB)
    description = Text(analyzer='ik_max_word')
    status = Boolean()  # public(True)/ended(False)
    starts_at = Date()
    outline = Text(analyzer='ik_max_word')  # Live内容
    speaker_message_count = Integer()
    tag_names = Text(analyzer='ik_max_word')
    liked_num = Integer()

    class Meta:
        index = 'live'

    @classmethod
    def add(cls, **kwargs):
        id = kwargs.pop('id', None)
        if id is None:
            return False
        live = cls(meta={'id': id}, **kwargs)
        live.save()
        return live

爬虫

在之前我已经分享了一个功能比较齐备的基于aiphttp的爬虫实现,还分享过如何使用API登录知乎并获得token。今天用之前的积累,实现这个爬虫。由于之前已经展示用完整代码,今天只介绍如何让它们「串」起来。

首先是初始化部分:

...
from models import User, Live, session
from client import ZhihuClient
from utils import flatten_live_dict
from config import SPEAKER_KEYS, LIVE_KEYS

LIVE_API_URL = 'https://api.zhihu.com/lives/{type}?purchasable=0&limit=10&offset={offset}'  # noqa
LIVE_TYPE = frozenset(['ongoing', 'ended'])


class Crawler:
    def __init__(self, max_redirect=10, max_tries=4,
                 max_tasks=10, *, loop=None):
        ...
        for t in LIVE_TYPE:
            for offset in range(self.max_tasks):
                self.add_url(LIVE_API_URL.format(type=t, offset=offset * 10))

        client = ZhihuClient()
        self.headers = {}
        client.auth(self)

初始化的时候通过add_url添加2种API、共2倍max_tasks个url。由于aiohttp的auth参数不支持原来requests的ZhihuOAuth,所以添加一个值为空字典的self.headers方便生成包含知乎API请求需要的headers。

知乎API设计的不错,返回的结果中包含了上一页和下一页的地址和是否结束:

paging: {
    is_end: false,
    next: "https://api.zhihu.com/lives/ended?purchasable=0&limit=10&offset=20",
    previous: "https://api.zhihu.com/lives/ended?purchasable=0&limit=10&offset=0"
}

所以parse_link的结果也就是只是一个url了。我把存储爬取下来的Live和主讲人信息的方法也放进了这个方法:

async def parse_link(self, response):
        rs = await response.json()

        if response.status == 200:
            for live in rs['data']:
                speaker = live.pop('speaker')
                speaker_id = speaker['member']['id']
                user = User.add(speaker_id=speaker_id,
                                **flatten_live_dict(speaker, SPEAKER_KEYS))
                live_dict = flatten_live_dict(live, LIVE_KEYS)
                if live_dict['id'] == LAST_INSERT_ID:
                    self._stopped = True
                    return
                live_dict['speaker_id'] = user.id
                live_dict['topic_names'] = ' '.join(
                    [t['name'] for t in live_dict.pop('topics')])
                live_dict['seats_taken'] = live_dict.pop('seats')['taken']
                live_dict['amount'] = live_dict.pop('fee')['amount'] / 100
                live_dict['status'] = live_dict['status'] == 'public'
                live_dict['tag_names'] = ' '.join(
                    set(sum([(t['name'], t['short_name'])
                         for t in live_dict.pop('tags')], ())))
                live_dict['starts_at'] = datetime.fromtimestamp(
                    live_dict['starts_at'])
                Live.add(**live_dict)

            paging = rs['paging']
            if not paging['is_end']:
                next_url = paging['next']
                return paging['next']
        else:
            print('HTTP status_code is {}'.format(response.status))

其中flatten_live_dict是从嵌套的字典里面把需要的键值抽出来:

def flatten_live_dict(d, keys=[]):
    def items():
        for key, value in d.items():
            if key in keys:
                yield key, value
            elif isinstance(value, dict):
                for subkey, subvalue in flatten_live_dict(value, keys).items():
                    if subkey != 'id' and subkey in keys:
                        yield subkey, subvalue

    return dict(items())

这个函数对id做了特殊处理,是因为在flatten过程中,有多个id会被无情替换,所以额外处理了。

其中参数keys分2种:

LIVE_KEYS = ['id', 'feedback_score', 'seats', 'subject', 'fee',
             'description', 'status', 'starts_at', 'outline',
             'speaker_message_count', 'liked_num', 'tags', 'topics']
SEARCH_FIELDS = ['subject', 'description', 'outline', 'tag_names',
                 'topic_names']

这样就完成了一个爬虫。我没有添加爬虫的断点续爬(发现之前爬过停止抓取)功能。这是因为Live一般在结束后都还是可以有人参与,去评价。反正总量也就是一分钟抓取完而已。如果想要添加这种减少重复抓取的功能,最简单的方式如下:

...
search = Live.search()
if search.count():
    LAST_INSERT_ID = search.sort('starts_at').execute()[0]._id
else:
    LAST_INSERT_ID = 0


class Crawler:
    def __init__(self, max_redirect=10, max_tries=4,
                 max_tasks=10, *, loop=None): 
        ...
        self._stopped = False

    async def parse_link(self, response):
        rs = await response.json()

        if response.status == 200:
            if live_dict['id'] == LAST_INSERT_ID:
                self._stopped = True
                return 
            ...

   def add_url(self, url, max_redirect=None):
        if max_redirect is None:
            max_redirect = self.max_redirect
        if url not in self.seen_urls and not self._stopped:
            self.seen_urls.add(url)
            self.q.put_nowait((url, max_redirect))