今天的文章图片就是这个知乎Live全文搜索的效果了。如果在wifi情况下或者土豪不介意流量的同学可以直接感受实际使用的动态效果

在我的《Python Web开发实战》一书中,比较少的介绍到豆瓣自己造的轮子。有读者喷「不够实战」。我显然承认,也很苦闷 ,今天顺便来吐个槽。我刚工作的时候特别愿意混各种技术会议和活动。经常有一些专家在上面讲:我们自己实现了一个XXX,它有如下多的特性,现在支持了多少个产品线的多少个应用,每天的数据量YY PB,流量ZZ…

通篇在讲架构,摆几张高大上的图,甚至能说几个大家不了解的新的玩法都很少,还不断的问坐在下面的领导或者法务:额,这个我能说嘛;那个我能分享么?最最重要的是,他们讲的这些东西大多不是开源的…. 也基本没有一个可以拿得出手的论文,甚至说白了,如果你正好专注这一部分,会发现它也是根据了FLAG公司的论文在造轮子罢了,说不定造的还不如你。

听过之后,也没有收获,都是「别人家」的,甚至有种炫耀的感觉而已,至于有什么苦可能只有他们自己知道吧。我就越来越不愿意参加这种会议了。

写书或者博客也是这样。一切东西脱离了公司能提供的基础设施和环境都是空谈,但是这些铺垫说起来就太大了,先不讨论公司有没有授权你在外面说,就是没开源这点就不好弄。说的人都是在虚化的讲一大坨的东西,最多来几个截图之类的(应该还打了马赛克)。等这一大坨东西说的让大家明白了,一本书的厚度肯定不够。但是这些内容呢,只是你为了写书的某一(几)章做铺垫而已,有些内容太专太偏,读者大部分场景下是用不到的,好吧,又得骂娘说你这本书不实用…

今天我将基于我过往的实践,以及最近学习asyncio和ES知识完成一个小程序的API服务。看这篇文章前推荐阅读相关的如下文章:

  1. 知乎Live全文搜索之模型设计和爬虫实现
  2. 知乎Live全文搜索之模型接口
  3. 使用Python进行并发编程-我为什么不喜欢Gevent
  4. 使用Python进行并发编程-asyncio篇(一)
  5. 使用Python进行并发编程-asyncio篇(二)
  6. 使用Python进行并发编程-asyncio篇(三)
  7. 知乎Live全文搜索之让elasticsearch_dsl支持asyncio

技术选型

  1. Sanic,基于Python 3.5+的异步web服务器,和Flask一样使用装饰器作为路由,支持Blueprint。效果确实非常快。
  2. uvloop,Sanic默认使用uvloop,这个实现基于libuv,比asyncio默认的loop的块很多。
  3. marshmallow,一个轻量级的转化复杂对象成为Python自带数据类型的库,为什么用它未来会详细介绍。

使用Schema

了解过关系型数据库如MySQL的同学会比较熟悉Schema这个词,它是对数据库的结构描述。Schema定义了表与表和表与字段之间的关系。在使用关系型数据库之前第一件事要先定Schema,创建表(库)再去操作。

为什么我们在实现RESTful API的时候要考虑使用Schema呢?

  1. 首先API服务是给外部用的,比如移动端(安卓、IOS等),前端(用AJAX)等。那么大家一开始就要协商好那些字段,以及字段的类型。因为你不关系,他们都是关心的,如果设计有问题会由于没有正确处理而造成移动端闪退等严重问题。
  2. 你需要对API返回的数据进行良好的管理和验证。

marshmallow把一组数据映射成一个类:

1
2
3
4
5
6
7
from marshmallow import Schema, fields
class UserSchema(Schema):
id = fields.Integer()
url = fields.Str()
name = fields.Str()
...

这样既让不同编程语言的开发者一目了然,也能检验你提供的数据是不是按照这个定好的结构返回的。

同一组数据可以定义多种Schema

思考一下,在搜索页面,每一项提供的空间有限,你无法把Live的全部信息(比如「描述」这种很长的内容的字段)都展示出来,也就是就算都返回了,其实只用了一部分字段,这造成了更多的网络延时和带宽消耗。但是在Live详情页理论上就可以展示全部的字段的内容了。

再假设下,如果是返回一部分,还是返回全部字段在后端做,就是不同的方法返回时对to_dict方法对一对if/elif/else的处理,其实看起来很乱。我是这样用的:

1
2
3
4
5
6
7
8
9
10
11
class UserSchema(Schema):
id = fields.Integer()
url = fields.Str()
name = fields.Str()
...
class UserFullSchema(UserSchema):
lives_url = fields.Str()
speaker_id = fields.Str()
...

也就是定义了多种User的schema,按需选择。但是后端统一使用to_dict方法返回全部数据,在视图渲染的时候进行筛选。这样需要用一种好的表达方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from views.utils import marshal_with
@bp.route('/search')
@marshal_with([LiveSchema, UserSchema])
async def search(request):
...
@bp.route('/suggest')
@marshal_with(LiveSchema)
async def suggest(request):
...
@bp.route('/user/<user_id>')
@marshal_with([LiveFullSchema, UserFullSchema])
async def user(request, user_id):
...

我们先不考虑视图内的逻辑,简单的理解成他们是单个live的to_dict结果或者多个live的to_dict结果的列表

通过神奇的marshal_with装饰器传入你希望返回符合那种Schema的数据。

现在揭晓一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def marshal(data, fields):
schemas = [field() for field in fields]
if isinstance(data, (list, tuple)):
return [marshal(d, fields) for d in data]
type = data.get('type')
for schema in schemas:
if type in schema.__class__.__name__.lower():
result, errors = schema.dump(data)
if errors:
for item in errors.items():
print('{}: {}'.format(*item))
return result
class marshal_with(object):
def __init__(self, fields):
if not isinstance(fields, list):
fields = [fields]
self.fields = fields
def __call__(self, f):
@wraps(f)
async def wrapper(*args, **kwargs):
resp = await f(*args, **kwargs)
return marshal(resp, self.fields)
return wrapper

这个是async版本的,大家可以自己发散成Python 2的普通版。记得之前我们在model里面给每种数据的to_dict方法里面加了个{'type': 'live'}这种键值么,除了在小程序里面分辨数据的类型,在这里也是用来匹配那种schema的。举个例子,假如是一个user类型的数据。

@marshal_with([LiveFullSchema, UserFullSchema])的装饰下,由于UserFullSchema类名包含了live所以符合了。这比较黑科技一点..

深入使用marshmallow

marshmallow除了生成一个可读性很好的类和验证该字段是不是类型符合以外,还支持序列化和反序列化的处理。有什么意义呢。假如如下的schema:

1
2
class LiveSchema(Schema):
starts_at = fields.Date()

注意我们model存的starts_at是一个datetime类型的对象,无法被json序列化,所以返回之前应该先转化成字符串:

1
2
3
4
5
class LiveSchema(Schema):
starts_at = fields.Method('get_start_time')
def get_start_time(self, obj):
return int(obj['starts_at'].strftime('%s'))

再举个例子:

1
2
3
4
class UserSchema(Schema):
bio = fields.Str()
headline = fields.Str()
description = fields.Str()

bio/headline/description这三个字段内容都有可能比较长,需要做不同的截取:在详情页显示全部,在搜索页之前显示前2行.. 我们需要一个truncate函数:

1
2
3
4
WIDTH = 45
def truncate_utf8(str, width=WIDTH):
return str[:width] + '...' if len(str) > width else str

假如使用fields.Method你就要写三方法,因为它指定的方法只有self, obj2个参数,而且Schema是不能继承的。这么办呢?有我的书中介绍的partialmethod可完美实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from functools import partialmethod
class Item(object):
def truncate(self, attr, obj):
if attr not in obj:
return ''
return truncate_utf8(obj[attr], WIDTH)
class UserSchema(Schema):
bio = fields.Method('truncate_bio')
headline = fields.Method('truncate_headline')
description = fields.Method('truncate_description')
truncate_headline = partialmethod(Item.truncate, 'headline')
truncate_bio = partialmethod(Item.truncate, 'bio')
truncate_description = partialmethod(Item.truncate, 'description')

通过partialmethod + 非继承至Schema的类就可以实现继承和额外参数了。

对Sanic定制

我们都知道,当你有独特的需求而框架不满足的时候,就要对其进行二次开发或者封装。我一般倾向基于框架提供的灵活性去封装。由于在多个API上都有分页的需要,参数中会出现start/limit(当然你可以更喜欢其他的词汇)。如果不定制,那么在每个视图里面都要加一句:

1
2
3
4
5
6
@bp.route('/search')
@marshal_with([LiveSchema, UserSchema])
async def search(request):
start = request.args.get('start', 0)
limit = request.args.get('limit', 10)
...

这2句就是个累赘。怎么做呢? 利用sanic提供的中间件就好了:

1
2
3
4
@app.middleware('request')
async def halt_request(request):
request.start = request.args.get('start', 0)
request.limit = request.args.get('limit', 10)

但是这还不够,因为sanic在创建Request的时候基于内存的考虑使用了__slots__,所以我们要重新写on_headers_complete方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from sanic.server import HttpProtocol, CIMultiDict
from sanic.request import Request as _Request
class Request(_Request):
__slots__ = (
'url', 'headers', 'version', 'method', '_cookies',
'query_string', 'body', 'start', 'limit',
'parsed_json', 'parsed_args', 'parsed_form', 'parsed_files',
)
class JSONHttpProtocol(HttpProtocol):
def on_headers_complete(self):
remote_addr = self.transport.get_extra_info('peername')
if remote_addr:
self.headers.append(('Remote-Addr', '%s:%s' % remote_addr))
self.request = Request(
url_bytes=self.url,
headers=CIMultiDict(self.headers),
version=self.parser.get_http_version(),
method=self.parser.get_method().decode()
)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8300, protocol=JSONHttpProtocol,
workers=4, debug=True)

这样在视图中就可以直接使用request.start和request.limit了。

其次看官方用法,都是在视图中控制返回的内容的类型,比如:

1
2
3
4
5
from sanic.response import json
@app.route('/')
async def test(request):
return json({"hello": "world"})

我也希望封装全部的返回结果的格式为:

1
{'rs': data}

PS: 当然生产环境中应该还有一个error字段甚至error_code字段标识如果出错的信息和类型等字段,我这里作为演示就保留了一个rs字段

这个data就是实际的视图返回的结果,但是在响应的时候已经封好了。可以继续重写write_response方法:

1
2
3
4
5
6
7
8
9
10
class JSONHttpProtocol(HttpProtocol):
def write_response(self, response):
if isinstance(response, str):
response = text(response)
elif isinstance(response, (list, dict)):
response = {'rs': response}
if isinstance(response, dict):
response = json(response)
return super().write_response(response)

这样返回的结果就很统一了。

至此,一个异步的、风格明确的、功能满足需要的API服务就完成了。

项目具体代码可见: https://github.com/dongweiming/weapp-zhihulive