Python 的 concurrent.futures 理解

这两天读了 concurrent.futures 源码,记录一下实现原理。

 

concurrent.futures 的本质是开若干个执行器(线程或进程)来执行 fucntion,fuction 的结果会被设置到 function 对应的 future 中,function 是否执行完通过 threading 事件解决。

对于执行器是线程的情况,是否执行完的实现大概如下:

可以对若干个 future ( fs ) 建立 一个 waiter,并赋给每一个 future,由于每当有 function 执行完之后会把其 future 加入到 waiter 的 finished_futures 中,所以只要检测 finished_futures 就可以,由于 waiter 有 threading 事件,当 future 加入到 finished_futures 的时候,可以 set threading 事件,在等待 finished_futures 的事件就会触发,拿到结果。

而对于进程,Executor 中有个 map() 函数,它返回一个生成器,生成器会依次返回每个 function 的执行结果。

 

concurrent.futures 主要包括三个文件:_base.py、thread.py 和 process.py,_base.py 主要是 Future 本身的内容,thread.py 和 process.py 是 Future 的执行器。

_base.py 主要定了三部分内容:

1). waiter 类。

class _Waiter(object):
class _AsCompletedWaiter(_Waiter):
class _FirstCompletedWaiter(_Waiter):
class _AllCompletedWaiter(_Waiter):

_Waiter 类用来等待 Future 执行完,_Waiter 里定义了 threading.Event(),_AsCompletedWaiter 每个 Future 完成都会触发 event.set(),_FirstCompletedWaiter 每个 Future 完成也会触发,_AllCompletedWaiter 会等所有 Future 完成才触发 event.set()。

另外,_AsCompletedWaiter 和 _AllCompletedWaiter 还有把锁 threading.Lock()。

2). 辅助函数。

def _create_and_install_waiters(fs, return_when):
def as_completed(fs, timeout=None):
def wait(fs, timeout=None, return_when=ALL_COMPLETED):

_create_and_install_waiters 是对 Future 列表 fs 创建和安装 waiter,创建好响应的 waiter 之后,会对 fs 中的每一个 Future 增加此 waiter (Future 有个列表变量 _waiters,加入即可),并且返回此 waiter;

as_completed 是一个生成器,配合 for 使用可以循环得到已经完成的 Future,as_completed 使用了 _create_and_install_waiters;

wait 用于等待 Future 列表依次完成。

3). Future 类和 Executor 类。

Future 类的成员变量:

self._condition = threading.Condition()
self._state = PENDING
self._result = None
self._exception = None
self._traceback = None
self._waiters = []
self._done_callbacks = []

_condition 用于控制 Future 内部的条件,比如 result() 要得到值,如果没有完成就要_condition.wait,直到 set_result() 触发 _condition.notify_all(),当然,cancel() 也可以触发 _condition.notify_all()。

Future 支持 callback,记录在 _done_callbacks,Future 完成后会执行这些 callback。

Executor 类供继承,需要实现 submit 方法,thread.py 中的 ThreadPoolExecutor 和 process.py 中的 ProcessPoolExecutor 都实现了此类。

 

thread.py 主要包括三部分:

class _WorkItem(object):
def _worker(executor_reference, work_queue):
class ThreadPoolExecutor(_base.Executor):

_WorkItem 是 Future 的包装,变量有 self.future、self.fn (执行的函数)、self.args 和 self.kwargs,里面的 run() 函数用来执行 Future 并设置 Future 信息,_state 会被设置成 FINISHED,如果是无异常会设置 _result,否则设置 _exception 和 _traceback。

_worker 不断从 _work_queue 队列中取 Future 并执行。

ThreadPoolExecutor 实现 _base.Executor,主要变量是 _max_workers  和 _work_queue,_max_workers 是最大线程数,_work_queue 是 queue.Queue(),当调用 submit 的时候会把 Future 包装成 _WorkItem,放入 _work_queue,然后开启最多 _max_workers 的线程去执行 _worker (不断读取队列并执行 )。

 

process.py  的实现和 thread.py 类似,只是开启进程来执行,下面是大概的逻辑:

92F6AD79-071D-4B1A-9CB7-3671F6ABC1B5

细节就不写了。

 

参考:

https://docs.python.org/3/library/concurrent.futures.html

 

关于 Python 线程

本文参照了 Python 线程库代码 threading.py。

 

1. 锁 ( Lock ),互斥锁,获取之后继续获取会阻塞;

 

2. 可重入锁 ( RLock ),基于 Lock,可以被一个线程重复获取的锁,只不过只能同时被一个线程获取。内部实现维护一个计数器,第一次获取时计数器赋值为1,下次继续获取会通过 _get_ident() 判断线程 id,如果锁的 owner 就是此线程,计数器加1,释放的时候会判断此线程是不是已经获取了锁,如果获取了计数器减1,否则报错;

 

3. 条件 ( Condition ),默认基于 RLock,调用 Condition.acquire 就获取了可重入锁,调用 Condition.release 就释放了可重入锁,Condition 实现了 __enter__ 和 __exit__ 和方法,可以配合 with 使用。

如果一个线程已经获取可重入锁,其他线程再想获取到此锁,必须等获取的线程释放掉锁;

Condition 有一个 wait  和 notify 函数,必须是已经获取了可重入锁的线程才可以执行。

wait 会新建一个 Lock (waiter),获取此锁,然后把锁放入队列 __waiters,然后释放可重入锁,接着继续获取锁(waiter),会阻塞,最后的 finally 语句中会继续获取可重入锁;

notify 会把 __waiters 中的前若干个(默认一个) Lock (waiter) 拿到,然后释放。

所以 wait 的作用是释放可重入锁,然后阻塞等待其他线程 notify 之后再获取可重入锁;

notify 的作用是通知线程获取可重入锁;

有一点要注意,收到通知的线程获取可重入锁会阻塞一下,因为发起 notify 的线程需要释放可重入锁之后收到通知的线程才可以获取到。

所以,获取到可重入锁的线程一般需要先 notify 然后 release,其他的线程获取之后可以先 wait 后 release。

说的更准确一点,两个线程,同时去获取可重入锁,然后一个干完事情 notify 和 release,另一个干完事后 wait 和 release,如果两个线程在干事情之间有个类似共享变量或者队列,可以做到一方写,一方读,实现生产者/消费者模型,虽然这里只能同时有一个读或一个写。

 

4. 信号量 ( Semaphore )。

信号量基于互斥锁 Condition,初始化一个 __value,当 acquire 时 __value 减 1,当 release 时__value 加 1, 当 __value 等于 0 时,Condition 进入 wait(),等待 notify 通知继续获取互斥锁,而且当 release 时都调用 notify,通知在阻塞状态的线程获取互斥锁。

可以看出信号量可以用来实现限制并发线程数。

另外,有一种有限信号量 ( BoundedSemaphore ),可以限制 release 的调用次数。

信号量的例子可以看这里:https://gist.github.com/presci/2661576

 

5. 事件 ( Event )。

事件也基于互斥锁 Condition,内部有一个标志:__flag ( False ),当调用 set() 时,把 __flag 设置成 True,并通知其他所有在 wait 的线程获取互斥锁。

Event 可以用来实现一个线程唤醒其他线程。

 

6. 定时器 ( Timer )。

基于 Event, 用来在一定时间后执行某个函数。

贴一下代码:

14951EC4-8759-4082-A4EC-1459E179CFB3