Python 的 concurrent.futures 理解

这两天读了 concurrent.futures 源码,记录一下实现原理。

 

concurrent.futures 的本质是开若干个执行器(线程或进程)来执行 fucntion,fuction 的结果会被设置到 function 对应的 future 中,function 是否执行完通过 threading 事件解决。

对于执行器是线程的情况,是否执行完的实现大概如下:

可以对若干个 future ( fs ) 建立 一个 waiter,并赋给每一个 future,由于每当有 function 执行完之后会把其 future 加入到 waiter 的 finished_futures 中,所以只要检测 finished_futures 就可以,由于 waiter 有 threading 事件,当 future 加入到 finished_futures 的时候,可以 set threading 事件,在等待 finished_futures 的事件就会触发,拿到结果。

而对于进程,Executor 中有个 map() 函数,它返回一个生成器,生成器会依次返回每个 function 的执行结果。

 

concurrent.futures 主要包括三个文件:_base.py、thread.py 和 process.py,_base.py 主要是 Future 本身的内容,thread.py 和 process.py 是 Future 的执行器。

_base.py 主要定了三部分内容:

1). waiter 类。

class _Waiter(object):
class _AsCompletedWaiter(_Waiter):
class _FirstCompletedWaiter(_Waiter):
class _AllCompletedWaiter(_Waiter):

_Waiter 类用来等待 Future 执行完,_Waiter 里定义了 threading.Event(),_AsCompletedWaiter 每个 Future 完成都会触发 event.set(),_FirstCompletedWaiter 每个 Future 完成也会触发,_AllCompletedWaiter 会等所有 Future 完成才触发 event.set()。

另外,_AsCompletedWaiter 和 _AllCompletedWaiter 还有把锁 threading.Lock()。

2). 辅助函数。

def _create_and_install_waiters(fs, return_when):
def as_completed(fs, timeout=None):
def wait(fs, timeout=None, return_when=ALL_COMPLETED):

_create_and_install_waiters 是对 Future 列表 fs 创建和安装 waiter,创建好响应的 waiter 之后,会对 fs 中的每一个 Future 增加此 waiter (Future 有个列表变量 _waiters,加入即可),并且返回此 waiter;

as_completed 是一个生成器,配合 for 使用可以循环得到已经完成的 Future,as_completed 使用了 _create_and_install_waiters;

wait 用于等待 Future 列表依次完成。

3). Future 类和 Executor 类。

Future 类的成员变量:

self._condition = threading.Condition()
self._state = PENDING
self._result = None
self._exception = None
self._traceback = None
self._waiters = []
self._done_callbacks = []

_condition 用于控制 Future 内部的条件,比如 result() 要得到值,如果没有完成就要_condition.wait,直到 set_result() 触发 _condition.notify_all(),当然,cancel() 也可以触发 _condition.notify_all()。

Future 支持 callback,记录在 _done_callbacks,Future 完成后会执行这些 callback。

Executor 类供继承,需要实现 submit 方法,thread.py 中的 ThreadPoolExecutor 和 process.py 中的 ProcessPoolExecutor 都实现了此类。

 

thread.py 主要包括三部分:

class _WorkItem(object):
def _worker(executor_reference, work_queue):
class ThreadPoolExecutor(_base.Executor):

_WorkItem 是 Future 的包装,变量有 self.future、self.fn (执行的函数)、self.args 和 self.kwargs,里面的 run() 函数用来执行 Future 并设置 Future 信息,_state 会被设置成 FINISHED,如果是无异常会设置 _result,否则设置 _exception 和 _traceback。

_worker 不断从 _work_queue 队列中取 Future 并执行。

ThreadPoolExecutor 实现 _base.Executor,主要变量是 _max_workers  和 _work_queue,_max_workers 是最大线程数,_work_queue 是 queue.Queue(),当调用 submit 的时候会把 Future 包装成 _WorkItem,放入 _work_queue,然后开启最多 _max_workers 的线程去执行 _worker (不断读取队列并执行 )。

 

process.py  的实现和 thread.py 类似,只是开启进程来执行,下面是大概的逻辑:

92F6AD79-071D-4B1A-9CB7-3671F6ABC1B5

细节就不写了。

 

参考:

https://docs.python.org/3/library/concurrent.futures.html

 

关于 Python 线程

本文参照了 Python 线程库代码 threading.py。

 

1. 锁 ( Lock ),互斥锁,获取之后继续获取会阻塞;

 

2. 可重入锁 ( RLock ),基于 Lock,可以被一个线程重复获取的锁,只不过只能同时被一个线程获取。内部实现维护一个计数器,第一次获取时计数器赋值为1,下次继续获取会通过 _get_ident() 判断线程 id,如果锁的 owner 就是此线程,计数器加1,释放的时候会判断此线程是不是已经获取了锁,如果获取了计数器减1,否则报错;

 

3. 条件 ( Condition ),默认基于 RLock,调用 Condition.acquire 就获取了可重入锁,调用 Condition.release 就释放了可重入锁,Condition 实现了 __enter__ 和 __exit__ 和方法,可以配合 with 使用。

如果一个线程已经获取可重入锁,其他线程再想获取到此锁,必须等获取的线程释放掉锁;

Condition 有一个 wait  和 notify 函数,必须是已经获取了可重入锁的线程才可以执行。

wait 会新建一个 Lock (waiter),获取此锁,然后把锁放入队列 __waiters,然后释放可重入锁,接着继续获取锁(waiter),会阻塞,最后的 finally 语句中会继续获取可重入锁;

notify 会把 __waiters 中的前若干个(默认一个) Lock (waiter) 拿到,然后释放。

所以 wait 的作用是释放可重入锁,然后阻塞等待其他线程 notify 之后再获取可重入锁;

notify 的作用是通知线程获取可重入锁;

有一点要注意,收到通知的线程获取可重入锁会阻塞一下,因为发起 notify 的线程需要释放可重入锁之后收到通知的线程才可以获取到。

所以,获取到可重入锁的线程一般需要先 notify 然后 release,其他的线程获取之后可以先 wait 后 release。

说的更准确一点,两个线程,同时去获取可重入锁,然后一个干完事情 notify 和 release,另一个干完事后 wait 和 release,如果两个线程在干事情之间有个类似共享变量或者队列,可以做到一方写,一方读,实现生产者/消费者模型,虽然这里只能同时有一个读或一个写。

 

4. 信号量 ( Semaphore )。

信号量基于互斥锁 Condition,初始化一个 __value,当 acquire 时 __value 减 1,当 release 时__value 加 1, 当 __value 等于 0 时,Condition 进入 wait(),等待 notify 通知继续获取互斥锁,而且当 release 时都调用 notify,通知在阻塞状态的线程获取互斥锁。

可以看出信号量可以用来实现限制并发线程数。

另外,有一种有限信号量 ( BoundedSemaphore ),可以限制 release 的调用次数。

信号量的例子可以看这里:https://gist.github.com/presci/2661576

 

5. 事件 ( Event )。

事件也基于互斥锁 Condition,内部有一个标志:__flag ( False ),当调用 set() 时,把 __flag 设置成 True,并通知其他所有在 wait 的线程获取互斥锁。

Event 可以用来实现一个线程唤醒其他线程。

 

6. 定时器 ( Timer )。

基于 Event, 用来在一定时间后执行某个函数。

贴一下代码:

14951EC4-8759-4082-A4EC-1459E179CFB3

 

 

socket.listen(backlog) 中 backlog 指的是什么

1.首先,创建 socket bind 并 listen 之后,服务端就可以独自完成三次握手了(抓包确认),调用 accept 才是把「连接」从队列里取出来。

#-*- encoding: UTF-8 -*-

from socket import *

HOST = ''
PORT = 80
BUFSIZ = 1024
ADDR = (HOST, PORT)

tcpSerSock = socket(AF_INET, SOCK_STREAM)
tcpSerSock.bind(ADDR)
tcpSerSock.listen(2)

while True:
    pass
    # print 'waiting for connection...'
    # tcpCliSock, addr = tcpSerSock.accept() # 等待连接
    # print '...connected from:', addr

tcpSerSock.close()

2.下面我们看看假如不调用 accept,最大能有多少 ESTABLISHED 的连接。

server 端代码还是采用上面的,client 代码如下:

#-*- encoding: UTF-8 -*-

import time
from socket import *
from multiprocessing.dummy import Pool as ThreadPool
HOST = '10.19.28.33'
PORT = 80
ADDR = (HOST, PORT)
MAX_THREAD_NUM = 100

def connect(x):
    tcpCliSock = socket(AF_INET, SOCK_STREAM)
    tcpCliSock.connect(ADDR) # 套接字连接
    time.sleep(4)
    return time.time(), tcpCliSock

pool = ThreadPool(MAX_THREAD_NUM)
result_data = pool.map(connect, xrange(MAX_THREAD_NUM))
pool.close()
pool.join()

print result_data

测试结果如下:

1). 当 server 端 listen 的 backlog 为0,最大的 ESTABLISHED 数量为 2;

2). 当 listen 的 backlog 为1,最大的 ESTABLISHED 数量为 2;

3). 当 listen 的 backlog 为 N,则最大的 ESTABLISHED 数量为 N + 1;

所以,backlog 代表着有多少个已经建立的但是没有被 accept 取走的连接数量。

另外,有个内核参数 net.core.somaxconn 定义了 socket 同时 listen 的最大连接数,所以 socket.listen() 指定的值不能超过 net.core.somaxconn。


3.如果 ESTABLISHED 满了,此时进来的连接请求状态是 SYN_RECV,那么 SYN_RECV 的数量(也就是 SYN 队列长度)最大是多少呢?

答案是由内核参数 net.ipv4.tcp_max_syn_backlog 决定。

我做了个测试,环境如下:

server 端,ip 10.19.28.33,代码如1,listen backlog 为2,net.ipv4.tcp_max_syn_backlog 设置为8,net.ipv4.tcp_syncookies 设置为0。

clinet 端,ip 10.0.11.12,代码如2,MAX_THREAD_NUM 设置为12。

两台机器的 /proc/sys/net/ipv4/tcp_synack_retries 和 /proc/sys/net/ipv4/tcp_syn_retries 都为2,而且 /proc/sys/net/ipv4/tcp_abort_on_overflow 都为0。

client 端会报:: [Errno 110] Connection timed out

在 server 端抓包,包在这里: socket.pcap

这里不纠结发生了什么,在后面的文章继续。

Python 程序的发布流程

记录一下 Python 的发布流程。


1.代码的项目里要求有 requirements.txt 文件。

可以使用 链接 中的 pip-tools 来生成 requirements.txt .

例如先在 requirements.in 写入

futures
ujson
tornado
DBUtils
jinja2
redis
MySQL-python

然后运行

pip-compile requirements.in

这个命令生成 requirements.txt,如下:

#
# This file is autogenerated by pip-compile
# Make changes in requirements.in, then run this to update:
#
# pip-compile requirements.in
#
backports.ssl-match-hostname==3.4.0.2 # via tornado
certifi==2015.4.28 # via tornado
dbutils==1.1
futures==3.0.3
jinja2==2.7.3
markupsafe==0.23 # via jinja2
mysql-python==1.2.5
redis==2.10.3
tornado==4.2
ujson==1.33

2.然后执行命令:

mkdir ./wheelhouse;
pip wheel -r requirements.txt -w ./wheelhouse/

在 ./wheelhouse 目录下生成了 .whl 格式的二进制依赖包.


3.把程序打包, 解压到机器上;


4.执行虚拟环境:

virtualenv .venv
source .venv/bin/activate

5.通过执行:

pip install --use-wheel --no-index --find-links=./wheelhouse/ -r requirements.txt

来安装二进制依赖文件.


6.启动程序即可.

aws volume 的查询、创建、挂载 和 卸载

做个集合,方面查阅,欢迎参考,哈哈哈。

 

查看  volume-id 

import boto.ec2

region = “ap-southeast-1”
aws_access_key_id = “”
aws_secret_access_key = “”

conn = boto.ec2.connect_to_region(region, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
reservations = conn.get_all_instances()
for res in reservations:
    for instance in res.instances:
        if ‘Name’ in instance.tags:
           volume_list = []
           for i in instance.block_device_mapping:
               volume_dict = {}
               volume_dict[“device”] = i
               volume_dict[“volume_id”] = instance.block_device_mapping.get(i).volume_id
               volume_list.append(volume_dict)

           print “%s (%s) [%s]:\n %s” % (instance.tags[‘Name’], instance.id, instance.state, volume_list)
        else:
           volume_list = []
           for i in instance.block_device_mapping:
               volume_dict = {}
               volume_dict[“device”] = i
               volume_dict[“volume_id”] = instance.block_device_mapping.get(i).volume_id
               volume_list.append(volume_dict)

           print “%s [%s]:\n %s” % (instance.id, instance.state, volume_list)
        print

 

创建volume

import boto.ec2

region = “ap-southeast-1”
aws_access_key_id = “”
aws_secret_access_key = “”

conn = boto.ec2.connect_to_region(region, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)

size = “126”
zone = “ap-southeast-1b”

volume_info = conn.create_volume(size, zone)
volume_id = volume_info.id

time_init = 0
time_total = 20
time_interval = 1
while time_init < time_total:
    volume_info.update()
    status = volume_info.status
    if status == ‘available’:
        print “{0}, {1}”.format(volume_id, status)
        break
    else:
        time.sleep(time_interval)
        time_init += time_interval

 

把一个 volume-id 挂载到 一个实例上

import boto.ec2

region = “ap-southeast-1”
aws_access_key_id = “”
aws_secret_access_key = “”

volume_id = “”
instance_id = “”
device = “”

conn = boto.ec2.connect_to_region(region, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
reservations = conn.get_all_instances()
curr_vol = conn.get_all_volumes([volume_id])[0]
#print curr_vol
#print curr_vol.zone
if curr_vol.status == ‘available’:
    conn.attach_volume(volume_id, instance_id, device)

 

把一个 volume-id 从一个实例上卸掉

import boto.ec2

region = “”
aws_access_key_id = “”
aws_secret_access_key = “”

instance_id = “”
volume_id = “”
device = “”

conn = boto.ec2.connect_to_region(region, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
reservations = conn.get_all_instances()
curr_vol = conn.get_all_volumes([volume_id])[0]
#print curr_vol
#print curr_vol.zone
if curr_vol.status == ‘in-use’:
    conn.detach_volume(volume_id, instance_id, device)