Understanding Python Coroutines in Depth

Overview

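In Python, a generator uses yield to produce data, which the caller of next() receives. yield also suspends the generator until the caller is ready for the next value. In other words, the caller pulls data from the generator.

A coroutine looks syntactically like a generator, but its yield usually appears on the right-hand side of an expression (for example, datum = yield). It may or may not produce a value: if no expression follows the yield keyword, it yields None. A coroutine can also receive data from the caller, which uses .send(datum) instead of next() to pass it in. Typically, the caller pushes data into the coroutine.

It is even possible that no data passes through yield in either direction. Apart from controlling the flow of data, yield acts as a control-flow device that a scheduler can use to implement cooperative multitasking.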
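The last point, yield as a pure control-flow device with no data passing through it, can be sketched with a tiny round-robin scheduler. This is only a minimal illustration: the names task, run_round_robin and log are hypothetical and do not come from any library, and a real scheduler does much more.

```python
from collections import deque


def task(name, steps, log):
    """A cooperative task: do one step of work, then hand control back."""
    for i in range(steps):
        log.append('{} step {}'.format(name, i))
        yield  # no data flows here; yield only gives up control


def run_round_robin(tasks):
    """Hypothetical scheduler: resume each task in turn until all finish."""
    ready = deque(tasks)
    while ready:
        current = ready.popleft()
        try:
            next(current)        # run the task up to its next yield
        except StopIteration:
            continue             # task finished; drop it
        ready.append(current)    # still alive; put it back in line


log = []
run_round_robin([task('A', 2, log), task('B', 3, log)])
print(log)
# ['A step 0', 'B step 0', 'A step 1', 'B step 1', 'B step 2']
```

The two tasks interleave because each one voluntarily suspends itself at yield; nothing preempts them.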

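This article covers:

  • The behavior and states of a generator used as a coroutine
  • Priming a coroutine automatically with a decorator
  • How the caller controls a coroutine through .close() and .throw()
  • How a coroutine returns a value when it terminates
  • Using the newer yield from syntax
  • A simulation that uses coroutines to manage concurrent activities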

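How Coroutines Evolved from Generators

The basic infrastructure for coroutines appeared in Python 2.5 (PEP 342). Since then, yield can be used in an expression, and the caller can use .send() to pass in data, which becomes the value of the yield expression inside the generator. This is what lets a generator act as a coroutine.

Python 3.3 (PEP 380) brought two syntax changes that affect coroutines:

  • A generator can now return a value. Previously, return with a value inside a generator raised SyntaxError.
  • The yield from syntax, which lets a generator delegate to a subgenerator.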

Basic Behavior of a Coroutine

def simple_coroutine():
    print('-> coroutine started')
    x = yield
    print('-> coroutine received:', x)

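A coroutine can be in one of four states. Use inspect.getgeneratorstate() to find out the current one:

  • GEN_CREATED: waiting to start execution
  • GEN_RUNNING: currently being executed by the interpreter
  • GEN_SUSPENDED: currently suspended at a yield expression
  • GEN_CLOSED: execution has completed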
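my_coro = simple_coroutine()
my_coro.send(1729)  # raises TypeError: the coroutine has not been primed

If you create a coroutine and immediately send it a value other than None, as above, a TypeError is raised: the coroutine is still in the GEN_CREATED state and has no pending yield to receive the value. It must first be primed with next(my_coro) or my_coro.send(None).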
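The state transitions and the priming requirement can be observed directly. The following sketch repeats simple_coroutine so that it runs on its own; the states list and the value 42 are only there for illustration. GEN_RUNNING does not show up because it can only be observed from inside the generator (or from another thread) while the body is executing.

```python
from inspect import getgeneratorstate


def simple_coroutine():
    print('-> coroutine started')
    x = yield
    print('-> coroutine received:', x)


states = []
my_coro = simple_coroutine()
states.append(getgeneratorstate(my_coro))   # GEN_CREATED

try:
    my_coro.send(1729)                      # not primed yet
except TypeError as exc:
    error_message = str(exc)                # the coroutine is left untouched

next(my_coro)                               # prime: run to the first yield
states.append(getgeneratorstate(my_coro))   # GEN_SUSPENDED

try:
    my_coro.send(42)                        # body runs to the end
except StopIteration:
    pass
states.append(getgeneratorstate(my_coro))   # GEN_CLOSED

print(states)
# ['GEN_CREATED', 'GEN_SUSPENDED', 'GEN_CLOSED']
```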

def simple_coro2(a):
    print('-> Started: a =', a)
    b = yield a
    print('-> Received: b =', b)
    c = yield a + b
    print('-> Received: c =', c)

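Execution of this coroutine happens in three phases. Calling simple_coro2(a) only creates the coroutine object; none of its body runs yet. The first next() prints the first message and runs up to the first yield expression, yielding a. The first send() assigns the sent value to b, prints the second message, and runs up to the second yield, yielding a + b. The second send() assigns the sent value to c, prints the third message, and the body ends, so the caller gets StopIteration.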
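A short session makes the three phases concrete. The arguments 14, 28 and 99 are arbitrary sample values chosen for this sketch.

```python
from inspect import getgeneratorstate


def simple_coro2(a):
    print('-> Started: a =', a)
    b = yield a
    print('-> Received: b =', b)
    c = yield a + b
    print('-> Received: c =', c)


my_coro2 = simple_coro2(14)       # nothing is printed yet
first = next(my_coro2)            # prints '-> Started: a = 14', yields a
second = my_coro2.send(28)        # b = 28, yields a + b
try:
    my_coro2.send(99)             # c = 99, then the body ends
except StopIteration:
    final_state = getgeneratorstate(my_coro2)

print(first, second, final_state)
# 14 42 GEN_CLOSED
```

Note that the assignment to b only happens when the coroutine is resumed by send(): the right-hand side of b = yield a is evaluated first, and the coroutine is suspended exactly there.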

A Coroutine Example

def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield average  # yield the current average, wait for a new term
        total += term
        count += 1
        average = total / count

coro_avg = averager()
next(coro_avg)             # prime the coroutine
print(coro_avg.send(10))   # 10.0
print(coro_avg.send(30))   # 20.0
print(coro_avg.send(5))    # 15.0

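Calling averager() creates the coroutine, and next(coro_avg) primes it by advancing it to the first yield. Each send() call delivers a value that becomes the result of the yield expression and is bound to term. The loop then updates the average and suspends again at yield, handing the new average back to the caller and waiting for the next value to arrive.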

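A Decorator for Priming Coroutines

Without a wrapper, you must remember to call next(my_coro) before every use of a coroutine. To make coroutines more convenient to use, a priming decorator is sometimes applied.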

from functools import wraps


def coroutine(func):
    """Decorator: primes 'func' by advancing to first 'yield'"""

    @wraps(func)
    def primer(*args, **kwargs):
        gen = func(*args, **kwargs)  # create the generator object
        next(gen)                    # prime it
        return gen

    return primer


@coroutine
def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield average
        total += term
        count += 1
        average = total / count
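With the decorator in place, the coroutine is ready to receive values as soon as it is created. The sketch below repeats the decorator and averager so that it is self-contained; the input values are arbitrary.

```python
from functools import wraps
from inspect import getgeneratorstate


def coroutine(func):
    """Decorator: primes 'func' by advancing to first 'yield'"""
    @wraps(func)
    def primer(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)
        return gen
    return primer


@coroutine
def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield average
        total += term
        count += 1
        average = total / count


coro_avg = averager()                    # no explicit next() needed
state = getgeneratorstate(coro_avg)      # already suspended at the first yield
averages = [coro_avg.send(v) for v in (10, 30, 5)]
print(state, averages)
# GEN_SUSPENDED [10.0, 20.0, 15.0]
```

One caveat worth keeping in mind: yield from primes the subgenerator by itself, so a coroutine decorated this way is not suitable for use with yield from.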

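Coroutine Termination and Exception Handling

An unhandled exception inside a coroutine propagates to the caller of the next() or send() that triggered it.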
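As a minimal sketch of this behavior, sending a non-numeric value to the running-average coroutine makes total += term fail inside the coroutine, and the caller of send() sees the error. The value 'spam' is just an arbitrary bad input.

```python
from inspect import getgeneratorstate


def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield average
        total += term
        count += 1
        average = total / count


coro_avg = averager()
next(coro_avg)
coro_avg.send(40)
try:
    coro_avg.send('spam')    # float += str fails inside the coroutine
except TypeError as exc:
    print('caller got:', exc)
state_after = getgeneratorstate(coro_avg)   # 'GEN_CLOSED'

try:
    coro_avg.send(60)        # the coroutine cannot be reactivated
except StopIteration:
    reactivated = False
```

Because the exception was not handled inside the coroutine, the coroutine terminated, and any later attempt to use it raises StopIteration.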

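One way to terminate a coroutine is to send it a sentinel value that tells it to exit. Since Python 2.5, generator objects also have two methods that let the client explicitly send exceptions into the coroutine: throw and close.

throw: causes the yield expression where the generator is paused to raise the given exception. If the generator handles the exception, execution continues to the next yield, and the value yielded becomes the return value of the throw() call. If the exception is not handled, it propagates to the caller.

close: causes the yield expression where the generator is paused to raise GeneratorExit. If the generator does not handle that exception, no error is reported to the caller. After receiving GeneratorExit, the generator must not yield a value; otherwise RuntimeError is raised. Any other exception raised by the generator propagates to the caller.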
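The two outcomes of close() can be sketched with a pair of toy coroutines. The names polite, stubborn and cleanup_log are hypothetical and exist only for this illustration.

```python
def polite():
    """Does not catch GeneratorExit; only cleans up on the way out."""
    try:
        yield
    finally:
        cleanup_log.append('polite cleaned up')


def stubborn():
    """Wrongly yields a value after receiving GeneratorExit."""
    try:
        yield
    except GeneratorExit:
        yield 'still here'    # not allowed while closing


cleanup_log = []

gen = polite()
next(gen)
gen.close()                   # returns quietly; the finally clause has run

bad = stubborn()
next(bad)
try:
    bad.close()
except RuntimeError as exc:
    close_error = str(exc)    # 'generator ignored GeneratorExit'

print(cleanup_log, close_error)
```

Cleanup code therefore belongs in a finally clause (or an except GeneratorExit block that ends by returning or re-raising), never in a block that yields again.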

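class DemoException(Exception):
    """An exception type for the demonstration."""


def demo_exc_handling():
    print('-> coroutine started')
    try:
        while True:
            try:
                x = yield
            except DemoException:  # handled here, so the loop continues
                print('*** DemoException handled. Continuing')
            else:
                print('-> coroutine received: {!r}'.format(x))
    finally:
        # runs however the coroutine ends: close(), or an unhandled exception
        print('-> coroutine ending')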
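The following sketch drives this coroutine through the three cases: an exception it handles, a close() call, and an exception it does not handle. The coroutine and DemoException are repeated so the block runs on its own; ZeroDivisionError is just an arbitrary exception that the coroutine does not catch.

```python
from inspect import getgeneratorstate


class DemoException(Exception):
    """Exception type used only for this demonstration."""


def demo_exc_handling():
    print('-> coroutine started')
    try:
        while True:
            try:
                x = yield
            except DemoException:
                print('*** DemoException handled. Continuing')
            else:
                print('-> coroutine received: {!r}'.format(x))
    finally:
        print('-> coroutine ending')


exc_coro = demo_exc_handling()
next(exc_coro)
exc_coro.send(11)
exc_coro.throw(DemoException)          # handled inside; coroutine keeps running
state_after_throw = getgeneratorstate(exc_coro)   # 'GEN_SUSPENDED'
exc_coro.close()                       # finally clause prints '-> coroutine ending'
state_after_close = getgeneratorstate(exc_coro)   # 'GEN_CLOSED'

other = demo_exc_handling()
next(other)
try:
    other.throw(ZeroDivisionError)     # not handled: finally runs, then it propagates
except ZeroDivisionError:
    state_unhandled = getgeneratorstate(other)    # 'GEN_CLOSED'
```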

Returning a Value from a Coroutine

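from collections import namedtuple

Result = namedtuple('Result', 'count average')


def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield
        if term is None:
            break  # sending None ends the loop
        total += term
        count += 1
        average = total / count
    return Result(count, average)


coro_avg = averager()
next(coro_avg)      # prime the coroutine
coro_avg.send(10)   # this version yields no intermediate averages
coro_avg.send(30)
coro_avg.send(5)
try:
    coro_avg.send(None)
except StopIteration as exc:
    result = exc.value  # the returned value travels on StopIteration
print(result)  # Result(count=3, average=15.0)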

Using yield from

def genYield():
    for c in 'AB':
        yield c
    for i in range(1, 3):
        yield i

print(list(genYield()))  # ['A', 'B', 1, 2]

def genYieldFrom():
    yield from 'AB'
    yield from range(1, 3)

print(list(genYieldFrom()))  # ['A', 'B', 1, 2]

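Both functions produce the same result. yield from is more than a shortcut for a for loop, though: it plays a role similar to await in other languages. While the subgenerator runs, the delegating generator stays suspended, values sent by the caller go directly to the subgenerator, and the subgenerator's return value becomes the value of the yield from expression.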
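A minimal sketch of that two-way channel, using hypothetical names inner, outer and results: the caller talks to outer, but the values it sends are received by inner, and the value inner returns is what yield from evaluates to.

```python
def inner():
    """Subgenerator: collects values sent to it until it receives None."""
    received = []
    while True:
        item = yield
        if item is None:
            break
        received.append(item)
    return received            # becomes the value of 'yield from inner()'


def outer(results):
    """Delegating generator: suspended while inner() is running."""
    collected = yield from inner()
    results.append(collected)


results = []
gen = outer(results)
next(gen)                # primes outer, which in turn primes inner
gen.send('A')            # goes straight through to inner
gen.send('B')
try:
    gen.send(None)       # inner returns; outer stores the result and ends
except StopIteration:
    pass
print(results)
# [['A', 'B']]
```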

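from collections import namedtuple

Result = namedtuple('Result', 'count average')


# the subgenerator
def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield
        if term is None:
            break  # without this, the coroutine would never return
        total += term
        count += 1
        average = total / count
    return Result(count, average)


# the delegating generator
def grouper(results, key):
    while True:
        # each iteration creates a new averager and stores its return value
        results[key] = yield from averager()


# the client code (the caller); keys of data have the form 'group;unit'
def main(data):
    results = {}
    for key, values in data.items():
        group = grouper(results, key)
        next(group)  # prime the delegating generator
        for value in values:
            group.send(value)  # the value goes through to averager
        group.send(None)  # terminates the current averager

    report(results)


def report(results):
    for key, result in sorted(results.items()):
        group, unit = key.split(';')
        print('{:2} {:5} averaging {:.2f}{}'.format(
            result.count, group, result.average, unit))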

Case Study: Simulating Discrete Events with Coroutines

import argparse
import collections
import queue
import random

DEFAULT_NUMBER_OF_TAXIS = 3
DEFAULT_END_TIME = 180
SEARCH_DURATION = 5
TRIP_DURATION = 20
DEPARTURE_INTERVAL = 5

Event = collections.namedtuple('Event', 'time proc action')


def taxi_process(ident, trips, start_time=0):
    """Yield to simulator issuing event at each state change"""
    # each send() from the simulator delivers the current simulation time
    time = yield Event(start_time, ident, 'leave garage')
    for i in range(trips):
        time = yield Event(time, ident, 'pick up passenger')
        time = yield Event(time, ident, 'drop off passenger')

    yield Event(time, ident, 'going home')
    # end of taxi process: the generator returns, raising StopIteration


class Simulator:

    def __init__(self, procs_map):
        self.events = queue.PriorityQueue()  # events ordered by time
        self.procs = dict(procs_map)         # local copy of the taxi coroutines

    def run(self, end_time):
        """Schedule and display events until time is up"""
        # schedule the first event for each cab
        for _, proc in sorted(self.procs.items()):
            first_event = next(proc)  # prime the coroutine
            self.events.put(first_event)

        # main loop of the simulation
        sim_time = 0
        while sim_time < end_time:
            if self.events.empty():
                print('*** end of events ***')
                break

            current_event = self.events.get()  # event with the smallest time
            sim_time, proc_id, previous_action = current_event
            print('taxi:', proc_id, proc_id * ' ', current_event)
            active_proc = self.procs[proc_id]
            next_time = sim_time + compute_duration(previous_action)
            try:
                next_event = active_proc.send(next_time)
            except StopIteration:
                del self.procs[proc_id]  # this taxi has finished
            else:
                self.events.put(next_event)
        else:  # runs only if the loop ended without break
            msg = '*** end of simulation time: {} events pending ***'
            print(msg.format(self.events.qsize()))


def compute_duration(previous_action):
    """Compute action duration using exponential distribution"""
    if previous_action in ['leave garage', 'drop off passenger']:
        # new state is prowling
        interval = SEARCH_DURATION
    elif previous_action == 'pick up passenger':
        # new state is trip
        interval = TRIP_DURATION
    elif previous_action == 'going home':
        interval = 1
    else:
        raise ValueError('Unknown previous_action: %s' % previous_action)
    return int(random.expovariate(1 / interval)) + 1


def main(end_time=DEFAULT_END_TIME, num_taxis=DEFAULT_NUMBER_OF_TAXIS,
         seed=None):
    """Initialize random generator, build procs and run simulation"""
    if seed is not None:
        random.seed(seed)  # get reproducible results

    # taxi i makes (i + 1) * 2 trips and departs i * DEPARTURE_INTERVAL
    # minutes after the start
    taxis = {i: taxi_process(i, (i + 1) * 2, i * DEPARTURE_INTERVAL)
             for i in range(num_taxis)}
    sim = Simulator(taxis)
    sim.run(end_time)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Taxi fleet simulator.')
    parser.add_argument('-e', '--end-time', type=int,
                        default=DEFAULT_END_TIME,
                        help='simulation end time; default = %s'
                        % DEFAULT_END_TIME)
    parser.add_argument('-t', '--taxis', type=int,
                        default=DEFAULT_NUMBER_OF_TAXIS,
                        help='number of taxis running; default = %s'
                        % DEFAULT_NUMBER_OF_TAXIS)
    parser.add_argument('-s', '--seed', type=int, default=None,
                        help='random generator seed (for testing)')

    args = parser.parse_args()
    main(args.end_time, args.taxis, args.seed)
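To see what the simulator does with each taxi, it can help to drive a single taxi_process by hand. In the sketch below the caller plays the role of Simulator.run(): it primes the coroutine with next() and then sends in the time of each following event. The taxi number 13 and the times 7, 30, 35 and 40 are made-up values for illustration, standing in for what compute_duration() would produce.

```python
import collections

Event = collections.namedtuple('Event', 'time proc action')


def taxi_process(ident, trips, start_time=0):
    """Yield to simulator issuing event at each state change"""
    time = yield Event(start_time, ident, 'leave garage')
    for i in range(trips):
        time = yield Event(time, ident, 'pick up passenger')
        time = yield Event(time, ident, 'drop off passenger')
    yield Event(time, ident, 'going home')


taxi = taxi_process(ident=13, trips=1, start_time=0)
events = [next(taxi)]                    # first event: 'leave garage' at time 0
for new_time in (7, 30, 35):             # hand-picked times (hypothetical)
    events.append(taxi.send(new_time))   # each send() yields the next event

try:
    taxi.send(40)                        # no more events: the coroutine ends
except StopIteration:
    finished = True

for event in events:
    print(event)
# Event(time=0, proc=13, action='leave garage')
# Event(time=7, proc=13, action='pick up passenger')
# Event(time=30, proc=13, action='drop off passenger')
# Event(time=35, proc=13, action='going home')
```

Each coroutine keeps its own position in its sequence of activities, while the simulator's priority queue decides which one to resume next. This is how a single thread can manage several concurrent activities.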