深入理解Python之并发

概述

本文主要关注Python中的并发库concurrent.feature,其在Python3.2引入

案例:用三种方式进行Web下载

为了高效的处理网络I/O,我们需要并发。以下三个例子用于展示三个方式下载图片

  1. 串行下载

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    OP20_CC = ('CN IN US ID BR PK NG BD RU JP '
    'MX PH VN ET EG DE IR TR CD FR').split() # <2>

    BASE_URL = 'http://flupy.org/data/flags' # <3>

    DEST_DIR = 'downloads/' # <4>


    def save_flag(img, filename): # <5>
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
    fp.write(img)


    def get_flag(cc): # <6>
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content


    def show(text): # <7>
    print(text, end=' ')
    sys.stdout.flush()


    def download_many(cc_list): # <8>
    for cc in sorted(cc_list): # <9>
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')

    return len(cc_list)


    def main(download_many): # <10>
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))
  2. 线程池下载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
MAX_WORKERS = 20  # <2>


def download_one(cc): # <3>
image = get_flag(cc)
show(cc)
save_flag(image, cc.lower() + '.gif')
return cc


def download_many(cc_list):
workers = min(MAX_WORKERS, len(cc_list)) # <4>
with futures.ThreadPoolExecutor(workers) as executor: # <5>
res = executor.map(download_one, sorted(cc_list)) # <6>

return len(list(res)) # <7>

Future在哪里?

Futureconcurrent.futuresasyncio重要的组件,但作为这些库的使用者,我们通常看不到它

Python3.4标准库有两个叫Future的库concurrent.futures.Futureasyncio.Future。他们有相同的目的,表示一个延时的计算,其很像JavaScript中的Promise

Futures封装了等待操作,以便能够放入队列,查询状态,以及能够收到完成的结果。

通常,我们不应该创建Futures,它们由并发框架创建并管理,。这很重要

客户端不应该改变Future的状态,这是由框架完成的。

两种类型的Future都有一个非阻塞done方法,返回Boolean值,告知调用者,回调是否和future连接。客户端通常会使用add_done_callback()要求future通知结果。

result()方法,返回回调结果,或者抛出异常。然而如果future没有完成,两种Future的行为有巨大的差异。concurrency.futures.Future实例,当触发result时会阻塞调用线程,直到结果完成,可以设置超时时间。而asyncio.Future不支持超时时间,一般其会使用yield from来获取future的结果

  1. 异步下载
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    @asyncio.coroutine  # <3>
    def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = yield from aiohttp.request('GET', url) # <4>
    image = yield from resp.read() # <5>
    return image


    @asyncio.coroutine
    def download_one(cc): # <6>
    image = yield from get_flag(cc) # <7>
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc


    def download_many(cc_list):
    loop = asyncio.get_event_loop() # <8>
    to_do = [download_one(cc) for cc in sorted(cc_list)] # <9>
    wait_coro = asyncio.wait(to_do) # <10>
    res, _ = loop.run_until_complete(wait_coro) # <11>
    loop.close() # <12>

    return len(res)

严格上来讲,上述并发程序并不能并行下载,concurrent.futuresGIL(全局解释器锁)限制

阻塞I/O和GIL(全局解释器锁)

CPython解释器内部不是线程安全的,因此其有个全局解释器锁,同一时刻只允许一个线程运行。

通常,我们写Python时无法越过GIL,但内置方法或者使用C语言能够释放GIL。事实上,C写的Python库能够管理GIL,放出操作系统线程,充分利用CPU的核心,但这让CODE的复杂度大大增加,因此大多数的库都不会这么做

然而,所有的标准库方法,只要执行阻塞I/O操作,就会释放GIL。这意味着,当一个线程正在等待结果,阻塞I/O的函数释放了GIL,另一个线程可以运行

使用concurrent.futures发起进程

使用ProcessPoolExecutor可以绕过GIL使用所有CPU

1
2
def download_many(cc_list):
with futures.ProcessPoolExecutor() as executor:

ThreadPoolExecutorProcessPoolExecutor的区别在于前者初始化的时候需要指定线程数量,而后者是可选

使用Executor.map

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def display(*args):  # <1>
print(strftime('[%H:%M:%S]'), end=' ')
print(*args)


def loiter(n): # <2>
msg = '{}loiter({}): doing nothing for {}s...'
display(msg.format('\t'*n, n, n))
sleep(n)
msg = '{}loiter({}): done.'
display(msg.format('\t'*n, n))
return n * 10 # <3>


def main():
display('Script starting.')
executor = futures.ThreadPoolExecutor(max_workers=3) # <4>
results = executor.map(loiter, range(5)) # <5>
display('results:', results) # <6>.
display('Waiting for individual results:')
for i, result in enumerate(results): # <7>
display('result {}: {}'.format(i, result))

Executor.map用起来很简单,但它有个功能看起来可能不是很有用,它返回结果的顺序和调用的顺序一致,如果不想要这样,可以使用executor.submit()as_completed()函数

线程和协程

下面分别有使用线程和协程的例子

线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def spin(msg, done):  # <2>
write, flush = sys.stdout.write, sys.stdout.flush
for char in itertools.cycle('|/-\\'): # <3>
status = char + ' ' + msg
write(status)
flush()
write('\x08' * len(status)) # <4>
if done.wait(.1): # <5>
break
write(' ' * len(status) + '\x08' * len(status)) # <6>


def slow_function(): # <7>
# pretend waiting a long time for I/O
time.sleep(3) # <8>
return 42


def supervisor(): # <9>
done = threading.Event()
spinner = threading.Thread(target=spin,
args=('thinking!', done))
print('spinner object:', spinner) # <10>
spinner.start() # <11>
result = slow_function() # <12>
done.set() # <13>
spinner.join() # <14>
return result

协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@asyncio.coroutine  # <1>
def spin(msg): # <2>
write, flush = sys.stdout.write, sys.stdout.flush
for char in itertools.cycle('|/-\\'):
status = char + ' ' + msg
write(status)
flush()
write('\x08' * len(status))
try:
yield from asyncio.sleep(.1) # <3>
except asyncio.CancelledError: # <4>
break
write(' ' * len(status) + '\x08' * len(status))


@asyncio.coroutine
def slow_function(): # <5>
# pretend waiting a long time for I/O
yield from asyncio.sleep(3) # <6>
return 42


@asyncio.coroutine
def supervisor(): # <7>
spinner = asyncio.async(spin('thinking!')) # <8>
print('spinner object:', spinner) # <9>
result = yield from slow_function() # <10>
spinner.cancel() # <11>
return result

两者的主要区别:

  • asyncio.Task功能与threading完全相同
  • Task驱动协程,而Thread触发回调
  • 不需要自己初始化Task对象,通过传入一个协程到asyncio.async()或者loop.create_task()中获取
  • 拿到一个Task对象时,其他它已经准备要运行了,而线程必须显式的调用start
  • 线程中,目标方法由线程调用,而协程中由yield from驱动的协程调用
  • 没有接口能够从外部中断线程。Task可以调用cancel,其会在协程内抛出CancelledError
  • supervisor协程必须在主函数中使用loop.run_until_complete执行
  • Thread可能需要锁来控制状态,而协程状态由内部控制