这是「使用 Python 进行并发编程」系列的最后一篇。我特意地把它安排在了 16 年最后一天,先祝各位元旦快乐。

重新实验上篇的效率对比的实现

在第一篇我们曾经对比并发执行的效率,但是请求的是 httpbin.org 这个网站。很容易受到网络状态和其服务质量的影响。所以我考虑启用一个本地的 eb 服务。那接下来选方案吧。

我用 sanic 提供的 不同方案的例子 ,对 tornado、aiohttp+ujson+uvloop、sanic+uvloop 三种方案,在最新的 Python 3.6 下,使用 wrk 进行了性能测试。

先解释下上面提到的几个关键词:

  1. aiohttp。一个实现了 PEP3156 的 HTTP 的服务器,且包含客户端相关功能。最早出现,应该最知名。
  2. sanic。后起之秀,基于 Flask 语法的异步 Web 框架。
  3. uvloop。用 Cython 编写的、用来替代 asyncio 事件循环。作者说「它在速度上至少比 Node.js、gevent 以及其它任何 Python 异步框架快 2 倍」。
  4. ujson。比标准库 json 及其社区版的 simplejson 都要快的 JSON 编解码库。

使用的测试命令是:

wrk -d20s -t10 -c200 http://127.0.0.1:8000

表示使用 10 个线程、并发 200、持续 20 秒。

在我个人 Mac 上获得的结果是:

方案 tornado aiohttp + ujson + uvloop sanic + uvloop
平均延时 122.58ms 35.49ms 11.03ms
请求数 / 秒 162.94 566.87 2.02k

所以简单的返回 json 数据,看起来 sanic + uvloop 是最快的。首先我对市面的各种 Benchmark 的对比是非常反感的,不能用 hello world 这种级别的例子的结果就片面的认为某种方案效率是最好的,一定要根据你实际的生产环境,再不行影响线上服务的前提下,对一部分有代表性的接口进程流量镜像之类的方式去进行效率的对比。而我认可上述的结果是因为正好满足我接下来测试用到的功能而已。

写一个能 GET 某参数返回这个参数的 sanic+uvloop 的版本的例子:

from sanic import Sanic
from sanic.response import json

app = Sanic(__name__)


@app.route('/get')
async def test(request):
    a = request.args.get('a')
    return json({'args': {'a': a}})


if __name__ == '__main__':
    app.run(host='127.0.0.1', port=8000)

然后把之前的效率对比的代码改造一下,需要变化如下几步:

  1. 替换请求地址,也就是把 httpbin.org 改成了 localhost:8000
  2. 增加要爬取的页面数量,由于 sanic 太快了(无奈脸),12 个页面秒完,所以改成 NUMBERS = range (240)
  3. 由于页面数量大幅增加,不能在终端都打印出来。而且之前已经验证过正确性。去掉那些 print

看下效果:

❯ python3 scraper_thread.py
Use requests+ThreadPoolExecutor cost: 0.9809930324554443
Use asyncio+requests+ThreadPoolExecutor cost: 0.9977471828460693
Use asyncio+aiohttp cost: 0.25928187370300293
Use asyncio+aiohttp+ThreadPoolExecutor cost: 0.278397798538208

可以感受到 asyncio+aiohttp 依然是最快的。随便挺一下 Sanic,准备有机会在实际工作中用一下。

asyncio 在背后怎么运行的呢?

Asynchronous Python 这篇文章里面我找到一个表达的不错的 asyncio 运行的序列图。例子我改编如下:

import asyncio

async def compute(x, y):
    print('Compute {} + {} ...'.format(x, y))
    await asyncio.sleep(1.0)
    return x + y


async def print_sum(x, y):
    result = await compute(x, y)
    print('{} + {} = {}'.format(x, y, result))


loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

运行的过程是这样的:

如何把同步的代码改成异步的

之前有位订阅我的公众号的同学问过这个问题,我想了一个例子来让事情变的清楚。

首先看一个同步的例子:

def handle(id):
    subject = get_subject_from_db(id)
    buyinfo = get_buyinfo(id)
    change = process(subject, buyinfo)
    notify_change(change)
    flush_cache(id)

可以看到,需要获取 subject 和 buyinfo 之后才能执行 process,然后才能执行 notify_change 和 flush_cache。

如果使用 asyncio,就是这样写:

import asyncio


async def handle(id):
    subject = asyncio.ensure_future(get_subject_from_db(id))
    buyinfo = asyncio.ensure_future(get_buyinfo(id))
    results = await asyncio.gather(subject, buyinfo)
    change = await process(results)
    await notify_change(change) 
    loop.call_soon(flush_cache, id)

原则上无非是让能一起协同的函数异步化(subject 和 buyinfo 已经是 Future 对象了),然后通过 gather 获取到这些函数执行的结果;有顺序的就用 call_soon 来保证。

继续深入,现在详细了解下一步还有什么其他解决方案以及其应用场景:

  1. 包装成 Future 对象。上面使用了 ensure_future 来做,上篇也说过,也可以用 loop.create_task。如果你看的是老文章可能会出现 asyncio.async 这种用法,它现在已经被弃用了。如果你已经非常熟悉,你也可以直接使用 asyncio.Task (get_subject_from_db (id)) 这样的方式。
  2. 回调。上面用到了 call_soon 这种回调。除此之外还有如下两种:

    1. loop.call_later (delay, func, *args)。延迟 delay 秒之后再执行。
    2. loop.call_at (when, func, *args)。 某个时刻才执行。

其实套路就是这些罢了。

爬虫分析

可能你已经听过 开源程序架构 系列书了。今天我们将介绍第四本 500 Lines or Less 中的 爬虫项目 。顺便说一下,这个项目里面每章都是由不同领域非常知名的专家而写,代码不超过 500 行。目前包含 web 服务器、决策采样器、Python 解释器、爬虫、模板引擎、OCR 持续集成系统、分布式系统、静态检查等内容。值得大家好好学习下。

我们看的这个例子,是实现一个高性能网络爬虫,它能够抓取你指定的网站的全部地址。它是由 MongoDB 的 C 和 Python 驱动的主要开发者 ajdavis 以及 Python 之父 Guido van Rossum 一起完成的。BTW, 我是 ajdavis 粉儿!

如果你想看了解这篇爬虫教程可以访问: A Web Crawler With asyncio Coroutines ,这篇和教程关系不大,是一篇分析文章。

我们首先下载并安装对应的依赖:

 git clone https://github.com/aosabook/500lines
 cd 500lines
 python3 -m pip install -r requirements.txt

运行一下,看看效果:

 python3 crawler/code/crawl.py -q python-cn.org --exclude github
...
http://python-cn.org:80/user/zuoshou/topics 200 text/html utf-8 13212 0/22
http://python-cn.org:80/users 200 text/html utf-8 34156 24/41
http://python-cn.org:80/users/online 200 text/html utf-8 11614 0/17
http://python-cn.org:80/users/sort-posts 200 text/html utf-8 34642 0/41
http://python-cn.org:80/users/sort-reputation 200 text/html utf-8 34721 15/41
Finished 2365 urls in 47.868 secs (max_tasks=100) (0.494 urls/sec/task)
         4 error
        36 error_bytes
      2068 html
  42735445 html_bytes
        98 other
    937394 other_bytes
       195 redirect
         4 status_404
Todo: 0
Done: 2365
Date: Fri Dec 30 22:03:50 2016 local time

可以看到http://python-cn.org有 2365 个页面,花费了 47.868 秒,并发为 100。

这个项目有如下一些文件:

❯ tree crawler/code -L 1
crawler/code
├── Makefile
├── crawl.py
├── crawling.py
├── reporting.py
├── requirements.txt
├── supplemental
└── test.py

其中主要有如下三个程序:

  1. crawl.py 是主程序,其中包含了参数解析,以及事件循环。
  2. crawling.py 抓取程序,crawl.py 中的异步函数就是其中的 Crawler 类的 crawl 方法。
  3. reporting.py 顾名思义,生成抓取结果的程序。

本文主要看 crawling.py 部分。虽然它已经很小(加上空行才 275 行),但是为了让爬虫的核心更直观,我把其中的兼容性、日志功能以及异常的处理去掉,并将处理成 Python 3.5 新的 async/await 语法。

首先列一下这个爬虫实现什么功能:

  1. 输入一个根链接,让爬虫自动帮助我们爬完所有能找到的链接
  2. 把全部的抓取结果存到一个列表中
  3. 可以排除包含某些关键词链接的抓取
  4. 可以控制并发数
  5. 可以抓取自动重定向的页面,且可以限制重定向的次数
  6. 抓取失败可重试

目前对一个复杂的结果结构常定义一个 namedtuple,首先把抓取的结果定义成一个 FetchStatistic:

FetchStatistic = namedtuple('FetchStatistic',
                            ['url',
                             'next_url',
                             'status',
                             'exception',
                             'size',
                             'content_type',
                             'encoding',
                             'num_urls',
                             'num_new_urls'])

其中包含了 url,文件类型,状态码等用得到的信息。

然后实现抓取类 Crawler,首先是初始化方法:

class Crawler:
    def __init__(self, roots,
                 exclude=None, strict=True,  # What to crawl.
                 max_redirect=10, max_tries=4,  # Per-url limits.
                 max_tasks=10, *, loop=None):
        self.loop = loop or asyncio.get_event_loop()
        self.roots = roots
        self.exclude = exclude
        self.strict = strict
        self.max_redirect = max_redirect
        self.max_tries = max_tries
        self.max_tasks = max_tasks
        self.q = Queue(loop=self.loop)
        self.seen_urls = set()
        self.done = []
        self.session = aiohttp.ClientSession(loop=self.loop)
        self.root_domains = set()
        for root in roots:
            parts = urllib.parse.urlparse(root)
            host, port = urllib.parse.splitport(parts.netloc)
            if not host:
                continue
            if re.match(r'\A[\d\.]*\Z', host):
                self.root_domains.add(host)              
            else:
                host = host.lower()
                if self.strict:
                    self.root_domains.add(host)
                else:
                    self.root_domains.add(lenient_host(host))
        for root in roots:
            self.add_url(root)
        self.t0 = time.time()
        self.t1 = None

信息量比较大,我拿出重要的解释下:

  1. 第 7 行,self.roots 就是待抓取的网站地址,是一个列表。
  2. 第 13 行,self.q 这个队列就存储了待抓取的 url
  3. 第 14 行,self.seen_urls 会保证不重复与抓取已经抓取过的 url
  4. 第 16 行,使用 requests 或者 aiphttp,都是推荐使用一个会话完成全部工作,要不然有些需要登陆之后的操作就做不了了。
  5. 第 18-30 行,这个 for 循环会解析 self.roots 中的域名,这是为了只抓取指定的网站,其它网站的链接会基于这个集合过滤掉
  6. 第 31-32 行,触发抓取,把 url 放入 self.q 的队列,就可以被 worker 执行了
  7. 第 33-34 行,t0 和 t1 是为了记录抓取的时间戳,最后可以计算抓取的总耗时

接着我们看 add_url 的实现:

def add_url(self, url, max_redirect=None):
    if max_redirect is None:
        max_redirect = self.max_redirect
    self.seen_urls.add(url)
    self.q.put_nowait((url, max_redirect))

其中 q.put_nowait 相当于非阻塞的 q.put,还可以看到这个 url 被放入了 self.seen_urls

现在我们从事件循环会用到的 crawl 方法开始往回溯:

async def crawl(self):
    workers = [asyncio.Task(self.work(), loop=self.loop)
               for _ in range(self.max_tasks)]
    self.t0 = time.time()
    await self.q.join()
    self.t1 = time.time()
    for w in workers:
        w.cancel()

类中的方法可以直接用 async 关键词的。worker 就是 self.work,这些 worker 会在后台运行,但是会阻塞在 join 上,直到初始化时候放入 self.q 的 url 都完成。最后需要让 worker 都取消掉。

然后看 self.work:

async def work(self):
    try:
        while True:
            url, max_redirect = await self.q.get()
            assert url in self.seen_urls
            await self.fetch(url, max_redirect)
            self.q.task_done()
    except asyncio.CancelledError:
        pass

当执行 worker.cancel 方法就会引起 asyncio.CancelledError,然后 while 1 的循环就结束了。执行完 fetch 方法,需要标记 get 的这个 url 执行完成,也就是要执行 self.q.task_done,要不然最后 join 是永远结束不了的。

接下来就是 self.fetch,这个方法比较长:

async def fetch(self, url, max_redirect):
    tries = 0
    exception = None
    while tries < self.max_tries:
        try:
            response = await self.session.get(
                url, allow_redirects=False)
            break
        except aiohttp.ClientError as client_error:
            exception = client_error

        tries += 1
    else:
        self.record_statistic(FetchStatistic(url=url,
                                             next_url=None,
                                             status=None,
                                             exception=exception,
                                             size=0,
                                             content_type=None,
                                             encoding=None,
                                             num_urls=0,
                                             num_new_urls=0))
        return

    try:
        if is_redirect(response):
            location = response.headers['location']
            next_url = urllib.parse.urljoin(url, location)
            self.record_statistic(FetchStatistic(url=url,
                                                 next_url=next_url,
                                                 status=response.status,
                                                 exception=None,
                                                 size=0,
                                                 content_type=None,
                                                 encoding=None,
                                                 num_urls=0,
                                                 num_new_urls=0))

            if next_url in self.seen_urls:
                return
            if max_redirect > 0:
                self.add_url(next_url, max_redirect - 1)
            else:
                print('redirect limit reached for %r from %r',
                      next_url, url)
        else:
            stat, links = await self.parse_links(response)
            self.record_statistic(stat)
            for link in links.difference(self.seen_urls):
                self.q.put_nowait((link, self.max_redirect))
            self.seen_urls.update(links)
    finally:
        await response.release()

简单的说,fetch 就是去请求 url,获得响应。然后把结果组织成一个 FetchStatistic,通过 self.record_statistic 放进 self.done 这个列表,然后对结果进行解析,通过 self.parse_links (response) 或者这个页面的结果包含的其他链接,和现在已经抓取的链接集合对比,把还没有抓的放入 self.q。

如果这个 url 被重定向,就把重定向的链接放进 self.q,待 worker 拿走执行。

然后我们看 parse_links 的实现,也比较长:

async def parse_links(self, response):
    links = set()
    content_type = None
    encoding = None
    body = await response.read()

    if response.status == 200:
        content_type = response.headers.get('content-type')
        pdict = {}

        if content_type:
            content_type, pdict = cgi.parse_header(content_type)

        encoding = pdict.get('charset', 'utf-8')
        if content_type in ('text/html', 'application/xml'):
            text = await response.text()

            urls = set(re.findall(r'''(?i)href=["']([^\s"'<>]+)''',
                                  text))
            for url in urls:
                normalized = urllib.parse.urljoin(response.url, url)
                defragmented, frag = urllib.parse.urldefrag(normalized)
                if self.url_allowed(defragmented):
                    links.add(defragmented)

    stat = FetchStatistic(
        url=response.url,
        next_url=None,
        status=response.status,
        exception=None,
        size=len(body),
        content_type=content_type,
        encoding=encoding,
        num_urls=len(links),
        num_new_urls=len(links - self.seen_urls))

    return stat, links
`

其实就是用re.findall(r'''(?i)href=["']([^\s"'<>]+)''', text)找到链接,然后进行必要的过滤,就拿到全部链接了。

这就是一个爬虫,是不是很简单。但是写的算是「最佳实践」。最后,我们看一下怎么调用 Crawler:

loop = asyncio.get_event_loop()
crawler = Crawler(['http://python-cn.org'], max_tasks=100)
loop.run_until_complete(crawler.crawl())
print('Finished {0} urls in {1:.3f} secs'.format(len(crawler.done),
                                                 crawler.t1 - crawler.t0))
crawler.close()

loop.close()

希望对大家的爬虫技艺有提高!

最后祝大家元旦快乐

PS:本文全部代码可以在 微信公众号文章代码库项目 中找到。