深入理解Python之并发
概述
本文主要关注Python
中的并发库concurrent.feature
,其在Python3.2
引入
案例:用三种方式进行Web下载
为了高效的处理网络I/O
,我们需要并发。以下三个例子用于展示三个方式下载图片
串行下载
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40OP20_CC = ('CN IN US ID BR PK NG BD RU JP '
'MX PH VN ET EG DE IR TR CD FR').split() # <2>
BASE_URL = 'http://flupy.org/data/flags' # <3>
DEST_DIR = 'downloads/' # <4>
def save_flag(img, filename): # <5>
path = os.path.join(DEST_DIR, filename)
with open(path, 'wb') as fp:
fp.write(img)
def get_flag(cc): # <6>
url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
resp = requests.get(url)
return resp.content
def show(text): # <7>
print(text, end=' ')
sys.stdout.flush()
def download_many(cc_list): # <8>
for cc in sorted(cc_list): # <9>
image = get_flag(cc)
show(cc)
save_flag(image, cc.lower() + '.gif')
return len(cc_list)
def main(download_many): # <10>
t0 = time.time()
count = download_many(POP20_CC)
elapsed = time.time() - t0
msg = '\n{} flags downloaded in {:.2f}s'
print(msg.format(count, elapsed))线程池下载
1 | MAX_WORKERS = 20 # <2> |
Future在哪里?
Future
是concurrent.futures
和asyncio
重要的组件,但作为这些库的使用者,我们通常看不到它
Python3.4
标准库有两个叫Future
的库concurrent.futures.Future
和asyncio.Future
。他们有相同的目的,表示一个延时的计算,其很像JavaScript
中的Promise
。
Futures
封装了等待操作,以便能够放入队列,查询状态,以及能够收到完成的结果。
通常,我们不应该创建Futures
,它们由并发框架创建并管理,。这很重要
客户端不应该改变Future
的状态,这是由框架完成的。
两种类型的Future
都有一个非阻塞done
方法,返回Boolean
值,告知调用者,回调是否和future
连接。客户端通常会使用add_done_callback()
要求future
通知结果。
result()
方法,返回回调结果,或者抛出异常。然而如果future
没有完成,两种Future
的行为有巨大的差异。concurrency.futures.Future
实例,当触发result
时会阻塞调用线程,直到结果完成,可以设置超时时间。而asyncio.Future
不支持超时时间,一般其会使用yield from
来获取future
的结果
- 异步下载
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24# <3>
def get_flag(cc):
url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
resp = yield from aiohttp.request('GET', url) # <4>
image = yield from resp.read() # <5>
return image
def download_one(cc): # <6>
image = yield from get_flag(cc) # <7>
show(cc)
save_flag(image, cc.lower() + '.gif')
return cc
def download_many(cc_list):
loop = asyncio.get_event_loop() # <8>
to_do = [download_one(cc) for cc in sorted(cc_list)] # <9>
wait_coro = asyncio.wait(to_do) # <10>
res, _ = loop.run_until_complete(wait_coro) # <11>
loop.close() # <12>
return len(res)
严格上来讲,上述并发程序并不能并行下载,concurrent.futures
被GIL
(全局解释器锁)限制
阻塞I/O和GIL(全局解释器锁)
CPython
解释器内部不是线程安全的,因此其有个全局解释器锁,同一时刻只允许一个线程运行。
通常,我们写Python
时无法越过GIL
,但内置方法或者使用C
语言能够释放GIL
。事实上,C
写的Python
库能够管理GIL
,放出操作系统线程,充分利用CPU
的核心,但这让CODE
的复杂度大大增加,因此大多数的库都不会这么做
然而,所有的标准库方法,只要执行阻塞I/O
操作,就会释放GIL
。这意味着,当一个线程正在等待结果,阻塞I/O
的函数释放了GIL
,另一个线程可以运行
使用concurrent.futures发起进程
使用ProcessPoolExecutor
可以绕过GIL
使用所有CPU
1 | def download_many(cc_list): |
ThreadPoolExecutor
和ProcessPoolExecutor
的区别在于前者初始化的时候需要指定线程数量,而后者是可选
使用Executor.map
1 | def display(*args): # <1> |
Executor.map
用起来很简单,但它有个功能看起来可能不是很有用,它返回结果的顺序和调用的顺序一致,如果不想要这样,可以使用executor.submit()
和as_completed()
函数
线程和协程
下面分别有使用线程和协程的例子
线程:
1 | def spin(msg, done): # <2> |
协程
1 | # <1> |
两者的主要区别:
asyncio.Task
功能与threading
完全相同Task
驱动协程,而Thread
触发回调- 不需要自己初始化
Task
对象,通过传入一个协程到asyncio.async()
或者loop.create_task()
中获取 - 拿到一个
Task
对象时,其他它已经准备要运行了,而线程必须显式的调用start
- 线程中,目标方法由线程调用,而协程中由
yield from
驱动的协程调用 - 没有接口能够从外部中断线程。
Task
可以调用cancel
,其会在协程内抛出CancelledError
supervisor
协程必须在主函数中使用loop.run_until_complete
执行Thread
可能需要锁来控制状态,而协程状态由内部控制