concurrent.futures is a module added to the standard library in Python 3.2. It provides a high-level interface for asynchronously executing callables and is a further wrapper around thread pools and process pools, letting developers use both through one unified, very easy-to-use interface.

I wrote an article about it before, 使用 Python 进行并发编程 - PoolExecutor 篇. After a few more years of using it I have accumulated some experience and thoughts, so I'm using this weekend to write them down.

Using concurrent.futures

This section starts with an overall review of the important parts of the concurrent.futures module.

Important classes in the module

At the core of the concurrent.futures module are 2 developer-facing classes:

  • ThreadPoolExecutor. As the name suggests, it creates a thread pool that jobs can be submitted to.
  • ProcessPoolExecutor. Works the same way, but uses a pool of worker processes instead of threads.

The rule of thumb for choosing between them:

  1. For I/O-heavy (I/O-bound) tasks, choose ThreadPoolExecutor: for example fetching web pages, reading and writing files, and anything else dominated by network or disk I/O.
  2. For CPU-heavy (CPU-bound) tasks, choose ProcessPoolExecutor: for example math and logic that burns a lot of CPU, video encoding/decoding, and so on (a minimal sketch follows this list).
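As a minimal sketch of this rule of thumb (the names io_task and cpu_task are made up for illustration; io_task just sleeps to stand in for a network or disk wait):

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def io_task(n):
    time.sleep(1)                        # simulated network/disk wait
    return n

def cpu_task(n):
    return sum(i * i for i in range(n))  # pure CPU work

if __name__ == '__main__':
    # I/O bound -> threads: the sleeps overlap, so this finishes in about 1 second
    with ThreadPoolExecutor() as executor:
        print(list(executor.map(io_task, range(5))))
    # CPU bound -> processes: each worker runs on its own core, sidestepping the GIL
    with ProcessPoolExecutor() as executor:
        print(list(executor.map(cpu_task, [10 ** 6] * 5)))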

ProcessPoolExecutor sidesteps the GIL problem, but because arguments have to be passed to the worker processes, normally only objects that can be serialized (pickled) can be executed and returned. Here is an example that fails:

from concurrent.futures import ProcessPoolExecutor

f = open('1.txt', 'a+')

def write(f, line):
    f.write(line)


with ProcessPoolExecutor() as executor:
    future = executor.submit(write, f, 'abc')
    print(f'RESULT: {future.result()}')

At first glance nothing looks wrong, but run it:

❯ python unpickled.py
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/multiprocessing/queues.py", line 236, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/local/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot serialize '_io.TextIOWrapper' object
"""
...

The error output already gives the hint: pickle.dumps failed:

In : pickle.dumps(f)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-34-6d61f1b895e9> in <module>
----> 1 pickle.dumps(f)

TypeError: cannot serialize '_io.TextIOWrapper' object

There is also the PicklingError kind of failure, which usually means the object depends on external system state. For user-defined classes you can work around these restrictions by providing __getstate__ and __setstate__ methods; of course you can also wrap or adjust the business logic so that only arguments that serialize cleanly get passed.
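As a minimal sketch of the __getstate__/__setstate__ workaround (the Logger class below is made up for illustration, it is not part of the original example): the file handle is dropped before pickling and reopened inside the worker process.

from concurrent.futures import ProcessPoolExecutor

class Logger:
    def __init__(self, path):
        self.path = path
        self.f = open(path, 'a+')       # file objects cannot be pickled

    def __getstate__(self):
        state = self.__dict__.copy()
        del state['f']                  # drop the unpicklable file handle
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self.f = open(self.path, 'a+')  # reopen the file in the worker process

    def write(self, line):
        self.f.write(line + '\n')

def write(logger, line):
    logger.write(line)

if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        future = executor.submit(write, Logger('1.txt'), 'abc')
        print(f'RESULT: {future.result()}')

Now only the path gets pickled; the worker rebuilds the file handle on its side, so the task runs instead of raising TypeError.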

Important methods of Executor

The 2 most important methods on the ProcessPoolExecutor and ThreadPoolExecutor classes are:

  • submit. Submits a task and returns a Future object that represents the execution of the callable.
  • map. Works like the built-in map function, except that it applies the function to every element of the list asynchronously.

Use map to submit a whole batch of tasks at once, and submit for individual tasks:

In : def double(n):
...:     return n * 2
...:

In : from concurrent.futures import ThreadPoolExecutor

In : with ThreadPoolExecutor() as executor:
...:     f1 = executor.submit(double, 10)
...:     f2 = executor.submit(double, 20)
...:     print(f1.result(), f2.result())
...:
20 40

In : NUMBERS = [10, 20, 30]

In : with ThreadPoolExecutor() as executor:
...:     for n, rs in zip(NUMBERS, executor.map(double, NUMBERS)):
...:         print(f'{n} * 2 -> {rs}')
...:
10 * 2 -> 20
20 * 2 -> 40
30 * 2 -> 60

Functions

The module has 2 important functions: wait and as_completed.

wait waits for the given Future instances to complete. It is very close in intent to asyncio.wait, and it returns a 2-item value: the first item is the set of completed futures (done), the second the set of not-yet-completed ones (not_done):

In : from concurrent.futures import ProcessPoolExecutor, wait

In : with ProcessPoolExecutor() as executor:
...:     fs = [executor.submit(double, n) for n in (1, 2, 3)]
...:     rs = wait(fs)
...:

In : rs
Out: DoneAndNotDoneFutures(done={<Future at 0x106b58080 state=finished returned int>,
<Future at 0x1073e2128 state=finished returned int>,
<Future at 0x10690acc0 state=finished returned int>}, not_done=set())

concurrent.futures.wait also supports a return_when parameter. The default is ALL_COMPLETED, meaning wait until every task has finished; the other options are FIRST_COMPLETED and FIRST_EXCEPTION. Change it and see the effect:

In : import time

In : def div(a, b):
...:     time.sleep(a + b)
...:     return a / b
...:

In : from concurrent.futures import FIRST_COMPLETED

In : with ProcessPoolExecutor() as executor:
...:     fs = [executor.submit(div, *item) for item in ((1, 0), (1, 2), (2, 3))]
...:     rs = wait(fs, return_when=FIRST_COMPLETED)
...:
...:

In : rs
Out: DoneAndNotDoneFutures(done={<Future at 0x106aa4c50 state=finished raised ZeroDivisionError>},
not_done={<Future at 0x1065c1ac8 state=finished returned float>,
<Future at 0x1053eb860 state=finished returned float>})

The as_completed function returns an iterator over the given Future instances, yielding each one as it completes:

In : from concurrent.futures import as_completed

In : import random

In : def double(n):
...:     time.sleep(random.randint(0, 5))
...:     return n * 2
...:
...:

In : with ProcessPoolExecutor() as executor:
...:     fs = {executor.submit(double, n): n for n in NUMBERS}
...:     for future in as_completed(fs):
...:         n = fs[future]
...:         print(f'{n} * 2 = {future.result()}')
...:
20 * 2 = 40
10 * 2 = 20
30 * 2 = 60

In : with ProcessPoolExecutor() as executor:
...:     fs = {executor.submit(double, n): n for n in NUMBERS}
...:     for future in as_completed(fs):
...:         n = fs[future]
...:         print(f'{n} * 2 = {future.result()}')
...:
10 * 2 = 20
30 * 2 = 60
20 * 2 = 40

Because of the random sleep inside double, you can see that the completion order differs between the two runs of the same tasks.

Using submit/map correctly

At work I have seen logic written like this countless times:

In : with ProcessPoolExecutor() as executor:
...:     f1 = executor.submit(div, 1, 2)
...:     f2 = executor.submit(div, 1, 0)
...:     f3 = executor.submit(div, 3, 2)
...:

In : f1, f2, f3
Out:
(<Future at 0x1071e90f0 state=finished returned float>,
 <Future at 0x1071e9898 state=finished raised ZeroDivisionError>,
 <Future at 0x106955198 state=finished returned float>)

In : def str2int(s):
...:     return int(s)
...:

In : NUMBERS = ['1', '3.0', 'abc']

In : with ProcessPoolExecutor() as executor:
...:     rs = executor.map(str2int, NUMBERS)
...:

In : rs
Out: <generator object _chain_from_iterable_of_lists at 0x1068ccc78>

The problem with this style is that exceptions get ignored. What comes back is a Future object or a generator, and the corresponding result method is never called, so if a task raised, the user never knows. You normally need to call result and catch the exceptions:

In : for f in (f1, f2, f3):
...:     try:
...:         print(f.result())
...:     except Exception as exc:
...:         print(f'Generated an exception: {exc}')
...:
0.5
Generated an exception: division by zero
1.5

Getting the results out of map is more awkward:

In : while 1:
...:     try:
...:         print(next(rs))
...:     except StopIteration:
...:         break
...:     except Exception as exc:
...:         print(f'Generated an exception: {exc}')
...:
1
Generated an exception: invalid literal for int() with base 10: '3.0'

As you can see, the generator ends as soon as the first error occurs, so map is a poor fit for a batch of tasks that may raise: list(rs), or any loop over the results, stops at the task that threw and you never get the results of the tasks behind it. The best approach is still submit + as_completed (a sketch follows).
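Reusing str2int and NUMBERS from above, a minimal sketch of the submit + as_completed pattern: each future's exception is caught individually, so the results of the remaining tasks still come through.

from concurrent.futures import ProcessPoolExecutor, as_completed

def str2int(s):
    return int(s)

NUMBERS = ['1', '3.0', 'abc']

if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        fs = {executor.submit(str2int, s): s for s in NUMBERS}
        for future in as_completed(fs):
            s = fs[future]
            try:
                print(f'{s} -> {future.result()}')
            except Exception as exc:
                print(f'{s!r} generated an exception: {exc}')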

Make good use of max_workers

Both ProcessPoolExecutor and ThreadPoolExecutor accept a max_workers argument: the number of worker processes / threads used to run the tasks. ProcessPoolExecutor defaults to the number of CPUs (as returned by os.cpu_count()), while ThreadPoolExecutor defaults to 5 times the number of CPUs (as of Python 3.7)!

For beginners, and in most ordinary situations, there is no need to set max_workers by hand; the defaults work well enough. But:

  • Depending on the workload, raising max_workers can make the batch finish faster (a sketch follows this list). Be careful, though: higher is not always better, and beyond a certain threshold it backfires. The effect is most visible with ThreadPoolExecutor on I/O-bound tasks, where different max_workers values make a big difference; but too many network factors are involved, so I won't give a real-world benchmark here.
  • Sometimes a server is running a lot of important services and you don't want one job to affect everything else; in that case you can also set max_workers below the default.
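Since real network effects are hard to reproduce, here is only a minimal synthetic sketch of how max_workers changes throughput (fetch just sleeps to stand in for a network request; the values 5 and 50 are arbitrary):

import time
from concurrent.futures import ThreadPoolExecutor

def fetch(n):
    time.sleep(1)          # stand-in for a network request
    return n

if __name__ == '__main__':
    for workers in (5, 50):
        start = time.perf_counter()
        with ThreadPoolExecutor(max_workers=workers) as executor:
            list(executor.map(fetch, range(50)))
        print(f'max_workers={workers}: {time.perf_counter() - start:.1f}s')

On an otherwise idle machine the 50-task batch takes roughly 10 seconds with 5 workers and roughly 1 second with 50, because every task is only waiting.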

Make good use of chunksize

Executormap方法支持chunksize参数:

Signature: ProcessPoolExecutor.map(self, fn, *iterables, timeout=None, chunksize=1)
Source:
    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the list will be sent one at a time.
        ...
        """"

I first came across this parameter through the map method of multiprocessing.pool.Pool. In concurrent.futures, chunksize defaults to 1; it is the unit in which tasks are chopped up and submitted, so the default means items are sent from the task list to the pool one at a time. If the batch is large and each task runs very quickly, it is worth raising chunksize (see the sketch below).
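A minimal sketch comparing chunksize values on a large batch of tiny tasks (the sizes 1 and 1000 and the batch of 100000 numbers are arbitrary; the actual gap depends on the machine):

import time
from concurrent.futures import ProcessPoolExecutor

def double(n):
    return n * 2

if __name__ == '__main__':
    numbers = list(range(100000))
    for chunksize in (1, 1000):
        start = time.perf_counter()
        with ProcessPoolExecutor() as executor:
            results = list(executor.map(double, numbers, chunksize=chunksize))
        print(f'chunksize={chunksize}: {time.perf_counter() - start:.2f}s')

With chunksize=1 every item is pickled and sent to a worker individually, so the IPC overhead dwarfs the actual work; with chunksize=1000 the same batch finishes far faster.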

In my book 《Python Web 开发实战》, when introducing Celery I also mentioned the Prefetch Limits settings; it is the same idea.