使用Python进行并发编程-asyncio篇(二)
/ / / 阅读数:11943我们今天继续深入学习 asyncio。
同步机制
asyncio 模块包含多种同步机制,每个原语的解释可以看 线程篇 ,这些原语的用法上和线程 / 进程有一些区别。
Semaphore(信号量)
并发的去爬取显然可以让爬虫工作显得更有效率,但是我们应该把抓取做的无害,这样既可以保证我们不容易发现,也不会对被爬的网站造成一些额外的压力。
在这里吐槽下,豆瓣现在几乎成了爬虫练手专用网站,我个人也不知道为啥?欢迎留言告诉我。难道是豆瓣一直秉承尊重用户的原则不轻易对用户才去封禁策略,造成大家觉得豆瓣最适合入门么?BTW,我每天在后台都能看到几十万次无效的抓取,也就是抓取程序写的有问题,但还在不停地请求着...
好吧回到正题,比如我现在要抓取http://httpbin.org/get?a=X这样的页面,X为1-10000的数字,一次性的产生1w次请求显然很快就会被封掉。那么我们可以用Semaphore控制同时的并发量(例子中为了演示,X为0-11):
import aiohttp import asyncio NUMBERS = range(12) URL = 'http://httpbin.org/get?a={}' sema = asyncio.Semaphore(3) async def fetch_async(a): async with aiohttp.request('GET', URL.format(a)) as r: data = await r.json() return data['args']['a'] async def print_result(a): with (await sema): r = await fetch_async(a) print('fetch({}) = {}'.format(a, r)) loop = asyncio.get_event_loop() f = asyncio.wait([print_result(num) for num in NUMBERS]) loop.run_until_complete(f) |
在运行的时候可以感受到并发受到了信号量的限制,基本保持在同时处理三个请求的标准。
Lock(锁)
看下面的例子:
❯ cat lock.py import asyncio import functools def unlock(lock): print('callback releasing lock') lock.release() async def test(locker, lock): print('{} waiting for the lock'.format(locker)) with await lock: print('{} acquired lock'.format(locker)) print('{} released lock'.format(locker)) async def main(loop): lock = asyncio.Lock() await lock.acquire() loop.call_later(0.1, functools.partial(unlock, lock)) await asyncio.wait([test('l1', lock), test('l2', lock)]) loop = asyncio.get_event_loop() loop.run_until_complete(main(loop)) loop.close() |
这个例子中我们首先使用 acquire 加锁,通过 call_later 方法添加一个 0.1 秒后释放锁的函数。看一下调用:
❯ python3 lock.py l1 waiting for the lock l2 waiting for the lock callback releasing lock l1 acquired lock l1 released lock l2 acquired lock l2 released lock |
Condition(条件)
我们根据线程篇 Condition 的例子,改成一下:
import asyncio import functools async def consumer(cond, name, second): await asyncio.sleep(second) with await cond: await cond.wait() print('{}: Resource is available to consumer'.format(name)) async def producer(cond): await asyncio.sleep(2) for n in range(1, 3): with await cond: print('notifying consumer {}'.format(n)) cond.notify(n=n) await asyncio.sleep(0.1) async def producer2(cond): await asyncio.sleep(2) with await cond: print('Making resource available') cond.notify_all() async def main(loop): condition = asyncio.Condition() task = loop.create_task(producer(condition)) consumers = [consumer(condition, name, index) for index, name in enumerate(('c1', 'c2'))] await asyncio.wait(consumers) task.cancel() task = loop.create_task(producer2(condition)) consumers = [consumer(condition, name, index) for index, name in enumerate(('c1', 'c2'))] await asyncio.wait(consumers) task.cancel() loop = asyncio.get_event_loop() loop.run_until_complete(main(loop)) loop.close() |
这次演示了 2 种通知的方式:
- 使用 notify 方法挨个通知单个消费者
- 使用 notify_all 方法一次性的通知全部消费者
由于 producer 和 producer2 是异步的函数,所以不能使用之前 call_later 方法,需要用 create_task 把它创建成一个任务(Task)。但是最后记得要把任务取消掉。
执行以下看看效果:
❯ python3 condition.py notifying consumer 1 c1: Resource is available to consumer notifying consumer 2 c2: Resource is available to consumer Making resource available c1: Resource is available to consumer c2: Resource is available to consumer |
Event(事件)
模仿锁的例子实现:
import asyncio import functools def set_event(event): print('setting event in callback') event.set() async def test(name, event): print('{} waiting for event'.format(name)) await event.wait() print('{} triggered'.format(name)) async def main(loop): event = asyncio.Event() print('event start state: {}'.format(event.is_set())) loop.call_later( 0.1, functools.partial(set_event, event) ) await asyncio.wait([test('e1', event), test('e2', event)]) print('event end state: {}'.format(event.is_set())) loop = asyncio.get_event_loop() loop.run_until_complete(main(loop)) loop.close() |
看起来也确实和锁的意思很像,不同的是,事件被触发时,2 个消费者不用获取锁就要尽快的执行下去了。
Queue
在 asyncio 官网上已经举例了 2 个很好的 队列例子 了,这文就不重复了。asyncio 同样支持 LifoQueue 和 PriorityQueue,我们体验下 aiohttp + 优先级队列的用法吧:
import asyncio import random import aiohttp NUMBERS = random.sample(range(100), 7) URL = 'http://httpbin.org/get?a={}' sema = asyncio.Semaphore(3) async def fetch_async(a): async with aiohttp.request('GET', URL.format(a)) as r: data = await r.json() return data['args']['a'] async def collect_result(a): with (await sema): return await fetch_async(a) async def produce(queue): for num in NUMBERS: print('producing {}'.format(num)) item = (num, num) await queue.put(item) async def consume(queue): while 1: item = await queue.get() num = item[0] rs = await collect_result(num) print('consuming {}...'.format(rs)) queue.task_done() async def run(): queue = asyncio.PriorityQueue() consumer = asyncio.ensure_future(consume(queue)) await produce(queue) await queue.join() consumer.cancel() loop = asyncio.get_event_loop() loop.run_until_complete(run()) loop.close() |
看到使用了新的 ensure_future 方法,其实它和之前说的 create_task 意思差不多,都是为了把一个异步的函数变成一个协程的 Task。它们的区别是:
- create_task 是 AbstractEventLoop 的抽象方法,不同的 loop 可以实现不同的创建 Task 方法,这里用的是 BaseEventLoop 的实现。
- ensure_future 是 asyncio 封装好的创建 Task 的函数,它还支持一些参数,甚至指定 loop。一般应该使用它,除非用到后面提到的 uvloop 这个第三方库。
这个例子中,首先我们从 0-99 中随机取出 7 个数字,放入优先级队列,看看消费者是不是按照从小到大的顺序执行的呢?
❯ python3 prioqueue.py producing 6 producing 4 producing 22 producing 48 producing 9 producing 90 producing 40 consuming 4... consuming 6... consuming 9... consuming 22... consuming 40... consuming 48... consuming 90... |
确实是这样的。
说到这里,我们稍微偏个题,看看 Task 是什么?
深入 Task
Task 类用来管理协同程序运行的状态。根据源码,我保留核心,实现一个简单的 Task 类帮助大家理解:
import asyncio class Task(asyncio.futures.Future): def __init__(self, gen, *,loop): super().__init__(loop=loop) self._gen = gen self._loop.call_soon(self._step) def _step(self, val=None, exc=None): try: if exc: f = self._gen.throw(exc) else: f = self._gen.send(val) except StopIteration as e: self.set_result(e.value) except Exception as e: self.set_exception(e) else: f.add_done_callback( self._wakeup) def _wakeup(self, fut): try: res = fut.result() except Exception as e: self._step(None, e) else: self._step(res, None) |
如果_step 方法没有让协程执行完成,就会添加回调,_wakeup 又会继续执行_step... 直到协程程序完成,并 set_result。
写个使用它的例子:
async def foo(): await asyncio.sleep(2) print('Hello Foo') async def bar(): await asyncio.sleep(1) print('Hello Bar') loop = asyncio.get_event_loop() tasks = [Task(foo(), loop=loop), loop.create_task(bar())] loop.run_until_complete( asyncio.wait(tasks)) loop.close() |
第一个任务是用我们自己的 Task 创建的,第二个是用 BaseEventLoop 自带的 create_task。
运行一下:
❯ python3 task.py Hello Bar Hello Foo |
自定义的 Task 类和 asyncio 自带的是可以好好协作的。
深入事件循环
asyncio 根据你的操作系统信息会帮你选择默认的事件循环类,在*nix 下使用的类继承于 BaseEventLoop,在上面已经提到了。和 Task 一样,我们剥离出一份最核心的实现:
import asyncio from collections import deque def done_callback(fut): fut._loop.stop() class Loop: def __init__(self): self._ready = deque() self._stopping = False def create_task(self, coro): Task = asyncio.tasks.Task task = Task(coro, loop=self) return task def run_until_complete(self, fut): tasks = asyncio.tasks # 获取任务 fut = tasks.ensure_future( fut, loop=self) # 增加任务到self._ready fut.add_done_callback(done_callback) # 跑全部任务 self.run_forever() # 从self._ready中移除 fut.remove_done_callback(done_callback) def run_forever(self): try: while 1: self._run_once() if self._stopping: break finally: self._stopping = False def call_soon(self, cb, *args): self._ready.append((cb, args)) def _run_once(self): ntodo = len(self._ready) for i in range(ntodo): t, a = self._ready.popleft() t(*a) def stop(self): self._stopping = True def close(self): self._ready.clear() def call_exception_handler(self, c): pass def get_debug(self): return False |
其中 call_exception_handler 和 get_debug 是必须存在的。
写个例子用一下:
async def foo(): print('Hello Foo') async def bar(): print('Hello Bar') loop = Loop() tasks = [loop.create_task(foo()), loop.create_task(bar())] loop.run_until_complete( asyncio.wait(tasks)) loop.close() |
执行:
❯ python3 loop.py Hello Foo Hello Bar |
也可以和 asyncio.wait 正常协作了。
PS:本文全部代码可以在 微信公众号文章代码库项目 中找到。