知乎live全文搜索之构建基于asyncio+sanic的RESTful API服务
/ / / 阅读数:5996今天的文章图片就是这个知乎 Live 全文搜索的效果了。如果在 wifi 情况下或者土豪不介意流量的同学可以直接 感受实际使用的动态效果 。
在我的《Python Web 开发实战》一书中,比较少的介绍到豆瓣自己造的轮子。有读者喷「不够实战」。我显然承认,也很苦闷 ,今天顺便来吐个槽。我刚工作的时候特别愿意混各种技术会议和活动。经常有一些专家在上面讲:我们自己实现了一个 XXX,它有如下多的特性,现在支持了多少个产品线的多少个应用,每天的数据量 YY PB,流量 ZZ...
通篇在讲架构,摆几张高大上的图,甚至能说几个大家不了解的新的玩法都很少,还不断的问坐在下面的领导或者法务:额,这个我能说嘛;那个我能分享么?最最重要的是,他们讲的这些东西大多不是开源的.... 也基本没有一个可以拿得出手的论文,甚至说白了,如果你正好专注这一部分,会发现它也是根据了 FLAG 公司的论文在造轮子罢了,说不定造的还不如你。
听过之后,也没有收获,都是「别人家」的,甚至有种炫耀的感觉而已,至于有什么苦可能只有他们自己知道吧。我就越来越不愿意参加这种会议了。
写书或者博客也是这样。一切东西脱离了公司能提供的基础设施和环境都是空谈,但是这些铺垫说起来就太大了,先不讨论公司有没有授权你在外面说,就是没开源这点就不好弄。说的人都是在虚化的讲一大坨的东西,最多来几个截图之类的(应该还打了马赛克)。等这一大坨东西说的让大家明白了,一本书的厚度肯定不够。但是这些内容呢,只是你为了写书的某一 (几) 章做铺垫而已,有些内容太专太偏,读者大部分场景下是用不到的,好吧,又得骂娘说你这本书不实用...
今天我将基于我过往的实践,以及最近学习 asyncio 和 ES 知识完成一个小程序的 API 服务。看这篇文章前推荐阅读相关的如下文章:
- 知乎 Live 全文搜索之模型设计和爬虫实现
- 知乎 Live 全文搜索之模型接口
- 使用 Python 进行并发编程 - 我为什么不喜欢 Gevent
- 使用 Python 进行并发编程 - asyncio 篇 (一)
- 使用 Python 进行并发编程 - asyncio 篇 (二)
- 使用 Python 进行并发编程 - asyncio 篇 (三)
- 知乎 Live 全文搜索之让 elasticsearch_dsl 支持 asyncio
技术选型
- Sanic ,基于 Python 3.5 + 的异步 web 服务器,和 Flask 一样使用装饰器作为路由,支持 Blueprint。效果确实非常快。
- uvloop ,Sanic 默认使用 uvloop,这个实现基于 libuv,比 asyncio 默认的 loop 的块很多。
- marshmallow ,一个轻量级的转化复杂对象成为 Python 自带数据类型的库,为什么用它未来会详细介绍。
使用 Schema
了解过关系型数据库如 MySQL 的同学会比较熟悉 Schema 这个词,它是对数据库的结构描述。Schema 定义了表与表和表与字段之间的关系。在使用关系型数据库之前第一件事要先定 Schema,创建表 (库) 再去操作。
为什么我们在实现 RESTful API 的时候要考虑使用 Schema 呢?
- 首先 API 服务是给外部用的,比如移动端 (安卓、IOS 等),前端 (用 AJAX) 等。那么大家一开始就要协商好那些字段,以及字段的类型。因为你不关系,他们都是关心的,如果设计有问题会由于没有正确处理而造成移动端闪退等严重问题。
- 你需要对 API 返回的数据进行良好的管理和验证。
marshmallow 把一组数据映射成一个类:
from marshmallow import Schema, fields class UserSchema(Schema): id = fields.Integer() url = fields.Str() name = fields.Str() ... |
这样既让不同编程语言的开发者一目了然,也能检验你提供的数据是不是按照这个定好的结构返回的。
同一组数据可以定义多种 Schema
思考一下,在搜索页面,每一项提供的空间有限,你无法把 Live 的全部信息(比如「描述」这种很长的内容的字段)都展示出来,也就是就算都返回了,其实只用了一部分字段,这造成了更多的网络延时和带宽消耗。但是在 Live 详情页理论上就可以展示全部的字段的内容了。
再假设下,如果是返回一部分,还是返回全部字段在后端做,就是不同的方法返回时对 to_dict 方法对一对 if/elif/else 的处理,其实看起来很乱。我是这样用的:
class UserSchema(Schema): id = fields.Integer() url = fields.Str() name = fields.Str() ... class UserFullSchema(UserSchema): lives_url = fields.Str() speaker_id = fields.Str() ... |
也就是定义了多种 User 的 schema,按需选择。但是后端统一使用 to_dict 方法返回全部数据,在视图渲染的时候进行筛选。这样需要用一种好的表达方式:
from views.utils import marshal_with @bp.route('/search') @marshal_with([LiveSchema, UserSchema]) async def search(request): ... @bp.route('/suggest') @marshal_with(LiveSchema) async def suggest(request): ... @bp.route('/user/<user_id>') @marshal_with([LiveFullSchema, UserFullSchema]) async def user(request, user_id): ... |
我们先不考虑视图内的逻辑,简单的理解成他们是单个 live 的 to_dict 结果或者多个 live 的 to_dict 结果的列表
通过神奇的 marshal_with 装饰器传入你希望返回符合那种 Schema 的数据。
现在揭晓一下:
def marshal(data, fields): schemas = [field() for field in fields] if isinstance(data, (list, tuple)): return [marshal(d, fields) for d in data] type = data.get('type') for schema in schemas: if type in schema.__class__.__name__.lower(): result, errors = schema.dump(data) if errors: for item in errors.items(): print('{}: {}'.format(*item)) return result class marshal_with(object): def __init__(self, fields): if not isinstance(fields, list): fields = [fields] self.fields = fields def __call__(self, f): @wraps(f) async def wrapper(*args, **kwargs): resp = await f(*args, **kwargs) return marshal(resp, self.fields) return wrapper |
这个是 async 版本的,大家可以自己发散成 Python 2 的普通版。记得之前我们在 model 里面给每种数据的 to_dict 方法里面加了个{'type': 'live'}
这种键值么,除了在小程序里面分辨数据的类型,在这里也是用来匹配那种 schema 的。举个例子,假如是一个 user 类型的数据。
在@marshal_with([LiveFullSchema, UserFullSchema])
的装饰下,由于 UserFullSchema 类名包含了 live 所以符合了。这比较黑科技一点..
深入使用 marshmallow
marshmallow 除了生成一个可读性很好的类和验证该字段是不是类型符合以外,还支持序列化和反序列化的处理。有什么意义呢。假如如下的 schema:
class LiveSchema(Schema): starts_at = fields.Date() |
注意我们 model 存的 starts_at 是一个 datetime 类型的对象,无法被 json 序列化,所以返回之前应该先转化成字符串:
class LiveSchema(Schema): starts_at = fields.Method('get_start_time') def get_start_time(self, obj): return int(obj['starts_at'].strftime('%s')) |
再举个例子:
class UserSchema(Schema): bio = fields.Str() headline = fields.Str() description = fields.Str() |
bio/headline/description 这三个字段内容都有可能比较长,需要做不同的截取:在详情页显示全部,在搜索页之前显示前 2 行.. 我们需要一个 truncate 函数:
WIDTH = 45 def truncate_utf8(str, width=WIDTH): return str[:width] + '...' if len(str) > width else str |
假如使用 fields.Method 你就要写三方法,因为它指定的方法只有self, obj
2 个参数,而且 Schema 是不能继承的。这么办呢?有我的书中介绍的 partialmethod 可完美实现:
from functools import partialmethod class Item(object): def truncate(self, attr, obj): if attr not in obj: return '' return truncate_utf8(obj[attr], WIDTH) class UserSchema(Schema): bio = fields.Method('truncate_bio') headline = fields.Method('truncate_headline') description = fields.Method('truncate_description') truncate_headline = partialmethod(Item.truncate, 'headline') truncate_bio = partialmethod(Item.truncate, 'bio') truncate_description = partialmethod(Item.truncate, 'description') |
通过 partialmethod + 非继承至 Schema 的类就可以实现继承和额外参数了。
对 Sanic 定制
我们都知道,当你有独特的需求而框架不满足的时候,就要对其进行二次开发或者封装。我一般倾向基于框架提供的灵活性去封装。由于在多个 API 上都有分页的需要,参数中会出现 start/limit(当然你可以更喜欢其他的词汇)。如果不定制,那么在每个视图里面都要加一句:
@bp.route('/search')
@marshal_with([LiveSchema, UserSchema])
async def search(request):
start = request.args.get('start', 0)
limit = request.args.get('limit', 10)
...
这 2 句就是个累赘。怎么做呢? 利用 sanic 提供的中间件就好了:
@app.middleware('request') async def halt_request(request): request.start = request.args.get('start', 0) request.limit = request.args.get('limit', 10) |
但是这还不够,因为 sanic 在创建 Request 的时候基于内存的考虑使用了__slots__,所以我们要重新写 on_headers_complete 方法:
from sanic.server import HttpProtocol, CIMultiDict from sanic.request import Request as _Request class Request(_Request): __slots__ = ( 'url', 'headers', 'version', 'method', '_cookies', 'query_string', 'body', 'start', 'limit', 'parsed_json', 'parsed_args', 'parsed_form', 'parsed_files', ) class JSONHttpProtocol(HttpProtocol): def on_headers_complete(self): remote_addr = self.transport.get_extra_info('peername') if remote_addr: self.headers.append(('Remote-Addr', '%s:%s' % remote_addr)) self.request = Request( url_bytes=self.url, headers=CIMultiDict(self.headers), version=self.parser.get_http_version(), method=self.parser.get_method().decode() ) if __name__ == '__main__': app.run(host='0.0.0.0', port=8300, protocol=JSONHttpProtocol, workers=4, debug=True) |
这样在视图中就可以直接使用 request.start 和 request.limit 了。
其次看官方用法,都是在视图中控制返回的内容的类型,比如:
from sanic.response import json @app.route('/') async def test(request): return json({"hello": "world"}) |
我也希望封装全部的返回结果的格式为:
{'rs': data} |
PS: 当然生产环境中应该还有一个 error 字段甚至 error_code 字段标识如果出错的信息和类型等字段,我这里作为演示就保留了一个 rs 字段
这个 data 就是实际的视图返回的结果,但是在响应的时候已经封好了。可以继续重写 write_response 方法:
class JSONHttpProtocol(HttpProtocol): def write_response(self, response): if isinstance(response, str): response = text(response) elif isinstance(response, (list, dict)): response = {'rs': response} if isinstance(response, dict): response = json(response) return super().write_response(response) |
这样返回的结果就很统一了。
至此,一个异步的、风格明确的、功能满足需要的 API 服务就完成了。