前言

在之前的 深入理解 asyncio (二) 一文中我认为正确使用asyncio.shield的步骤是:

  1. 先创建 GatheringFuture 对象 ts
  2. 取消任务
  3. await ts

当时举的例子:

import asyncio


async def a():
    print('Suspending a')
    await asyncio.sleep(2)
    print('Resuming a')
    return 'A'


async def b():
    print('Suspending b')
    await asyncio.sleep(1)
    print('Resuming b')
    return 'B'


async def c1():
    task1 = asyncio.shield(a())
    task2 = asyncio.create_task(b())
    task1.cancel()
    await asyncio.gather(task1, task2, return_exceptions=True)


async def c2():
    task1 = asyncio.shield(b())
    task2 = asyncio.create_task(a())
    task1.cancel()
    await asyncio.gather(task1, task2, return_exceptions=True)

async def c3():
    task1 = asyncio.shield(a())
    task2 = asyncio.create_task(b())
    ts = asyncio.gather(task1, task2, return_exceptions=True)
    task1.cancel()
    await ts

按当时我的理解,c1 和 c2 中先 cancel 再 gather 的用法是错误的,正确的是 c3 的写法。

昨天一位买过我书的读者 (网名:秋月风夏,感谢反馈) QQ 问我为什么不能复现,一番讨论发现我被 IPython 提供的 await 欺骗了:

In [1]: from coro import *

In [2]: await c3()
Suspending a
Suspending b
Resuming b  # 第一次执行await并没有完全执行a的逻辑

In [3]: await c3()
Suspending a
Suspending b
Resuming a
Resuming b  # 之后执行await才会让a执行完整

In [4]: await c1()
Suspending a
Suspending b
Resuming a
Resuming b

In [5]: await c1()
Suspending a
Suspending b
Resuming a
Resuming b

可以看到除了第一次,c1 也都是正常完成的,不存在我说的「陷阱」。

研究了半天 IPython 源码没发现这个问题是什么造成的,但是我们可以不使用 IPython 提供的 await 支持:

In [6]: asyncio.run(c1())
Suspending a
Suspending b
Resuming b

In [7]: asyncio.run(c1())
Suspending a
Suspending b
Resuming b

In [8]: asyncio.run(c3())
Suspending a
Suspending b
Resuming b

In [9]: asyncio.run(c3())
Suspending a
Suspending b
Resuming b

看到了吧?直接用asyncio.run运行这些发现他们都是不对的!!那回过头来,看看 c2:

In [10]: asyncio.run(c2())
Suspending b
Suspending a
Resuming b
Resuming a

In [11]: asyncio.run(c2())
Suspending b
Suspending a
Resuming b
Resuming a

它是正常的。而 c1 和 c2 的区别仅仅是取消协程 a 还是协程 b 的问题,那么为什么会造成取消的任务有些执行完整情况不同呢?大家可以先思考一下。

提示:协程 a/b 的asyncio.sleep时间不同:a 任务执行完整至少要 3 秒,b 任务执行完成至少要 1 秒。

所以在没有看源码确认前,我「猜」是asyncio.gather 在执行任务时不考虑那些被取消了的任务的感受,现有任务都完成即结束。所以:

  1. 如果取消任务 a,当 b 任务 1 秒结束后,a 还没完成,任务收集就结束了。所以任务 a 没有执行完整
  2. 如果取消任务 b,协程 b 要早于协程 a 的执行时间

所以,是「先 cancel 再 gather」还是「先 gather 再 cancel」没关系,要看取消的任务 (们) 来不来得及赶上 gather 最后一班车!

学习不能靠猜,刚才我读了源码和大家分享下 gather 是怎么处理取消了的任务 (Task) 的。
 我们就拿前面说的 c3 里面的逻辑来说。

asyncio.gather中首先会对所有 Task (task1 和 task2) 加一个 done_callback,由于 task1 被取消了,所以 task1.done () 的结果为 True (只要不是 pending 状态就为 done),在 asyncio.futures.Future 的实现中,task1 会立刻回调 (详细的可以看延伸阅读链接 1):

static PyObject *
future_add_done_callback(FutureObj *fut, PyObject *arg, PyObject *ctx)
{
    ...
    if (fut->fut_state != STATE_PENDING) {
        /* The future is done/cancelled, so schedule the callback
           right away. */
        if (call_soon(fut->fut_loop, arg, (PyObject*) fut, ctx)) {
            return NULL;
        }
    }
}

接着看一下 gather 里面的回调_done_callback(详细的可以看延伸阅读链接 2):

def _done_callback(fut):
    nonlocal nfinished
    nfinished += 1
    ...
    if nfinished == nfuts:
        results = []

        for fut in children:
            if fut.cancelled():
                res = exceptions.CancelledError()
            else:
                res = fut.exception()
                if res is None:
                    res = fut.result()
                    results.append(res)
    ...

每次有任务完成回调,一开始先让 nfinished 累加 1,nfuts 是任务总数。如果完成数 (nfinished) 等于任务总数时就开始对全部任务设置结果,如果任务被取消就会设置为exceptions.CancelledError()(哪怕它正常完成了也不会通过 result 方法把结果返回出来)。

前面说到 task1 在 gather 一开始就回调了,但是由于 nfuts 为 2,而 nfinished 为 1,所以不符合条件,需要等待 task2(也就是未取消的任务完成)才能返回。

asyncio.shield所谓的保护其实就是让协程 a 作为一个 Inner Future (内部的),再在事件循环上创建一个新的 Outer Future (外部的),所以 a 的逻辑继续进行 (inner),而我们取消的 task1 只是 outer。

那怎么用 Shield 才正确?

第一种情况:符合短板理论。也就是说「取消的任务耗时小于正常任务耗时」,那么在 gather 搜集结果时被取消的任务已经完成。可以感受到前面例子中的 c2 是正确的。

接着说第二种,也就是官网提到的场景。在官方文档中对它的描述非常模糊,就说了一句:

Protect an awaitable object from being cancelled.

我的理解就是「保护一个可 await 对象,防止其被取消」。这里就不得不吐槽官方文档不够明确的描述和例子了。我觉得应该这样说:

假如有个 Task (叫做 something) 被 shield 保护,如下:

outer = shield(something())
res = await outer

如果outer被取消了,不会影响 Task 本身 (something) 的执行。

所以官网里面说了一句:

From the point of view of something(), the cancellation did not happen.

也就是「从 something () 的角度看来,取消操作并没有发生」。官网没有给出完整的例子,我用 2 个例子来帮助理解:

async def stop_after(loop, when):
    await asyncio.sleep(when)
    loop.stop()


def c4():
    loop = asyncio.get_event_loop()
    outer = asyncio.shield(a())
    outer.cancel()
    loop.create_task(stop_after(loop, 3))

    loop.run_forever()
    print(outer.cancelled())

注意 c4 不是异步函数,在里面使用loop.run_forever让事件循环一直运行下去,里面有 3 个任务:a()outerstop_after(loop, 3),第三个任务会在 3 秒后把停掉,这个例子可以用来验证上面说的「如果outer被取消了,不会影响 Task 本身 (something) 的执行」

运行一下:

In [14]: c4()
Suspending a
Resuming a
True

可以看到这样做,虽然 outer 取消了,但是异步函数 a 的逻辑执行完整了。

基于这种思路,我突然想到为什么「IPython 用 await 第一次没有执行完整,之后每次都能执行完整」,说结论前先看另外一个例子:

async def cancel_after(task, when):
    await asyncio.sleep(when)
    task.cancel()


async def d(n):
    print(f'Suspending a with {n}')
    await asyncio.sleep(2)
    print(f'Resuming a with {n}')
    return 'A'


async def c5():
    loop = asyncio.get_event_loop()
    n = random.randint(1, 100)
    outer = asyncio.shield(d(n))
    loop.create_task(cancel_after(outer, 1))
    try:
        await outer
    except asyncio.CancelledError:
        print('Cancelled!')

这个例子中有 2 个任务,outercancel_after(outer, 1),cancel_after 会在 1 秒后把 outer 取消掉。另外这次用了新的一步函数 d,接受一个随机参数(一会就能感受到用意)而且这次要重新进入 IPython 交互环境 (防止之前的测试对其产生影响),而且不再用aysncio.run了:

In [1]: from coro import *

In [2]: import asyncio

In [3]: import random

In [4]: loop = asyncio.get_event_loop()

In [5]: loop.run_until_complete(c5())
Suspending a with 72
Cancelled!  # 注意,这就是第一次,也没有把d运行完整

In [6]: loop.run_until_complete(c5())
Suspending a with 48  # 启动了下一轮随机数
Resuming a with 72  # 结束了上一轮随机数
Cancelled!

In [7]: loop.run_until_complete(c5())
Suspending a with 26
Resuming a with 48
Cancelled!

In [8]: loop.run_until_complete(c5())
Suspending a with 38
Resuming a with 26
Cancelled!

每次事件循环会把全部任务轮询一遍,outer.cancel 虽然取消了 Outer 任务,但是 Inner 任务还是 running 状态的 (有兴趣的同学可以写个例子证实一下),等待下次事件循环 ( loop.run_until_complete 的目标是等待 c5 执行结束,它并不关注 d 运行状态)。而之前在 IPython 里面用await就会造成第一次不完整也解释通了:

async def c6():
    n = random.randint(1, 100)
    task1 = asyncio.shield(d(n))
    task2 = asyncio.create_task(b())
    task1.cancel()
    await asyncio.gather(task1, task2, return_exceptions=True)

 ipython

In [1]: from coro import *

In [2]: await c6()
Suspending a with 34
Suspending b
Resuming b

In [3]: await c6()
Suspending a with 41
Suspending b
Resuming a with 34
Resuming b

In [4]: await c6()
Suspending a with 71
Suspending b
Resuming a with 41
Resuming b

看到了吧。那为什么说用asyncio.run不行呢:

In [5]: asyncio.run(c6())
Suspending a with 62
Suspending b
Resuming b

In [6]: asyncio.run(c6())
Suspending a with 94
Suspending b
Resuming b

其实可以猜出来:每次都创建了一个新的 loop。当然也得看源码确认一下 (详见延伸阅读链接 3):

def run(main, *, debug=False):
    ...
    loop = events.new_event_loop()
    try:
        events.set_event_loop(loop)
        loop.set_debug(debug)
        return loop.run_until_complete(main)
    finally:
    ...

里面用events.new_event_loop创建了新的事件循环,所以拿不到之前的事件循环上的待执行任务。

感觉这类场景这个比较适合 Web 框架中使用。

后记

希望这些内容让你对 asyncio 和asyncio.shield有更深的了解。

延伸阅读

  1. https://github.com/python/cpython/blob/b9a0376b0dedf16a2f82fa43d851119d1f7a2707/Modules/_asynciomodule.c#L627-L632
  2. https://github.com/python/cpython/blob/0baa72f4b2e7185298d09cf64c7b591efcd22af0/Lib/asyncio/tasks.py#L690-L724
  3. https://github.com/python/cpython/blob/master/Lib/asyncio/runners.py#L39-L41