0%

Android开发者都知道启动Activity是使用startActivity(),那么它是怎样一个过程呢?

启动流程

startActivity

两种方式,一种Activity自身方法,一种ContextImpl的方法

ContextImpl.startActivity
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
 * Starts an activity from this context by delegating to
 * Instrumentation.execStartActivity with a null token and a null target Activity.
 *
 * @param intent  the intent to launch; its flags are checked for FLAG_ACTIVITY_NEW_TASK
 * @param options launch options bundle, may be null
 * @throws AndroidRuntimeException if FLAG_ACTIVITY_NEW_TASK is not set, options is
 *         non-null, and the options carry no launch task id (getLaunchTaskId() == -1)
 */
@Override
public void startActivity(Intent intent, Bundle options) {
warnIfCallingFromSystemProcess();

// Calling start activity from outside an activity without FLAG_ACTIVITY_NEW_TASK is
// generally not allowed, except if the caller specifies the task id the activity should
// be launched in.
// NOTE(review): because of the `options != null` term, the exception below is never
// thrown when options is null, even if FLAG_ACTIVITY_NEW_TASK is missing - confirm
// whether that is intended.
if ((intent.getFlags()&Intent.FLAG_ACTIVITY_NEW_TASK) == 0
&& options != null && ActivityOptions.fromBundle(options).getLaunchTaskId() == -1) {
throw new AndroidRuntimeException(
"Calling startActivity() from outside of an Activity "
+ " context requires the FLAG_ACTIVITY_NEW_TASK flag."
+ " Is this really what you want?");
}
// No caller token / target Activity and requestCode -1: no result is requested.
mMainThread.getInstrumentation().execStartActivity(
getOuterContext(), mMainThread.getApplicationThread(), null,
(Activity) null, intent, -1, options);
}

如果从Activity外部(非Activity的Context)启动,并且没有设置FLAG_ACTIVITY_NEW_TASK,会抛出AndroidRuntimeException导致crash(按上面这段代码,只有在options不为null且没有指定launchTaskId时才会抛出)
最终调用Instrumentation.execStartActivity

Activity.startActivity
1
2
3
4
5
6
7
8
9
/**
 * Launches {@code intent} without requesting a result (request code -1).
 *
 * @param intent  the intent to start
 * @param options additional launch options, or null for none
 */
public void startActivity(Intent intent, @Nullable Bundle options) {
    if (options == null) {
        // Deliberately routed through the two-argument overload so that
        // applications which overrode that method keep seeing the call.
        startActivityForResult(intent, -1);
        return;
    }
    startActivityForResult(intent, -1, options);
}

从代码中能看出,最终还是调用的startActivityForResult

startActivityForResult

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public void startActivityForResult(@RequiresPermission Intent intent, int requestCode,
@Nullable Bundle options) {
if (mParent == null) {
options = transferSpringboardActivityOptions(options);
Instrumentation.ActivityResult ar =
mInstrumentation.execStartActivity(
this, mMainThread.getApplicationThread(), mToken, this,
intent, requestCode, options);
if (ar != null) {
mMainThread.sendActivityResult(
mToken, mEmbeddedID, requestCode, ar.getResultCode(),
ar.getResultData());
}
if (requestCode >= 0) {
// If this start is requesting a result, we can avoid making
// the activity visible until the result is received. Setting
// this code during onCreate(Bundle savedInstanceState) or onResume() will keep the
// activity hidden during this time, to avoid flickering.
// This can only be done when a result is requested because
// that guarantees we will get information back when the
// activity is finished, no matter what happens to it.
mStartedActivity = true;
}

cancelInputsAndStartExitTransition(options);
// TODO Consider clearing/flushing other event sources and events for child windows.
} else {
if (options != null) {
mParent.startActivityFromChild(this, intent, requestCode, options);
} else {
// Note we want to go through this method for compatibility with
// existing applications that may have overridden it.
mParent.startActivityFromChild(this, intent, requestCode);
}
}
}
  • 首先包装options以便支持scene动画
  • Instrumentation执行execStartActivity,并返回一个结果
  • 将结果传给ActivityThread,最后执行动画

Instrumentation.execStartActivity

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
 * Forwards an activity-start request to the activity manager.
 *
 * @param who           context issuing the request; supplies the base package name and
 *                      the content resolver used to resolve the intent type
 * @param contextThread binder that is cast to IApplicationThread and passed as the caller
 * @param token         binder token passed through to the activity manager
 * @param target        activity that initiated the start, or null; used for the referrer
 *                      and for its mEmbeddedID
 * @param intent        the intent to start
 * @param requestCode   request code; a blocking monitor's result is only returned when
 *                      this is >= 0
 * @param options       additional launch options, passed through unchanged
 * @return the result of a matching blocking ActivityMonitor when requestCode >= 0,
 *         otherwise null
 * @throws RuntimeException wrapping a RemoteException raised by the remote call
 */
public ActivityResult execStartActivity(
Context who, IBinder contextThread, IBinder token, Activity target,
Intent intent, int requestCode, Bundle options) {
IApplicationThread whoThread = (IApplicationThread) contextThread;
// Attach the referrer supplied by the target activity, if any.
Uri referrer = target != null ? target.onProvideReferrer() : null;
if (referrer != null) {
intent.putExtra(Intent.EXTRA_REFERRER, referrer);
}
// Give registered monitors a chance to observe (and possibly block) this start.
if (mActivityMonitors != null) {
synchronized (mSync) {
final int N = mActivityMonitors.size();
for (int i=0; i<N; i++) {
final ActivityMonitor am = mActivityMonitors.get(i);
if (am.match(who, null, intent)) {
am.mHits++;
// A blocking monitor short-circuits the start: nothing is sent to the
// activity manager.
if (am.isBlocking()) {
return requestCode >= 0 ? am.getResult() : null;
}
// Only the first matching monitor is counted.
break;
}
}
}
}
try {
intent.migrateExtraStreamToClipData();
intent.prepareToLeaveProcess(who);
int result = ActivityManagerNative.getDefault()
.startActivity(whoThread, who.getBasePackageName(), intent,
intent.resolveTypeIfNeeded(who.getContentResolver()),
token, target != null ? target.mEmbeddedID : null,
requestCode, 0, null, options);
// Inspects the returned start code for this intent.
checkStartActivityResult(result, intent);
} catch (RemoteException e) {
throw new RuntimeException("Failure from system", e);
}
return null;
}
  • 首先获取ApplicationThread其继承自Binder
  • 如果注册了ActivityMonitor(Activity监控集),会逐个用它匹配当前的Intent;若命中的监视器是阻塞的(isBlocking),则本次不会真正启动Activity,直接返回(requestCode >= 0时返回监视器预设的结果,否则返回null)
  • 然后交给ActivityManagerNative启动
  • 最后检查返回结果

ActivityManagerNative.startActivity

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public int startActivity(IApplicationThread caller, String callingPackage, Intent intent,
String resolvedType, IBinder resultTo, String resultWho, int requestCode,
int startFlags, ProfilerInfo profilerInfo, Bundle options) throws RemoteException {
Parcel data = Parcel.obtain();
Parcel reply = Parcel.obtain();
data.writeInterfaceToken(IActivityManager.descriptor);
data.writeStrongBinder(caller != null ? caller.asBinder() : null);
data.writeString(callingPackage);
intent.writeToParcel(data, 0);
data.writeString(resolvedType);
data.writeStrongBinder(resultTo);
data.writeString(resultWho);
data.writeInt(requestCode);
data.writeInt(startFlags);
if (profilerInfo != null) {
data.writeInt(1);
profilerInfo.writeToParcel(data, Parcelable.PARCELABLE_WRITE_RETURN_VALUE);
} else {
data.writeInt(0);
}
if (options != null) {
data.writeInt(1);
options.writeToParcel(data, 0);
} else {
data.writeInt(0);
}
mRemote.transact(START_ACTIVITY_TRANSACTION, data, reply, 0);
reply.readException();
int result = reply.readInt();
reply.recycle();
data.recycle();
return result;
}

把参数写入Parcel,由ActivityManagerProxy发出START_ACTIVITY_TRANSACTION命令。经过Binder IPC,进入到ActivityManagerNative的onTransact

ActivityManagerNative.onTransact

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
public boolean onTransact(int code, Parcel data, Parcel reply, int flags)
throws RemoteException {
switch (code) {
case START_ACTIVITY_TRANSACTION:
{
data.enforceInterface(IActivityManager.descriptor);
IBinder b = data.readStrongBinder();
IApplicationThread app = ApplicationThreadNative.asInterface(b);
String callingPackage = data.readString();
Intent intent = Intent.CREATOR.createFromParcel(data);
String resolvedType = data.readString();
IBinder resultTo = data.readStrongBinder();
String resultWho = data.readString();
int requestCode = data.readInt();
int startFlags = data.readInt();
ProfilerInfo profilerInfo = data.readInt() != 0
? ProfilerInfo.CREATOR.createFromParcel(data) : null;
Bundle options = data.readInt() != 0
? Bundle.CREATOR.createFromParcel(data) : null;
int result = startActivity(app, callingPackage, intent, resolvedType,
resultTo, resultWho, requestCode, startFlags, profilerInfo, options);
reply.writeNoException();
reply.writeInt(result);
return true;
}
...
}

从Parcel中还原参数,调用ActivityManagerService的startActivity

ActivityManagerService

1
2
3
4
5
6
7
8
9
10
11
public final int startActivityAsUser(IApplicationThread caller, String callingPackage,
Intent intent, String resolvedType, IBinder resultTo, String resultWho, int requestCode,
int startFlags, ProfilerInfo profilerInfo, Bundle bOptions, int userId) {
enforceNotIsolatedCaller("startActivity");
userId = mUserController.handleIncomingUser(Binder.getCallingPid(), Binder.getCallingUid(),
userId, false, ALLOW_FULL_ONLY, "startActivity", null);
// TODO: Switch to user app stacks here.
return mActivityStarter.startActivityMayWait(caller, -1, callingPackage, intent,
resolvedType, null, null, resultTo, resultWho, requestCode, startFlags,
profilerInfo, null, null, bOptions, false, userId, null, null);
}
  • 计算用户id:如果系统支持多用户,那么userId = uid / 每个用户的uid范围(100000),否则直接视为系统用户

ActivityStarter.startActivityMayWait

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/**
 * Excerpt of the activity-start entry point (parts are elided with "...").
 *
 * Resolves the target activity, determines the caller's pid/uid, picks the
 * ActivityStack to launch into, delegates to startActivityLocked and, when the
 * caller passed a non-null outResult, waits on mService until the launch has
 * progressed far enough to fill it in.
 *
 * @param caller      application thread of the caller, may be null
 * @param callingUid  explicit calling uid, or a negative value to derive it here
 * @param outResult   if non-null, receives the start result and the method waits
 * @param config      if non-null, compared against the current configuration to
 *                    set the stack's mConfigWillChange
 * @param iContainer  optional activity container that selects the target stack
 * @return ActivityManager.START_CANCELED when the container's parent activity is
 *         not resumed; the other return paths are elided in this excerpt
 */
final int startActivityMayWait(IApplicationThread caller, int callingUid,
        String callingPackage, Intent intent, String resolvedType,
        IVoiceInteractionSession voiceSession, IVoiceInteractor voiceInteractor,
        IBinder resultTo, String resultWho, int requestCode, int startFlags,
        ProfilerInfo profilerInfo, IActivityManager.WaitResult outResult, Configuration config,
        Bundle bOptions, boolean ignoreTargetSecurity, int userId,
        IActivityContainer iContainer, TaskRecord inTask) {
    ...
    // Resolve the target Activity.
    ActivityInfo aInfo = mSupervisor.resolveActivity(intent, rInfo, startFlags, profilerInfo);

    ActivityStackSupervisor.ActivityContainer container =
            (ActivityStackSupervisor.ActivityContainer)iContainer;
    synchronized (mService) {
        if (container != null && container.mParentActivity != null &&
                container.mParentActivity.state != RESUMED) {
            // Cannot start a child activity if the parent is not resumed.
            return ActivityManager.START_CANCELED;
        }
        final int realCallingPid = Binder.getCallingPid();
        final int realCallingUid = Binder.getCallingUid();
        int callingPid;
        if (callingUid >= 0) {
            // An explicit uid was supplied; the pid is unknown.
            callingPid = -1;
        } else if (caller == null) {
            // No caller thread: fall back to the binder calling identity.
            callingPid = realCallingPid;
            callingUid = realCallingUid;
        } else {
            callingPid = callingUid = -1;
        }

        // Launch into the focused stack unless the container has its own stack
        // that is not on the home display.
        final ActivityStack stack;
        if (container == null || container.mStack.isOnHomeDisplay()) {
            stack = mSupervisor.mFocusedStack;
        } else {
            stack = container.mStack;
        }
        stack.mConfigWillChange = config != null && mService.mConfiguration.diff(config) != 0;
        if (DEBUG_CONFIGURATION) Slog.v(TAG_CONFIGURATION,
                "Starting activity when config will change = " + stack.mConfigWillChange);

        final long origId = Binder.clearCallingIdentity();

        if (aInfo != null &&
                (aInfo.applicationInfo.privateFlags
                        & ApplicationInfo.PRIVATE_FLAG_CANT_SAVE_STATE) != 0) {
            ...
            // Branch for applications flagged PRIVATE_FLAG_CANT_SAVE_STATE (elided).
        }

        final ActivityRecord[] outRecord = new ActivityRecord[1];
        int res = startActivityLocked(caller, intent, ephemeralIntent, resolvedType,
                aInfo, rInfo, voiceSession, voiceInteractor,
                resultTo, resultWho, requestCode, callingPid,
                callingUid, callingPackage, realCallingPid, realCallingUid, startFlags,
                options, ignoreTargetSecurity, componentSpecified, outRecord, container,
                inTask);
        ...
        if (outResult != null) {
            outResult.result = res;
            // Start succeeded: wait until the launch reaches its next state.
            if (res == ActivityManager.START_SUCCESS) {
                mSupervisor.mWaitingActivityLaunched.add(outResult);
                do {
                    try {
                        mService.wait();
                    } catch (InterruptedException e) {
                        // Ignored on purpose: the loop condition is re-checked.
                    }
                } while (outResult.result != START_TASK_TO_FRONT
                        && !outResult.timeout && outResult.who == null);
                if (outResult.result == START_TASK_TO_FRONT) {
                    res = START_TASK_TO_FRONT;
                }
            }
            if (res == START_TASK_TO_FRONT) {
                ActivityRecord r = stack.topRunningActivityLocked();
                if (r.nowVisible && r.state == RESUMED) {
                    // Already visible and resumed: report immediately.
                    outResult.timeout = false;
                    outResult.who = new ComponentName(r.info.packageName, r.info.name);
                    outResult.totalTime = 0;
                    outResult.thisTime = 0;
                } else {
                    // Otherwise wait until the activity becomes visible or times out.
                    outResult.thisTime = SystemClock.uptimeMillis();
                    mSupervisor.mWaitingActivityVisible.add(outResult);
                    do {
                        try {
                            mService.wait();
                        } catch (InterruptedException e) {
                            // Ignored on purpose: the loop condition is re-checked.
                        }
                    } while (!outResult.timeout && outResult.who == null);
                }
            }
        }
    }
    ...
}

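  • 解析目标Activity
  • 获取ActivityStackSupervisor.ActivityContainer,如果父activity没有处在RESUMED状态,那么不能启动子activity
  • 确定调用方的进程id(pid)和uid
  • 选择ActivityStack:如果没有container,或者container的栈位于主屏(home display),使用ActivityStackSupervisor当前的焦点栈(mFocusedStack),否则使用container自己的栈
  • 如果应用带有PRIVATE_FLAG_CANT_SAVE_STATE标记(无法保存状态的应用),那么走对应的特殊处理分支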
  • 调用startActivityLocked并返回结果,如果启动成功,那么等待。直到满足条件

ActivityStarter.startActivityLocked

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
final int startActivityLocked(IApplicationThread caller, Intent intent, Intent ephemeralIntent,
String resolvedType, ActivityInfo aInfo, ResolveInfo rInfo,
IVoiceInteractionSession voiceSession, IVoiceInteractor voiceInteractor,
IBinder resultTo, String resultWho, int requestCode, int callingPid, int callingUid,
String callingPackage, int realCallingPid, int realCallingUid, int startFlags,
ActivityOptions options, boolean ignoreTargetSecurity, boolean componentSpecified,
ActivityRecord[] outActivity, ActivityStackSupervisor.ActivityContainer container,
TaskRecord inTask) {
...
//检查lanchFlags
final int launchFlags = intent.getFlags();

if ((launchFlags & Intent.FLAG_ACTIVITY_FORWARD_RESULT) != 0 && sourceRecord != null) {
// Transfer the result target from the source activity to the new
// one being started, including any failures.
if (requestCode >= 0) {
ActivityOptions.abort(options);
return ActivityManager.START_FORWARD_AND_REQUEST_CONFLICT;
}
resultRecord = sourceRecord.resultTo;
if (resultRecord != null && !resultRecord.isInStackLocked()) {
resultRecord = null;
}
resultWho = sourceRecord.resultWho;
requestCode = sourceRecord.requestCode;
sourceRecord.resultTo = null;
if (resultRecord != null) {
resultRecord.removeResultsLocked(sourceRecord, resultWho, requestCode);
}
if (sourceRecord.launchedFromUid == callingUid) {
// The new activity is being launched from the same uid as the previous
// activity in the flow, and asking to forward its result back to the
// previous. In this case the activity is serving as a trampoline between
// the two, so we also want to update its launchedFromPackage to be the
// same as the previous activity. Note that this is safe, since we know
// these two packages come from the same uid; the caller could just as
// well have supplied that same package name itself. This specifially
// deals with the case of an intent picker/chooser being launched in the app
// flow to redirect to an activity picked by the user, where we want the final
// activity to consider it to have been launched by the previous app activity.
callingPackage = sourceRecord.launchedFromPackage;
}
}

if (err == ActivityManager.START_SUCCESS && intent.getComponent() == null) {
// We couldn't find a class that can handle the given Intent.
// That's the end of that!
err = ActivityManager.START_INTENT_NOT_RESOLVED;
}

if (err == ActivityManager.START_SUCCESS && aInfo == null) {
// We couldn't find the specific class specified in the Intent.
// Also the end of the line.
err = ActivityManager.START_CLASS_NOT_FOUND;
}

if (err == ActivityManager.START_SUCCESS && sourceRecord != null
&& sourceRecord.task.voiceSession != null) {
// If this activity is being launched as part of a voice session, we need
// to ensure that it is safe to do so. If the upcoming activity will also
// be part of the voice session, we can only launch it if it has explicitly
// said it supports the VOICE category, or it is a part of the calling app.
if ((launchFlags & FLAG_ACTIVITY_NEW_TASK) == 0
&& sourceRecord.info.applicationInfo.uid != aInfo.applicationInfo.uid) {
try {
intent.addCategory(Intent.CATEGORY_VOICE);
if (!AppGlobals.getPackageManager().activitySupportsIntent(
intent.getComponent(), intent, resolvedType)) {
Slog.w(TAG,
"Activity being started in current voice task does not support voice: "
+ intent);
err = ActivityManager.START_NOT_VOICE_COMPATIBLE;
}
} catch (RemoteException e) {
Slog.w(TAG, "Failure checking voice capabilities", e);
err = ActivityManager.START_NOT_VOICE_COMPATIBLE;
}
}
}

...
//创建ActivityRecord
ActivityRecord r = new ActivityRecord(mService, callerApp, callingUid, callingPackage,
intent, resolvedType, aInfo, mService.mConfiguration, resultRecord, resultWho,
requestCode, componentSpecified, voiceSession != null, mSupervisor, container,
options, sourceRecord);
...
err = startActivityUnchecked(r, sourceRecord, voiceSession, voiceInteractor, startFlags,
true, options, inTask);
...
}

  • 检查launchFlags
  • 创建ActivityRecord

ActivityStarter.startActivityUnchecked

1
2
3
4
5
6
7
8
9
10
11
12
13
14
 private int startActivityUnchecked(final ActivityRecord r, ActivityRecord sourceRecord,
IVoiceInteractionSession voiceSession, IVoiceInteractor voiceInteractor,
int startFlags, boolean doResume, ActivityOptions options, TaskRecord inTask) {

setInitialState(r, options, inTask, doResume, startFlags, sourceRecord, voiceSession,
voiceInteractor);
//计算flags
computeLaunchingTaskFlags();
...
//获取可复用的Intent
mReusedActivity = getReusableIntentActivity();
//使activity可见
mTargetStack.ensureActivitiesVisibleLocked(null, 0, !PRESERVE_WINDOWS);
}
  • 计算launchflags
  • 获取可复用的activity
  • 做了很多复杂的事情,这里不赘述

ActivityStack.ensureActivitiesVisibleLocked

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final void ensureActivitiesVisibleLocked(ActivityRecord starting, int configChanges,
boolean preserveWindows) {
...
if (r.app == null || r.app.thread == null) {
if (makeVisibleAndRestartIfNeeded(starting, configChanges, isTop,
resumeNextActivity, r)) {
if (activityNdx >= activities.size()) {
// Record may be removed if its process needs to restart.
activityNdx = activities.size() - 1;
} else {
resumeNextActivity = false;
}
}
}
...
}
  • 如果ProcessRecord为空,或者ApplicationThread为空,那么尝试重启Activity

ActivityStack.makeVisibleAndRestartIfNeeded

1
2
3
4
5
6
7
private boolean makeVisibleAndRestartIfNeeded(ActivityRecord starting, int configChanges,
boolean isTop, boolean andResume, ActivityRecord r) {
if (r != starting) {
mStackSupervisor.startSpecificActivityLocked(r, andResume, false);
return true;
}
}
  • 如果r不是正在启动的那个Activity(r != starting),则调用startSpecificActivityLocked启动它

ActivityStackSupervisor.startSpecificActivityLocked

1
2
3
4
5
6
void startSpecificActivityLocked(ActivityRecord r,
boolean andResume, boolean checkConfig) {
...
realStartActivityLocked(r, app, andResume, checkConfig);
...
}

ActivityStackSupervisor.realStartActivityLocked

1
2
3
4
5
6
7
8
9
10
final boolean realStartActivityLocked(ActivityRecord r, ProcessRecord app,
boolean andResume, boolean checkConfig){
...
app.thread.scheduleLaunchActivity(new Intent(r.intent), r.appToken,
System.identityHashCode(r), r.info, new Configuration(mService.mConfiguration),
new Configuration(task.mOverrideConfig), r.compat, r.launchedFromPackage,
task.voiceInteractor, app.repProcState, r.icicle, r.persistentState, results,
newIntents, !andResume, mService.isNextTransitionForward(), profilerInfo);
}

  • 调用ProcessRecord中的ApplicationThread来启动Activity

ApplicationThreadProxy.scheduleLaunchActivity

1
2
3
4
5
6
7
8
public final void scheduleLaunchActivity(Intent intent, IBinder token, int ident,
ActivityInfo info, Configuration curConfig, Configuration overrideConfig,
CompatibilityInfo compatInfo, String referrer, IVoiceInteractor voiceInteractor,
int procState, Bundle state, PersistableBundle persistentState,
List<ResultInfo> pendingResults, List<ReferrerIntent> pendingNewIntents,
boolean notResumed, boolean isForward, ProfilerInfo profilerInfo) {

}
  • 通过IPC调用应用进程中的scheduleLaunchActivity

ApplicationThreadNative

1
2
3
4
5
6
7
8
9
public final void scheduleLaunchActivity(Intent intent, IBinder token, int ident,
ActivityInfo info, Configuration curConfig, Configuration overrideConfig,
CompatibilityInfo compatInfo, String referrer, IVoiceInteractor voiceInteractor,
int procState, Bundle state, PersistableBundle persistentState,
List<ResultInfo> pendingResults, List<ReferrerIntent> pendingNewIntents,
boolean notResumed, boolean isForward, ProfilerInfo profilerInfo) {
...
sendMessage(H.LAUNCH_ACTIVITY, r);
}
  • 向主线程的Handler(H)发送启动命令LAUNCH_ACTIVITY

ActivityThread.performLaunchActivity

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private Activity performLaunchActivity(ActivityClientRecord r, Intent customIntent) {
//通过反射创建activity
...
Activity activity = null;
try {
java.lang.ClassLoader cl = r.packageInfo.getClassLoader();
activity = mInstrumentation.newActivity(
cl, component.getClassName(), r.intent);
StrictMode.incrementExpectedActivityCount(activity.getClass());
r.intent.setExtrasClassLoader(cl);
r.intent.prepareToEnterProcess();
if (r.state != null) {
r.state.setClassLoader(cl);
}
} catch (Exception e) {
if (!mInstrumentation.onException(activity, e)) {
throw new RuntimeException(
"Unable to instantiate activity " + component
+ ": " + e.toString(), e);
}
}
}
  • 创建Activity
  • 之后通过Instrumentation开始执行生命周期的方法
  • 至此Activity就启动起来了。整个过程相当复杂。为了方便理解,中间省略了很多细节

对于onNewIntent的处理

onNewIntent是Activity的一个回调方法,那么在什么情况下会调用呢?在ActivityStack中的navigateUpToLocked中的源码是这样的

1
2
3
4
5
6
7
8
9
10
11
12
if (parent != null && foundParentInTask) {
final int parentLaunchMode = parent.info.launchMode;
final int destIntentFlags = destIntent.getFlags();
if (parentLaunchMode == ActivityInfo.LAUNCH_SINGLE_INSTANCE ||
parentLaunchMode == ActivityInfo.LAUNCH_SINGLE_TASK ||
parentLaunchMode == ActivityInfo.LAUNCH_SINGLE_TOP ||
(destIntentFlags & Intent.FLAG_ACTIVITY_CLEAR_TOP) != 0) {
parent.deliverNewIntentLocked(srec.info.applicationInfo.uid, destIntent,
srec.packageName);
}
...
}
  • 可以看出条件是
    • 当需要启动的Activity已存在,并且launchMode为SINGLE_TOP(栈顶是相同的Activity)、SINGLE_INSTANCE或者SINGLE_TASK,或者Intent带有FLAG_ACTIVITY_CLEAR_TOP标记

Activity启动时序图

activity_flow

概述

本文主要关注Python中的并发库concurrent.futures,其在Python3.2引入

案例:用三种方式进行Web下载

为了高效的处理网络I/O,我们需要并发。以下三个例子用于展示三个方式下载图片

  1. 串行下载

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    # Twenty two-letter country codes whose flags are downloaded.
    POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
                'MX PH VN ET EG DE IR TR CD FR').split()

    # Base URL of the site hosting the flag images.
    BASE_URL = 'http://flupy.org/data/flags'

    # Local directory the images are written to.
    DEST_DIR = 'downloads/'


    def save_flag(img, filename):
        """Write the bytes ``img`` to ``DEST_DIR/filename``."""
        path = os.path.join(DEST_DIR, filename)
        with open(path, 'wb') as fp:
            fp.write(img)


    def get_flag(cc):
        """Download the flag image for country code ``cc`` and return its bytes."""
        url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
        resp = requests.get(url)
        return resp.content


    def show(text):
        """Print ``text`` followed by a space and flush stdout immediately."""
        print(text, end=' ')
        sys.stdout.flush()


    def download_many(cc_list):
        """Download and save the flags one after another, in sorted order.

        Returns the number of country codes in ``cc_list``.
        """
        for cc in sorted(cc_list):
            image = get_flag(cc)
            show(cc)
            save_flag(image, cc.lower() + '.gif')

        return len(cc_list)


    def main(download_many):
        """Run ``download_many`` over ``POP20_CC`` and report the elapsed time.

        ``download_many`` is passed in so that other implementations can reuse
        this driver.
        """
        t0 = time.time()
        count = download_many(POP20_CC)
        elapsed = time.time() - t0
        msg = '\n{} flags downloaded in {:.2f}s'
        print(msg.format(count, elapsed))
  2. 线程池下载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Upper bound on the number of worker threads.
MAX_WORKERS = 20


def download_one(cc):
    """Download, announce and save the flag for one country code; return the code."""
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc


def download_many(cc_list):
    """Download all flags concurrently in a thread pool.

    Returns the number of results produced by the workers.
    """
    # Never ask for more threads than there are jobs, but always at least one:
    # ThreadPoolExecutor rejects max_workers <= 0, which an empty list would give.
    workers = max(1, min(MAX_WORKERS, len(cc_list)))
    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(download_one, sorted(cc_list))

    # Consuming the iterator re-raises any exception raised in a worker.
    return len(list(res))

Future在哪里?

Future是concurrent.futures和asyncio的重要组件,但作为这些库的使用者,我们通常看不到它

Python3.4标准库有两个叫Future的类:concurrent.futures.Future和asyncio.Future。他们有相同的目的,表示一个延时的计算,其很像JavaScript中的Promise

Futures封装了等待操作,以便能够放入队列,查询状态,以及能够收到完成的结果。

通常,我们不应该自己创建Futures,它们由并发框架创建并管理。这很重要

客户端不应该改变Future的状态,这是由框架完成的。

两种类型的Future都有一个非阻塞的done()方法,返回Boolean值,告知调用者与该future关联的可调用对象是否已经执行完毕。客户端通常不会反复询问future是否完成,而是使用add_done_callback()注册回调,让future在完成时通知自己。

result()方法在future完成后返回可调用对象的结果,或者重新抛出执行时产生的异常。然而如果future还没有完成,两种Future的行为有巨大的差异:concurrent.futures.Future实例的result()会阻塞调用线程,直到结果就绪,并且可以设置超时时间;而asyncio.Future的result()不支持超时时间,一般会使用yield from来获取future的结果

  1. 异步下载
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    @asyncio.coroutine  # <3>
    def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = yield from aiohttp.request('GET', url) # <4>
    image = yield from resp.read() # <5>
    return image


    @asyncio.coroutine
    def download_one(cc): # <6>
    image = yield from get_flag(cc) # <7>
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc


    def download_many(cc_list):
    loop = asyncio.get_event_loop() # <8>
    to_do = [download_one(cc) for cc in sorted(cc_list)] # <9>
    wait_coro = asyncio.wait(to_do) # <10>
    res, _ = loop.run_until_complete(wait_coro) # <11>
    loop.close() # <12>

    return len(res)

严格上来讲,上述并发程序并不能并行下载,因为concurrent.futures的线程受GIL(全局解释器锁)限制

阻塞I/O和GIL(全局解释器锁)

CPython解释器内部不是线程安全的,因此其有个全局解释器锁,同一时刻只允许一个线程运行。

通常,我们写Python时无法越过GIL,但内置方法或者使用C语言能够释放GIL。事实上,C写的Python库能够管理GIL,放出操作系统线程,充分利用CPU的核心,但这让CODE的复杂度大大增加,因此大多数的库都不会这么做

然而,所有的标准库方法,只要执行阻塞I/O操作,就会释放GIL。这意味着,当一个线程正在等待结果,阻塞I/O的函数释放了GIL,另一个线程可以运行

使用concurrent.futures发起进程

使用ProcessPoolExecutor可以绕过GIL使用所有CPU

1
2
def download_many(cc_list):
with futures.ProcessPoolExecutor() as executor:

ThreadPoolExecutor和ProcessPoolExecutor的区别在于前者初始化的时候需要指定线程数量,而后者的进程数量是可选的(默认为CPU核心数)

使用Executor.map

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def display(*args):
    """Print ``args`` prefixed with a ``[HH:MM:SS]`` timestamp."""
    print(strftime('[%H:%M:%S]'), end=' ')
    print(*args)


def loiter(n):
    """Sleep for ``n`` seconds, logging before and after; return ``n * 10``.

    The messages are indented by ``n`` tabs so that each call is visually
    distinguishable in the output.
    """
    msg = '{}loiter({}): doing nothing for {}s...'
    display(msg.format('\t'*n, n, n))
    sleep(n)
    msg = '{}loiter({}): done.'
    display(msg.format('\t'*n, n))
    return n * 10


def main():
    """Run five ``loiter`` calls on three threads and display the results."""
    display('Script starting.')
    executor = futures.ThreadPoolExecutor(max_workers=3)
    # map() returns immediately with a lazy iterator over the results.
    results = executor.map(loiter, range(5))
    display('results:', results)
    display('Waiting for individual results:')
    # Iterating blocks until each result, in submission order, is available.
    for i, result in enumerate(results):
        display('result {}: {}'.format(i, result))

Executor.map用起来很简单,但它有个特性可能有用也可能没用:它返回结果的顺序和调用的顺序一致。如果希望谁先完成就先拿到谁的结果,可以组合使用executor.submit()和futures.as_completed()函数

线程和协程

下面分别有使用线程和协程的例子

线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def spin(msg, done):
    """Animate a text spinner followed by ``msg`` until ``done`` is set.

    ``done`` is a ``threading.Event``; the function is meant to run in its own
    thread.
    """
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        # Move the cursor back with backspaces so the next frame overwrites this one.
        write('\x08' * len(status))
        # wait() doubles as the frame delay and returns True once the event is set.
        if done.wait(.1):
            break
    # Blank out the status line and move the cursor back to the start.
    write(' ' * len(status) + '\x08' * len(status))


def slow_function():
    """Stand-in for a slow blocking call: sleep three seconds, return 42."""
    # pretend waiting a long time for I/O
    time.sleep(3)
    return 42


def supervisor():
    """Show the spinner in a second thread while ``slow_function`` runs."""
    done = threading.Event()
    spinner = threading.Thread(target=spin,
                               args=('thinking!', done))
    print('spinner object:', spinner)
    spinner.start()
    result = slow_function()
    # Tell the spinner to stop and wait for its thread to finish.
    done.set()
    spinner.join()
    return result

协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@asyncio.coroutine  # <1>
def spin(msg): # <2>
write, flush = sys.stdout.write, sys.stdout.flush
for char in itertools.cycle('|/-\\'):
status = char + ' ' + msg
write(status)
flush()
write('\x08' * len(status))
try:
yield from asyncio.sleep(.1) # <3>
except asyncio.CancelledError: # <4>
break
write(' ' * len(status) + '\x08' * len(status))


@asyncio.coroutine
def slow_function(): # <5>
# pretend waiting a long time for I/O
yield from asyncio.sleep(3) # <6>
return 42


@asyncio.coroutine
def supervisor(): # <7>
spinner = asyncio.async(spin('thinking!')) # <8>
print('spinner object:', spinner) # <9>
result = yield from slow_function() # <10>
spinner.cancel() # <11>
return result

两者的主要区别:

  • asyncio.Task大致相当于threading.Thread(二者并不完全相同)
  • Task驱动协程,而Thread调用可调用对象
  • 不需要自己实例化Task对象,通过把协程传给asyncio.async()(自Python 3.4.4起更名为asyncio.ensure_future())或者loop.create_task()来获取
  • 拿到一个Task对象时,它其实已经被调度运行了,而线程必须显式地调用start
  • 线程中,目标方法由线程调用,而协程中由yield from驱动的协程调用
  • 没有接口能够从外部中断线程。Task可以调用cancel,其会在协程内抛出CancelledError
  • supervisor协程必须在主函数中使用loop.run_until_complete执行
  • Thread可能需要锁来控制状态,而协程状态由内部控制

概述

Python中在生成器中使用yield,其产生数据,被next()的调用者接收。它同时会挂起生成器的执行,直到调用者准备好接收下一个值;也就是说,调用者从生成器拉数据

协程(coroutine)的语法就像生成器,但是协程的yield通常出现在表达式(赋值语句)的右侧。yield可以产出值,也可以不产出——如果yield关键字后面没有表达式,那么产出None。协程可以从调用者接收数据,调用者使用.send(datum)把数据传给协程;通常,是调用者把数据推送给协程

有可能没有数据经过yield。除了控制数据流之外,yield能够控制调度器使多任务协作。

本文将讲述

  • 生成器作为协程时的表现和状态
  • 使用装饰器自动准备协程
  • 调用者如何通过.close()和.throw()来控制协程
  • 协程如何在终止时返回值
  • yield的新语法使用
  • 模拟协程管理并发

协程如何从生成器进化

协程的基础框架出现在Python2.5,从那时起,yield能够在表达式中使用。调用者能够使用.send()来发数据,该数据变成yield的值.这样子生成器变成协程了。

Python3.3中,协程有两个语法改变

  • 生成器能够返回值。在这之前会抛出SyntaxError
  • yield from

协程的基本行为

1
2
3
4
def simple_coroutine():
    """Minimal coroutine: receive a single value via ``send()`` and print it."""
    print('-> coroutine started')
    # Suspends here (yielding None) until the caller sends a value.
    x = yield
    print('-> coroutine received:', x)

协程有四个状态。可以使用inspect.getgeneratorstate()来确定当前的状态

  • GEN_CREATED:等待开始执行
  • GEN_RUNNING: 当前被解释器执行
  • GEN_SUSPENDED: 当前被yield表达式挂起
  • GEN_CLOSED: 执行已经完成
1
2
my_coro = simple_coroutine()
my_coro.send(1729)

如果创建一个协程并立即发送一个非None的值,那么会抛出异常

1
2
3
4
5
6
def simple_coro2(a):
    """Coroutine that yields ``a``, then ``a + b``, receiving ``b`` and ``c``."""
    print('-> Started: a =', a)
    # First suspension: yields a, resumes with the value sent by the caller.
    b = yield a
    print('-> Received: b =', b)
    # Second suspension: yields a + b, resumes with the next sent value.
    c = yield a + b
    print('-> Received: c =', c)

调用这个协程分三步:首先调用函数创建协程对象;然后调用一次next()预激,执行到第一个yield表达式并产出a;之后每调用一次send(),协程从挂起处恢复,执行到下一个yield表达式(或者执行结束抛出StopIteration)

协程示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def averager():
    """Coroutine computing a running average of the values sent to it.

    Each ``send(term)`` returns the average of all terms received so far.
    The first ``next()`` yields ``None``, since nothing has been averaged yet.
    """
    total = 0.0
    count = 0
    average = None
    while True:
        # Yield the current average and wait for the next term.
        term = yield average
        total += term
        count += 1
        average = total / count

coro_avg = averager()
next(coro_avg)
print(coro_avg.send(10))
print(coro_avg.send(30))
print(coro_avg.send(5))

创建协程,调用next(coro_avg)预激协程,之后每次调用send都会把值作为yield表达式的结果传入协程;协程再次执行到yield时产出当前平均值并挂起,等待下次数据的到来

协程初始化装饰器

如果没有包装,那么我们每次使用协程都必须调用next(my_coro)。为了使协程用起来更方便.有时会使用装饰器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def coroutine(func):
    """Decorator: primes 'func' by advancing to first 'yield'"""

    @wraps(func)
    def primer(*args, **kwargs):
        # Create the generator and advance it once, so that callers can use
        # send() right away.
        gen = func(*args, **kwargs)
        next(gen)
        return gen

    return primer


@coroutine
def averager():
    """Primed coroutine computing a running average of the values sent to it."""
    total = 0.0
    count = 0
    average = None
    while True:
        # Yield the current average and wait for the next term.
        term = yield average
        total += term
        count += 1
        average = total / count

中断协程和异常处理

协程内未处理的异常会传播给引起它的next或者send的调用者

中断协程的方法之一,是发送一些标记值告知协程退出。从Python2.5开始,生成器对象有两个方法允许客户端显式发送异常给协程——throw和close

throw: 使yield抛出异常,如果异常被生成器处理了,流程会进入下一个next。如果异常没被处理,传播给调用者

close: 使生成器在挂起的yield处抛出GeneratorExit异常。如果生成器没有处理这个异常,或者处理后正常结束(抛出StopIteration),不会报告任何错误给调用者。当收到GeneratorExit时,生成器不能再yield值,否则会抛出RuntimeError异常;如果生成器抛出其他异常,会传播给调用者

1
2
3
4
5
6
7
8
9
10
11
12
def demo_exc_handling(self=None):
    """Coroutine demonstrating exception handling inside a generator.

    Prints every value it receives; a ``DemoException`` thrown into it is
    reported and the loop continues. Any other exception, or ``close()``,
    ends the coroutine, and the ``finally`` clause runs on the way out.

    ``self`` is unused. It is kept, now with a default, only so that existing
    calls which pass one argument keep working; this is a plain generator
    function, not a method.
    """
    print('-> coroutine started')
    try:
        while True:
            try:
                x = yield
            except DemoException:
                print('*** DemoException handled. Continuing')
            else:
                print('-> coroutine received: {!r}'.format(x))
    finally:
        # Runs however the coroutine ends.
        print('-> coroutine ending')

协程中返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@coroutine
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield
if term is None:
break
total += term
count += 1
average = total / count
return Result(count, average)

coro_avg = averager()
print(coro_avg.send(10))
print(coro_avg.send(30))
print(coro_avg.send(5))
try:
coro_avg.send(None)
except StopIteration as exc:
result = exc.value
print(result)

使用yield from

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def genYield():
    """Yield the characters of 'AB', then 1 and 2, using explicit loops."""
    for c in 'AB':
        yield c
    for i in range(1, 3):
        yield i

print(list(genYield()))

def genYieldFrom():
    """Same output as ``genYield``, delegating to the iterables with ``yield from``."""
    yield from 'AB'
    yield from range(1, 3)

print(list(genYieldFrom()))

上面两个方法的结果一样。yield from类似其他语言的await

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# Value returned by averager(): number of terms and their mean.
Result = namedtuple('Result', 'count average')


def averager():
    """Subgenerator: average the values sent to it until ``None`` is sent.

    Returns a ``Result``; the value travels to the delegating generator as
    the value of its ``yield from`` expression.
    """
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield
        if term is None:
            # Sentinel: stop accumulating and return the result.
            break
        total += term
        count += 1
        average = total / count
    return Result(count, average)


def grouper(results, key):
    """Delegating generator: store each ``averager`` result in ``results[key]``."""
    while True:
        # Each iteration creates a fresh averager; values sent to grouper are
        # passed straight through to it.
        results[key] = yield from averager()


def main(data):
    """Client code: feed every group of values in ``data`` through a grouper.

    ``data`` maps ``'group;unit'`` keys to iterables of numbers.
    """
    results = {}
    for key, values in data.items():
        group = grouper(results, key)
        next(group)  # prime the coroutine
        for value in values:
            group.send(value)
        # Terminate the current averager so its Result is stored.
        group.send(None)

    report(results)


def report(results):
    """Print one line per key, sorted by key."""
    for key, result in sorted(results.items()):
        group, unit = key.split(';')
        print('{:2} {:5} averaging {:.2f}{}'.format(result.count, group, result.average, unit))

案例:协程模拟离散事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
DEFAULT_NUMBER_OF_TAXIS = 3
DEFAULT_END_TIME = 180
SEARCH_DURATION = 5
TRIP_DURATION = 20
DEPARTURE_INTERVAL = 5

Event = collections.namedtuple('Event', 'time proc action')


# BEGIN TAXI_PROCESS
def taxi_process(ident, trips, start_time=0): # <1>
"""Yield to simulator issuing event at each state change"""
time = yield Event(start_time, ident, 'leave garage') # <2>
for i in range(trips): # <3>
time = yield Event(time, ident, 'pick up passenger') # <4>
time = yield Event(time, ident, 'drop off passenger') # <5>

yield Event(time, ident, 'going home') # <6>
# end of taxi process # <7>


# END TAXI_PROCESS


# BEGIN TAXI_SIMULATOR
class Simulator:
    """Event loop that drives a mapping of generator-based processes."""

    def __init__(self, procs_map):
        self.events = queue.PriorityQueue()
        self.procs = dict(procs_map)  # private copy of the caller's mapping

    def run(self, end_time):
        """Schedule and display events until time is up"""
        # prime each process, in key order, and queue its first event
        for _ident, process in sorted(self.procs.items()):
            self.events.put(next(process))

        sim_time = 0
        while True:
            if not (sim_time < end_time):
                # simulated clock passed end_time: report what is still queued
                msg = '*** end of simulation time: {} events pending ***'
                print(msg.format(self.events.qsize()))
                break
            if self.events.empty():
                print('*** end of events ***')
                break

            current_event = self.events.get()
            sim_time, proc_id, previous_action = current_event
            print('taxi:', proc_id, proc_id * ' ', current_event)
            active_proc = self.procs[proc_id]
            wake_up = sim_time + compute_duration(previous_action)
            try:
                upcoming = active_proc.send(wake_up)
            except StopIteration:
                # the process finished; forget it
                del self.procs[proc_id]
            else:
                self.events.put(upcoming)


# END TAXI_SIMULATOR


def compute_duration(previous_action):
    """Compute action duration using exponential distribution"""
    if previous_action == 'pick up passenger':
        # new state is trip
        interval = TRIP_DURATION
    elif previous_action == 'going home':
        interval = 1
    elif previous_action in ('leave garage', 'drop off passenger'):
        # new state is prowling
        interval = SEARCH_DURATION
    else:
        raise ValueError('Unknown previous_action: %s' % previous_action)
    rate = 1 / interval
    return int(random.expovariate(rate)) + 1


def main(end_time=DEFAULT_END_TIME, num_taxis=DEFAULT_NUMBER_OF_TAXIS,
         seed=None):
    """Initialize random generator, build procs and run simulation"""
    if seed is not None:
        random.seed(seed)  # get reproducible results

    taxis = {}
    for i in range(num_taxis):
        # taxi i makes (i + 1) * 2 trips and departs i * DEPARTURE_INTERVAL after time 0
        taxis[i] = taxi_process(i, (i + 1) * 2, i * DEPARTURE_INTERVAL)
    Simulator(taxis).run(end_time)


if __name__ == '__main__':
    # Command-line front end; each option falls back to the module defaults.
    parser = argparse.ArgumentParser(description='Taxi fleet simulator.')
    end_time_help = 'simulation end time; default = %s' % DEFAULT_END_TIME
    parser.add_argument('-e', '--end-time', type=int,
                        default=DEFAULT_END_TIME, help=end_time_help)
    taxis_help = 'number of taxis running; default = %s' % DEFAULT_NUMBER_OF_TAXIS
    parser.add_argument('-t', '--taxis', type=int,
                        default=DEFAULT_NUMBER_OF_TAXIS, help=taxis_help)
    parser.add_argument('-s', '--seed', type=int, default=None,
                        help='random generator seed (for testing)')

    args = parser.parse_args()
    main(args.end_time, args.taxis, args.seed)

### 概述
本文主要讨论Python中的控制流特性

  • with表达式和context manager
  • else在for,while,try中的使用

with表达式创建临时可靠的上下文。这能防止错误和减少模板代码,使API更便于使用和更安全。经常能够发现使用with表达式能够实现文件的自动关闭。

先做这个,然后做那个:if之外的else块

  • for: 循环正常执行完(没有被break中断)时执行else块
    1
    2
    3
    4
    5
    for item in my_list:
        if item.flavor == 'banana':
            break
    else:
        # reached only when the loop ran to completion without hitting break
        raise ValueError('No banana flavor found!')
  • while: 当循环条件变成false,循环退出执行
  • try: 当try块中没有抛出异常执行
    1
    2
    3
    4
    5
    6
    7
    try:
        # keep only the call that may raise inside the try body
        dangerous_call()
    except OSError:
        log('OSError')
    else:
        # runs only when the try body raised no exception
        after_call()

Context Manager和with

上下文管理器对象用于管理with表达式,就像迭代器用于管理for表达式

with表达式被设计用来简化try/finally模式,保证某些操作最后一定能够执行。

上下文管理器协议包含__enter__和__exit__两个方法。with开始时,__enter__被触发;而__exit__扮演了finally的角色,在离开with块时执行

1
2
with open('mirror.py') as fp:
    # read at most the first 60 characters; the file is closed when the block exits
    src = fp.read(60)

上述代码打开文件,结束之后会自动关闭文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class LookingGlass:
    """Context manager that reverses text written via sys.stdout.write."""

    def __enter__(self):
        import sys
        # remember the real writer so __exit__ can put it back
        self.original_write = sys.stdout.write
        sys.stdout.write = self.reverse_write
        return 'JABBERWOCKY'

    def reverse_write(self, text):
        backwards = text[::-1]
        self.original_write(backwards)

    def __exit__(self, exc_type, exc_val, exc_tb):
        import sys
        sys.stdout.write = self.original_write
        if exc_type is not ZeroDivisionError:
            # any other outcome is not suppressed
            return None
        print('Please Don\'t divide by zero!')
        return True

contextlib工具

  • closing: 为提供了close方法、但没有实现__enter__和__exit__的对象创建上下文管理器
  • suppress: 上下文管理器临时忽略指定的异常
  • @contextmanager: 直接生成上下文管理器,而不需要实现__enter__和__exit__
  • ContextDecorator: 上下文管理器装饰器基类
  • ExitStack: 能够进入几个上下文管理器。当with块结束,其会使用后进先出调用栈中的上下文管理器的__exit__

@contextmanager

@contextmanager减少创建上下文管理器的模板代码。使用@contextmanager时,yield会将函数分成两部分,之前的部分会在__enter__中执行,之后的部分会在__exit__中执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@contextlib.contextmanager
def looking_glass():
    """Context manager that reverses text written via sys.stdout.write.

    Yields the string bound by `with ... as`. A ZeroDivisionError raised in
    the with-block is swallowed and reported after stdout is restored.
    """
    import sys
    original_write = sys.stdout.write

    def reverse_write(text):
        original_write(text[::-1])

    # Patch stdout for the duration of the with-block. It must stay patched
    # until the finally clause below, otherwise nothing is ever reversed.
    sys.stdout.write = reverse_write
    msg = ''
    try:
        yield 'JABBERYWOCKY'
    except ZeroDivisionError:
        msg = 'Please DO NOT divide by zero'
    finally:
        # always restore the real writer, even if the block raised
        sys.stdout.write = original_write
        if msg:
            print(msg)

概述

Python使用yield关键字来构造生成器,并以迭代器的方式工作。每个生成器都是一个迭代器,生成器实现迭代器接口。

本文主要内容

  • 迭代器的内部实现方式
  • Python中实现传统迭代器模式
  • 生成器的实现方式
  • 传统迭代器如何被生成器替代
  • 利用标准库里的通用生成器
  • 使用yield合并生成器
  • 生成器和协程很像,但实际上差距很大

单词序列

通过实现一个Sentence类来解释迭代

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# raw string: '\w' in a plain literal is an invalid escape sequence
RE_WORD = re.compile(r'\w+')


class Sentence:
    """A sentence viewed as a sequence of its words."""

    def __init__(self, text):
        self.text = text
        self.words = RE_WORD.findall(text)

    def __getitem__(self, index):
        return self.words[index]

    def __len__(self):
        return len(self.words)

    def __repr__(self):
        # reprlib.repr abbreviates long strings
        return 'Sentence(%s)' % reprlib.repr(self.text)

为什么Sentence是可迭代的

当解释器需要迭代一个对象时,其会调用内置方法__iter__,调用顺序

  1. 检查对象是否实现了__iter__,如果实现了,获取迭代器
  2. 如果没有实现__iter__,但是实现了__getitem__,Python会创建一个迭代器,然后会按顺序获取值
  3. 如果都没有,那么会抛出异常

这就是Sentence能迭代的原因,其实现了__getitem__。事实上,标准库序列都实现了__iter__。对于__getitem__的支持有可能会被去掉。Python3.4检查是否可迭代的最准确的方式是iter(x)

可迭代和迭代器

  • 可迭代对象: 内置函数iter能够从中获取一个迭代器的对象。Python从可迭代对象中获取迭代器。

迭代器的标准接口有两个方法

  1. __next__: 返回下一个值,如果没有更多值了,抛出StopIteration异常
  2. __iter__: 返回迭代器自身
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

class Iterator(Iterable):
    """Abstract base class for iterators."""
    __slots__ = ()

    @abstractmethod
    def __next__(self):
        'Return the next item from the iterator. When exhausted, raise StopIteration'
        raise StopIteration

    def __iter__(self):
        return self

    @classmethod
    def __subclasshook__(cls, C):
        # Any class whose MRO defines both __next__ and __iter__ counts as a
        # subclass; the hook applies to Iterator itself, not to its subclasses.
        if cls is Iterator:
            if (any("__next__" in B.__dict__ for B in C.__mro__) and
                    any("__iter__" in B.__dict__ for B in C.__mro__)):
                return True

        return NotImplemented

传统迭代器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# raw string: '\w' in a plain literal is an invalid escape sequence
RE_WORD = re.compile(r'\w+')


class Sentence:
    """Iterable sentence; each iter() call builds a fresh SentenceIterator."""

    def __init__(self, text):
        self.text = text
        self.words = RE_WORD.findall(text)

    def __repr__(self):
        return 'Sentence(%s)' % reprlib.repr(self.text)

    def __iter__(self):
        return SentenceIterator(self.words)


class SentenceIterator:
    """Classic iterator over a list of words, tracking its own position."""

    def __init__(self, words):
        self.words = words
        self.index = 0

    def __next__(self):
        position = self.index
        try:
            word = self.words[position]
        except IndexError:
            # ran past the last word
            raise StopIteration()
        self.index = position + 1
        return word

    def __iter__(self):
        return self

生成器方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# raw string: '\w' in a plain literal is an invalid escape sequence
RE_WORD = re.compile(r'\w+')


class Sentence:
    """Iterable sentence whose __iter__ is a generator function."""

    def __init__(self, text):
        self.text = text
        self.words = RE_WORD.findall(text)

    def __repr__(self):
        return 'Sentence(%s)' % reprlib.repr(self.text)

    def __iter__(self):
        # no explicit return needed: the generator ends when the loop does
        for word in self.words:
            yield word

生成器如何工作

当调用含有yield的函数(生成器函数)时,返回一个生成器对象,换句话说,生成器函数是个生成器工厂

1
2
3
4
5
6

def gen_123():
    """Generator function yielding 1, 2 and 3 in that order."""
    for number in (1, 2, 3):
        yield number

更偷懒的实现方式

1
2
3
4
5
6
7
8
9
10
11
12
13
# raw string: '\w' in a plain literal is an invalid escape sequence
RE_WORD = re.compile(r'\w+')


class Sentence:
    """Lazy sentence: words are matched on demand instead of being stored."""

    def __init__(self, text):
        self.text = text

    def __repr__(self):
        return 'Sentence(%s)' % reprlib.repr(self.text)

    def __iter__(self):
        # finditer yields match objects one at a time
        for match in RE_WORD.finditer(self.text):
            yield match.group()

生成器表达式


# raw string: '\w' in a plain literal is an invalid escape sequence
RE_WORD = re.compile(r'\w+')


class Sentence:
    """Lazy sentence whose __iter__ returns a generator expression."""

    def __init__(self, text):
        self.text = text

    def __repr__(self):
        return 'Sentence(%s)' % reprlib.repr(self.text)

    def __iter__(self):
        # the generator expression is the iterator; no yield needed here
        return (match.group() for match in RE_WORD.finditer(self.text))

迭代器和协程

Python2.5加入了协程

生成器产生数据
协程消耗数据

概述

本文将包括以下内容

  • Python如何支持不同类型的中缀操作符
  • 使用鸭子类型或者显式类型检查来处理各种类型的操作数
  • 中缀操作符方法如何表明自己无法处理某个操作数
  • 各种比较操作符的特殊行为

操作符101

操作符重载在某些圈子有不好的名声。它是一种容易被滥用的语言特性,导致别人困惑,bugs和意想不到的性能瓶颈。但是如果用好它,能产生令人愉快的APIs和阅读性很好的代码。Python在灵活,可用,安全之间做了平衡。使用以下的限制:

  • 不能重载内置类型操作符
  • 不能创建新的操作符,只能重载原有的
  • 几个操作符不能重载: is, and, or, not

下面开始讨论最简单的一元操作符

一元操作符

  • -(__neg__)
  • +(__pos__)
  • ~(__invert__)
1
2
3
4
5
6
7
8
9
10

def __abs__(self):
return math.sqrt(sum(x * x for x in self))

def __neg__(self):
    """Return a new Vector with every component negated."""
    negated = (-x for x in self)
    return Vector(negated)

def __pos__(self):
    """Return a new Vector built from this one's components."""
    duplicate = Vector(self)
    return duplicate

什么时候 x != +x?

正常情况下x==+x。以下情况两者不相等

第一种情况, decimal.Decimal中,如果x在数学上下文中创建,而+x在不一样的上下文中创建。

1
2
3
4
5
6
7
8
9
10
11
12
13
ctx = decimal.getcontext()
# start with 40 digits of precision in the current context
ctx.prec = 40
one_third = decimal.Decimal('1') / decimal.Decimal('3')
print(one_third)
print(one_third == +one_third)
# lower the context precision; one_third itself is not recomputed
ctx.prec = 28
# unary + produces a new Decimal under the current context
print(one_third == +one_third)
print(one_third)
print(+one_third)

result:
0.3333333333333333333333333333333333333333
0.3333333333333333333333333333

Decimal上下文的精度被改变后,one_third和+one_third的值不相等。原因是+one_third会用新的精度产生一个新的Decimal

第二种情况是collections.Counter,其实现了几个数学操作符。如果使用+操作符,那么会丢弃0及负数

1
2
3
4
5
6
7
8
ct = Counter('abracadabra')
print(ct)

# force one negative and one zero count
ct['r'] = -3
ct['d'] = 0
print(ct)
# compare with the plain print above to see the effect of unary +
print(+ct)

为向量相加重载+

1
2
3
4
5
6
7
8
9
def __add__(self, other):
pairs = itertools.zip_longest(self, other, fillvalue=0.0)
return Vector(a + b for a, b in pairs)

v1 = Vector([3, 4, 5])
v2 = Vector([6, 7, 8])
print(v1 + v2)
# compare the sum against a Vector built from the hand-computed components
print((v1 + v2) == Vector([3 + 6, 4 + 7, 5 + 8]))

pairs 是个生成器,能产生tuple(a, b),其中a来自self,b来自other。如果两者的长度不一致,那么会使用fillvalue填充较短的一方

1
# here the left operand is a plain tuple and the Vector is on the right
(10, 20, 30) + v1

然而使用上述表达式会出错。

为了支持不同类型的操作符,Python实现了特殊的下发机制.例如a+b会执行以下步骤

  1. 如果a有__add__,调用a.__add__(b),除非结果是NotImplemented,否则返回该结果
  2. 如果a没有__add__,或者调用它返回NotImplemented,检查b是否有__radd__,然后调用b.__radd__(a),然后返回结果
  3. 如果b也不含有__radd__,或者调用它返回NotImplemented。抛出TypeError

因此为了使上述的表达式能够正确运行,我们需要实现__radd__方法

1
2
3
def __radd__(self, other):
return self + other

重载*

1
2
3
4
5
6
7
8
9
10
def __mul__(self, scalar):
if isinstance(scalar, numbers.Real):
return Vector(n * scalar for n in self)
return NotImplemented

def __rmul__(self, other):
return self * other

v1 = Vector([1, 2, 3])
# Vector on the left, int on the right
print(v1 * 10)

判断被乘数是否是实数

1
2
3
4
5
6
7
8
9
10

def __matmul__(self, other):
try:
return sum(a * b for a, b in zip(self, other))
except TypeError:
return NotImplemented

def __rmatmul__(self, other):
return self @ other

丰富的比较操作符

1
2
3
4
5
def __eq__(self, other):
    """Equal only to another Vector of the same length with equal components."""
    if not isinstance(other, Vector):
        # let Python try the other operand's __eq__
        return NotImplemented
    same_length = len(self) == len(other)
    return same_length and all(a == b for a, b in zip(self, other))

1
2
3
4
5
def __ne__(self, other):
eq_result = self == other
if eq_result is NotImplemented:
return NotImplemented

增量赋值操作符

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

class AddableBingoCage(BingoCage):
    """BingoCage that supports + with a Tombola and += with a Tombola or iterable."""

    def __add__(self, other):
        if not isinstance(other, Tombola):
            return NotImplemented
        combined = self.inspect() + other.inspect()
        return AddableBingoCage(combined)

    def __iadd__(self, other):
        if isinstance(other, Tombola):
            incoming = other.inspect()
        else:
            try:
                incoming = iter(other)
            except TypeError:
                # neither a Tombola nor iterable: raise with the class name in the message
                msg = "right operand in += must be {!r} or an iterable"
                raise TypeError(msg.format(type(self).__name__))

        self.load(incoming)
        return self  # augmented assignment must return the target

概述

Java NIO是从Java1.4后加入的新型IO API。主要包括了以下的核心组件

  • 通道
  • 缓冲
  • 选择器

Java NIO: 通道和缓冲

标准的IO API中,使用字节流和字符流。在NIO中使用通道和缓冲。数据总是从通道读取然后放入缓冲或者从缓冲读取写入通道

channel

有几种通道和缓冲类型,以下是主要通道类型

  • FileChannel(文件通道)
  • DatagramChannel(数据报通道)
  • SocketChannel(套接字通道)
  • ServerSocketChannel(服务器套接字通道)

这些通道覆盖了 UDP+TCP的网络IO以及文件IO

以下是核心的缓冲实现

  • ByteBuffer
  • CharBuffer
  • DoubleBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • ShortBuffer

这些缓冲结构覆盖了基本数据类型

Java NIO: 非阻塞IO

Java NIO能够做到非阻塞IO。例如,一个线程能够从通道读取数据到缓冲。当通道读取数据到缓冲时,线程可以做其他事情。一旦数据读取完,线程能够继续执行它

Java NIO: 选择器

Java NIO包含了选择器的概念。选择器是一种对象,其能够监视多个通道的事件,因此单线程能够监控多通道的数据

下图是一个线程使用选择器处理三个通道

selector

为了使用选择器,首先需要将通道注册到选择器,然后调用select()方法。这个方法会阻塞,直到已注册的通道中至少有一个事件就绪。一旦方法返回,线程就可以处理这些事件

继承内置类型的技巧

Python2.2之前,不能够继承内置类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

class DoppelDict(dict):
    """dict subclass whose item assignment stores the value twice in a list."""

    def __setitem__(self, key, value):
        doubled = [value, value]
        super().__setitem__(key, doubled)


# built through the dict constructor
dd = DoppelDict(one=1)
print(dd)

# item assignment goes through the [] operator
dd['two'] = 2
print(dd)
# update() is the inherited dict method
dd.update(three=3)
print(dd)

result:
{'one': 1}
{'one': 1, 'two': [2, 2]}
{'one': 1, 'two': [2, 2], 'three': 3}

DoppelDict在存储时会把值复制成两份。继承自dict的__init__忽略了被重写的__setitem__,因此one没有被复制。[]操作符调用了__setitem__,因此two被复制。update方法没有使用重写的__setitem__方法,因此three没有被复制

方法的寻找总是从目标对象开始,然后向上寻找

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class AnswerDict(dict):
    """dict subclass whose [] lookup returns 42 for every key."""

    def __getitem__(self, key):
        answer = 42
        return answer

ad = AnswerDict(a='foo')
# lookup through the overridden __getitem__
print(ad['a'])

# copy ad's contents into a plain dict
d = {}
d.update(ad)
print(d['a'])
print(d)

result:
42
foo
{'a': 'foo'}

AnswerDict.__getitem__()总是返回42.d是个dict,使用ad更新它,忽略了AnswerDict.__getitem__()

直接继承内置类型如dict或者list容易出错,因为内置的方法大多数会忽略用户自定义的方法。因此应该从collections模块中的UserDict,UserList,UserString来派生类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class DoppelDict2(collections.UserDict):
    """UserDict subclass whose item assignment stores the value twice in a list."""

    def __setitem__(self, key, value):
        doubled = [value, value]
        super().__setitem__(key, doubled)


# same three steps as the DoppelDict demo: constructor, [] assignment, update()
dd = DoppelDict2(one=1)
print(dd)
dd['two'] = 2
print(dd)
dd.update(three=3)
print(dd)

result:
{'one': [1, 1]}
{'one': [1, 1], 'two': [2, 2]}
{'one': [1, 1], 'two': [2, 2], 'three': [3, 3]}

多继承

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

class A:
    """Root of the diamond; defines ping."""

    def ping(self):
        print('ping:', self)


class B(A):
    """Left branch; defines a lower-case pong."""

    def pong(self):
        print('pong:', self)


class C(A):
    """Right branch; defines an upper-case PONG."""

    def pong(self):
        print('PONG:', self)


class D(B, C):
    """Bottom of the diamond; B is listed before C in the bases."""

    def ping(self):
        super().ping()
        print('post-ping:', self)

    def pingpong(self):
        """Call ping and pong through self, through super(), and directly on C."""
        self.ping()
        super().ping()
        self.pong()
        super().pong()
        # calling a method on the class bypasses the MRO and needs an explicit self
        C.pong(self)

d = D()
# pong() prints its own line and returns None, so it is called without print()
d.pong()
# calling the method on the class requires passing the instance explicitly
C.pong(d)

print(D.__mro__)

result:
pong: <__main__.D object at 0x1024a66d8>
PONG: <__main__.D object at 0x1024a66d8>
(<class '__main__.D'>, <class '__main__.B'>, <class '__main__.C'>, <class '__main__.A'>, <class 'object'>)

当在一个类上直接调用方法时,需要显式传入self。调用方法的顺序依据MRO(__mro__)采用C3算法

1
2
3
4
5
6
7
8

def print_mro(cls):
    """Print the class names of cls.__mro__, separated by ', '."""
    names = [klass.__name__ for klass in cls.__mro__]
    print(', '.join(names))

# show the lookup order for the tkinter Text widget class
print_mro(tkinter.Text)

result:
Text, Widget, BaseWidget, Misc, Pack, Place, Grid, XView, YView, object

其搜索路径就是按着Text, Widget, BaseWidget, Misc, Pack, Place, Grid, XView, YView, object从左往右搜索

对于多重继承来说,继承顺序决定了搜索顺序

真实的多重继承案例

适配器模式就使用了多重继承。Python中最常见的多重继承是collections.abc包。标准库Tkinter GUI使用了大量的多重继承

1
2
3
4
5
6
7
8
9
10
11
12
13
# print the MRO of several tkinter classes for comparison
print_mro(tkinter.Toplevel)
print_mro(tkinter.Widget)
print_mro(tkinter.Button)
print_mro(tkinter.Entry)
print_mro(tkinter.Text)

result:
Toplevel, BaseWidget, Misc, Wm, object
Widget, BaseWidget, Misc, Pack, Place, Grid, object
Button, Widget, BaseWidget, Misc, Pack, Place, Grid, object
Entry, Widget, BaseWidget, Misc, Pack, Place, Grid, XView, object
Text, Widget, BaseWidget, Misc, Pack, Place, Grid, XView, YView, object

应对多重继承

  1. 区分接口继承和实现继承

    • 接口继承创建子类型,实现is-a关系
    • 实现继承利于代码复用
  2. 使用ABCs来创造接口
    

  3. 使用Mixin来使代码复用

    如果一个类被设计成为多个不相关的类提供复用,那么请继承Mixin, Mixin不会定义新类型,只是封装方法以便复用.Mixin不该被实例化,任何实体类都不该仅仅只继承Mixin. 每个Mixin应该只提供一种单一特定的行为,实现几个紧密相关的方法

  4. 显式命名Mixin

    没有标准的方式来说明一个类是Mixin,因此建议使用Mixin作为后缀

  5. ABC可能是Mixin,反过来不一定成立

  6. 不要继承多个实体类

  7. 提供集合类给用户

  8. 组合胜过继承

Python文化中的接口和协议

Python在引入ABCs之前就已经很成功了。接口在动态语言中是如何工作的?Python没有interface关键字。除了ABCs之外,每个类都有一个接口。协议也是接口。接口的定义是对象公共方法的子集,能够实现特定的功能。Python中最基础的接口之一是序列协议

运行中实现协议

1
2
3
# build the list 0..9, shuffle it (return value unused), then show the result
l = list(range(10))
shuffle(l)
print(l)

对于普通自定义对象,如果想使用shuffle那么需要实现__setitem__,因此可以动态设置xxx.__setitem__ = set_xxx。但这其中暴露了__setitem__给外界,破坏了封装性

ABC子类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# The ABC lives in collections.abc; the old collections.MutableSequence
# alias no longer exists on current Python versions.
class FrenchDeck2(collections.abc.MutableSequence):
    """Mutable deck of cards backed by a list.

    Only the five abstract methods of MutableSequence are implemented here;
    the remaining sequence methods are inherited from the ABC.
    """
    ranks = [str(n) for n in range(2, 11)] + list('JQKA')
    suits = 'spades diamonds clubs hearts'.split()

    def __init__(self):
        self._cards = [Card(rank, suit) for suit in self.suits for rank in self.ranks]

    def __len__(self):
        return len(self._cards)

    def __getitem__(self, position):
        return self._cards[position]

    def __setitem__(self, position, value):
        self._cards[position] = value

    def __delitem__(self, position):
        del self._cards[position]

    def insert(self, position, value):
        self._cards.insert(position, value)

继承链是MutableSequence->Sequence

标准库中的ABCs

Python2.6之后引入了ABCs

collections.abc

  • Iterable, Container, Sized
  • Sequence, Mapping, Set
  • MappingView
  • Callable, Hashable
  • Iterator

除了collections.abc之外,标准库中最有用的ABCs就是numbers

numbers

  • Number
  • Complex
  • Real
  • Rational
  • Integral

因此我们需要使用isinstance(x, numbers.Integral)来检查整型。需要注意的是decimal.Decimal没有注册为numbers.Real的子类

定义和使用ABC

假设我们需要在网页或者APP上随机展示广告。我们将定义名为Tombola的抽象类。

Tombola抽象类有四个方法。两个抽象方法是

  • load(): 放条目到容器中
  • pick(): 随机移除并返回条目

实体方法

  • loaded(): 如果容器至少有一个条目,那么返回True
  • inspect(): 返回排过序的的元组

tombola

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

class Tombola(abc.ABC):
    """Abstract container from which items are picked at random."""

    @abc.abstractmethod
    def load(self, iterable):
        """Add items from an iterable."""

    @abc.abstractmethod
    def pick(self):
        """Remove item at random, returning it.

        This method should raise 'LookupError' when the instance is empty
        """

    def loaded(self):
        """Return 'True' if there's at least 1 item, 'False' otherwise."""
        contents = self.inspect()
        return bool(contents)

    def inspect(self):
        """Return a sorted tuple with the items currently inside."""
        drained = []
        try:
            # pick until the container reports it is empty
            while True:
                drained.append(self.pick())
        except LookupError:
            pass
        # put everything back before reporting
        self.load(drained)
        return tuple(sorted(drained))

使用@abc.abstractmethod标识抽象方法

1
2
3
4
class Fake(Tombola):
    """Incomplete Tombola subclass: only pick() is implemented."""

    def pick(self):
        value = 13
        return value

ABC详细语法

声明抽象类的最好方式是继承abc.ABC或者其他ABC。然而abc.ABC是Python3.4才引入的。在此之前必须使用metaclass=关键字参数

1
2
class Tombola(metaclass=abc.ABCMeta):
# ...

metaclass=关键字参数是Python3才引入的。在Python2中,必须使用__metaclass__类属性

1
2
3
class Tombola(object):
    # Python 2 spelling: the metaclass is named through a class attribute.
    # The public metaclass is abc.ABCMeta (no leading underscore).
    __metaclass__ = abc.ABCMeta

Tombola的子类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

class BingoCage(Tombola):
    """Tombola backed by a list that is shuffled on every load."""

    def __init__(self, items):
        self._randomizer = random.SystemRandom()
        self._items = []
        self.load(items)

    def load(self, items):
        """Add items, then reshuffle the whole cage."""
        self._items.extend(items)
        self._randomizer.shuffle(self._items)

    def pick(self):
        """Remove and return one item; raise LookupError when empty."""
        try:
            return self._items.pop()
        except IndexError:
            raise LookupError('pick from empty BingoCage')

    def __call__(self):
        """Shortcut for pick(): calling the cage returns the picked item."""
        # return the item; without `return` the pick would be thrown away
        return self.pick()

class LotteryBlower(Tombola):
    """Tombola that picks a random position instead of shuffling on load."""

    def __init__(self, iterable):
        self._balls = list(iterable)  # private copy of the caller's items

    def load(self, iterable):
        self._balls.extend(iterable)

    def pick(self):
        """Remove and return one ball; raise LookupError when empty."""
        try:
            position = random.randrange(len(self._balls))
        except ValueError:
            # randrange(0) raises ValueError; report it under this class's own name
            raise LookupError('pick from empty LotteryBlower')
        return self._balls.pop(position)

    def loaded(self):
        """Override: answer directly from the list, without draining it."""
        return bool(self._balls)

    def inspect(self):
        """Override: sort a snapshot of the list, without draining it."""
        return tuple(sorted(self._balls))

Tombola的虚子类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

@Tombola.register
class TomboList(list):
    """list subclass registered as a virtual subclass of Tombola."""

    def pick(self):
        if not self:
            raise LookupError('pop from empty TomboList')
        return self.pop(randrange(len(self)))

    # loading is the same operation as list.extend
    load = list.extend

    def loaded(self):
        return bool(self)

    def inspect(self):
        ordered = sorted(self)
        return tuple(ordered)

使用@Tombola.register注册作为Tombola的虚子类

注意因为注册了,所以issubclass和isinstance都会把TomboList当作Tombola的子类。

但是打印继承关系

1
2
3
4
# show the classes TomboList really inherits from
print(TomboList.__mro__)

result:
(<class '__main__.TomboList'>, <class 'list'>, <class 'object'>)

可以看出TomboList并没有继承自Tombola

Tombola子类测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# doctest file that test() runs for each class
TEST_FILE = 'tombola_tests.rst'
# report line: {0} is the class name, {1} the doctest result, {2} the OK/FAIL tag
TEST_MSG = '{0:16} {1.attempted:2} tests, {1.failed:2} failed - {2}'


def main(argv):
    """Run test() for every real and every registered subclass of Tombola."""
    verbose = '-v' in argv
    # NOTE(review): _abc_registry is a private attribute of the ABC machinery;
    # confirm it exists on the Python version this script targets.
    candidates = Tombola.__subclasses__() + list(Tombola._abc_registry)

    for cls in candidates:
        test(cls, verbose)


def test(cls, verbose=False):
    """Run the doctest file with cls bound to the name ConcreteTombola and print a summary."""
    outcome = doctest.testfile(
        TEST_FILE,
        globs={'ConcreteTombola': cls},
        verbose=verbose,
        optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)

    if outcome.failed:
        tag = 'FAIL'
    else:
        tag = 'OK'
    print(TEST_MSG.format(cls.__name__, outcome, tag))


if __name__ == '__main__':
    # script entry point: hand the raw command line to main()
    import sys

    main(sys.argv)

前言

本文着重介绍

  • 基本序列协议: __len__和__getitem__
  • 带有多个条目对象的安全展示
  • 合理的切片支持
  • 哈希处理
  • 自定义格式化语言拓展

Vector: 自定义序列类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class Vector:
    """Vector whose components are kept in an array of typecode 'd'."""
    typecode = 'd'

    def __init__(self, components):
        self._components = array(self.typecode, components)

    def __iter__(self):
        return iter(self._components)

    def __repr__(self):
        # reprlib abbreviates long arrays; keep only the bracketed list part
        text = reprlib.repr(self._components)
        start = text.find('[')
        return 'Vector({})'.format(text[start:-1])

    def __str__(self):
        return str(tuple(self))

    def __bytes__(self):
        # one leading byte with the typecode, then the raw array bytes
        header = bytes([ord(self.typecode)])
        return header + bytes(self._components)

    def __eq__(self, other):
        return tuple(self) == tuple(other)

    def __abs__(self):
        squares = 0
        for x in self:
            squares = squares + x * x
        return math.sqrt(squares)

    def __bool__(self):
        return bool(abs(self))

    @classmethod
    def frombytes(cls, octets):
        """Rebuild an instance from the byte layout produced by __bytes__."""
        typecode = chr(octets[0])
        payload = memoryview(octets[1:]).cast(typecode)
        return cls(payload)

  • 使用reprlib来进行安全展示

协议和鸭子类型

Vector实现__len__和__getitem__协议

1
2
3
4
5
6
def __len__(self):
return len(self._components)

def __getitem__(self, item):
return self._components[item]

增强切片功能

1
2
3
4
5
6
7
8
9
10
def __getitem__(self, item):
cls = type(self)
if isinstance(item, slice):
return cls(self._components[item])
elif isinstance(item, numbers.Integral):
return self._components[item]
else:
msg = '{cls.__name__} indices must be integers'
raise TypeError(msg.format(cls=cls))

动态属性访问

__getattr__当属性查找失败被调用。简单来说就是,给定表达式my_obj.x, Python检查对象是否含有x属性。如果没有,继续寻找类(my_obj.__class__),然后继承链向上查找。如果依然没有找到x,那么__getattr__就会被调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def __getattr__(self, name):
    """Map single-letter names found in cls.shortcut_names to components.

    Raises AttributeError for every other name.
    """
    owner = type(self)
    if len(name) == 1:
        index = owner.shortcut_names.find(name)
        # find() gives -1 for unknown letters; the index must also be in range
        if index >= 0 and index < len(self._components):
            return self._components[index]
    template = '{.__name__!r} object has no attribute {!r}'
    raise AttributeError(template.format(owner, name))

def __setattr__(self, name, value):
    """Reject assignment to single lower-case letter attributes; allow the rest."""
    cls = type(self)
    if len(name) == 1:
        if name in cls.shortcut_names:
            template = 'readonly attribute {attr_name!r}'
        elif name.islower():
            template = "can't set attributes 'a' to 'z' in {cls_name!r}"
        else:
            template = ''

        if template:
            raise AttributeError(
                template.format(cls_name=cls.__name__, attr_name=name))

    # every other name is stored the default way
    super().__setattr__(name, value)

哈希和更快的==

1
2
3
4
5
6
7
def __hash__(self):
hashes = map(hash, self._components)
return functools.reduce(operator.xor, hashes, 0)

def __eq__(self, other):
return len(self) == len(other) and all(a == b for a, b in zip(self, other))

格式化

1
2
3
4
5
6
7
8
9
10
11
12
13

def __format__(self, fmt_spec):
if fmt_spec.endswith('h'):
fmt_spec = fmt_spec[:-1]
coords = itertools.chain([abs(self), self.angles()])
outer_fmt = '<{}>'

else:
coords = self
outer_fmt = '({})'
components = (format(c, fmt_spec) for c in coords)
return outer_fmt.format(', '.join(components))