ZooKeeper Source Code Analysis: The Election Mechanism

Overview

ZooKeeper provides three election algorithms:

  • AuthFastLeaderElection
  • AuthFastLeaderElection with authentication enabled
  • FastLeaderElection

Election mechanism

Election states

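Each peer is in one of four states, defined by the ServerState enum in QuorumPeer:

LOOKING, FOLLOWING, LEADING, OBSERVING;

QuorumPeer.start() calls startLeaderElection to prepare the election:
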
synchronized public void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch(IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }

    this.electionAlg = createElectionAlgorithm(electionType);
}

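If the node is in the LOOKING state (still searching for a leader), it creates its current vote: a vote for itself built from myid, its last logged zxid, and its current epoch. It then creates the election algorithm selected by electionType.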

protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le=null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        qcm = createCnxnManager();
        QuorumCnxManager.Listener listener = qcm.listener;
        if(listener != null){
            listener.start();
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}

createElectionAlgorithm instantiates the election algorithm. The default is type 3, FastLeaderElection.

// QuorumPeerConfig.java
protected int electionAlg = 3;
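
The default can be overridden with the electionAlg key in the configuration file. As a rough illustration of that lookup (this is not the actual QuorumPeerConfig parsing code), the sketch below reads the key with java.util.Properties and falls back to 3 when it is absent. The class name ElectionAlgConfigSketch and its electionAlg method are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper for illustration only; not part of ZooKeeper.
class ElectionAlgConfigSketch {
    // Same default as the field shown above: 3 selects FastLeaderElection.
    static final int DEFAULT_ELECTION_ALG = 3;

    // Returns the electionAlg value found in zoo.cfg-style text,
    // or the default when the key is missing.
    static int electionAlg(String cfgText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(cfgText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = props.getProperty("electionAlg");
        return value == null ? DEFAULT_ELECTION_ALG : Integer.parseInt(value.trim());
    }
}
```

Calling ElectionAlgConfigSketch.electionAlg with a configuration text that has no electionAlg line yields the default, so a plain configuration ends up using FastLeaderElection.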

For this algorithm, createElectionAlgorithm creates the QuorumCnxManager, starts its listener, and then creates and starts FastLeaderElection.

// FastLeaderElection.start()
public void start() {
    this.messenger.start();
}

// FastLeaderElection.Messenger.start()
void start(){
    this.wsThread.start();
    this.wrThread.start();
}

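FastLeaderElection.start() delegates to its Messenger, which starts the WorkerSender thread (wsThread) and the WorkerReceiver thread (wrThread).

WorkerSender and the send queue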

public void run() {
    while (!stop) {
        try {
            ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
            if(m == null) continue;

            process(m);
        } catch (InterruptedException e) {
            break;
        }
    }
    LOG.info("WorkerSender is down");
}

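WorkerSender polls sendqueue with a 3-second timeout and passes each message it finds to process. process wraps the data to be sent in a ByteBuffer and hands it to QuorumCnxManager.toSend.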
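
The polling pattern can be tried in isolation. The following is a minimal sketch, assuming plain String messages and a list that stands in for process; SenderLoopSketch, processed, and demo are hypothetical names, and the 100 ms poll timeout is chosen only to keep the example quick.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for WorkerSender; messages are plain strings here.
class SenderLoopSketch implements Runnable {
    final LinkedBlockingQueue<String> sendqueue = new LinkedBlockingQueue<>();
    final List<String> processed = new CopyOnWriteArrayList<>();
    volatile boolean stop = false;

    @Override
    public void run() {
        while (!stop) {
            try {
                // A bounded wait lets the loop re-check the stop flag regularly.
                String m = sendqueue.poll(100, TimeUnit.MILLISECONDS);
                if (m == null) continue;
                processed.add(m); // stands in for process(m)
            } catch (InterruptedException e) {
                break;
            }
        }
    }

    // Runs the loop on its own thread, feeds it the messages,
    // and returns what was processed, in order.
    static List<String> demo(String... messages) {
        SenderLoopSketch sender = new SenderLoopSketch();
        Thread t = new Thread(sender, "sender-sketch");
        t.setDaemon(true);
        t.start();
        for (String m : messages) sender.sendqueue.offer(m);
        try {
            long deadline = System.currentTimeMillis() + 5000;
            while (sender.processed.size() < messages.length
                    && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            sender.stop = true; // the loop exits after its next poll returns
        }
        return new ArrayList<>(sender.processed);
    }
}
```

Polling with a timeout instead of blocking on take() is what allows the thread to notice the stop flag even when no messages arrive.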

public void toSend(Long sid, ByteBuffer b) {
    /*
     * If sending message to myself, then simply enqueue it (loopback).
     */
    if (this.mySid == sid) {
        b.position(0);
        addToRecvQueue(new Message(b.duplicate(), sid));
        /*
         * Otherwise send to the corresponding thread to send.
         */
    } else {
        /*
         * Start a new connection if doesn't have one already.
         */
        ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
                SEND_CAPACITY);
        ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
        if (oldq != null) {
            addToSendQueue(oldq, b);
        } else {
            addToSendQueue(bq, b);
        }
        connectOne(sid);

    }
}

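If the destination is the local server itself, the message goes straight into the receive queue (loopback). Otherwise it is added to the send queue for the destination server, and connectOne then connects to that server.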
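
The routing decision and the putIfAbsent idiom for per-server queues can be sketched on their own. This is a simplified illustration rather than QuorumCnxManager itself: ToSendSketch is a hypothetical class, the capacity of 4 is arbitrary, connection handling is left out, and a plain offer replaces addToSendQueue, whose handling of a full queue is not reproduced here.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of the routing in toSend; not ZooKeeper code.
class ToSendSketch {
    static final int SEND_CAPACITY = 4; // arbitrary value for the example

    final long mySid;
    final ArrayBlockingQueue<ByteBuffer> recvQueue = new ArrayBlockingQueue<>(16);
    final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap =
            new ConcurrentHashMap<>();

    ToSendSketch(long mySid) {
        this.mySid = mySid;
    }

    void toSend(long sid, ByteBuffer b) {
        if (sid == mySid) {
            // Loopback: a message to ourselves never touches the network.
            b.position(0);
            recvQueue.offer(b.duplicate());
            return;
        }
        // putIfAbsent returns the queue already registered for sid, or null
        // if ours was installed, so concurrent callers agree on one queue.
        ArrayBlockingQueue<ByteBuffer> fresh = new ArrayBlockingQueue<>(SEND_CAPACITY);
        ArrayBlockingQueue<ByteBuffer> existing = queueSendMap.putIfAbsent(sid, fresh);
        ArrayBlockingQueue<ByteBuffer> queue = (existing != null) ? existing : fresh;
        queue.offer(b); // simplification: the message is dropped if the queue is full
        // A real implementation would now make sure a connection to sid exists.
    }
}
```

In this sketch, sending twice to the same remote server reuses the queue created by the first call, while a message addressed to mySid lands in recvQueue directly.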

case LOOKING:
    ...
    setCurrentVote(makeLEStrategy().lookForLeader());
    ...

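While the node is in the LOOKING state, QuorumPeer calls lookForLeader to find a leader and stores the returned vote as its current vote.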

public Vote lookForLeader() throws InterruptedException {
    ...
    try {
        Map<Long, Vote> recvset = new HashMap<Long, Vote>();

        Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

        int notTimeout = finalizeWait;

        synchronized(this){
            // Vote for ourselves
            logicalclock.incrementAndGet();
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id = " + self.getId() +
                ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        sendNotifications();

        /*
         * Loop in which we exchange notifications until we find a leader
         */

        while ((self.getPeerState() == ServerState.LOOKING) &&
                (!stop)){
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */
            Notification n = recvqueue.poll(notTimeout,
                    TimeUnit.MILLISECONDS);

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            if(n == null){
                if(manager.haveDelivered()){
                    sendNotifications();
                } else {
                    manager.connectAll();
                }

                /*
                 * Exponential backoff
                 */
                int tmpTimeOut = notTimeout*2;
                notTimeout = (tmpTimeOut < maxNotificationInterval?
                        tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            }
            else if (self.getCurrentAndNextConfigVoters().contains(n.sid)) {
                /*
                 * Only proceed if the vote comes from a replica in the current or next
                 * voting view.
                 */
                switch (n.state) {
                case LOOKING:
                    if (getInitLastLoggedZxid() == -1) {
                        LOG.debug("Ignoring notification as our zxid is -1");
                        break;
                    }
                    if (n.zxid == -1) {
                        LOG.debug("Ignoring notification from member with -1 zxid" + n.sid);
                        break;
                    }
                    // If notification > current, replace and send messages out
                    if (n.electionEpoch > logicalclock.get()) {
                        logicalclock.set(n.electionEpoch);
                        recvset.clear();
                        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(),
                                    getInitLastLoggedZxid(),
                                    getPeerEpoch());
                        }
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) {
                        if(LOG.isDebugEnabled()){
                            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                    + Long.toHexString(n.electionEpoch)
                                    + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                        }
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    if(LOG.isDebugEnabled()){
                        LOG.debug("Adding vote: from=" + n.sid +
                                ", proposed leader=" + n.leader +
                                ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }

                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    if (termPredicate(recvset,
                            new Vote(proposedLeader, proposedZxid,
                                    logicalclock.get(), proposedEpoch))) {

                        // Verify if there is any change in the proposed leader
                        while((n = recvqueue.poll(finalizeWait,
                                TimeUnit.MILLISECONDS)) != null){
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)){
                                recvqueue.put(n);
                                break;
                            }
                        }

                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        if (n == null) {
                            self.setPeerState((proposedLeader == self.getId()) ?
                                    ServerState.LEADING: learningState());

                            Vote endVote = new Vote(proposedLeader,
                                    proposedZxid, proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                }
            }
            ...
}

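The server first votes for itself and sends that vote to the other servers. It then repeats the following steps until the election produces a result:

  • If no notification arrives before the timeout
    • if every queued message has been delivered (manager.haveDelivered()), send the notifications again
    • otherwise try to connect to the other servers (manager.connectAll())
    • double the timeout, capped at maxNotificationInterval
  • If the notification comes from a voting member of the ensemble that is also LOOKING
    • if its election epoch is greater than the local logicalclock, adopt that epoch, clear recvset, keep the better of the received vote and our own initial vote, and notify the other servers
    • if its election epoch is smaller, ignore the notification
    • if it belongs to the same election round and the received vote wins totalOrderPredicate, adopt it and notify the other servers
    • record the vote in recvset and check with termPredicate whether the election already has a result. If it does and no better vote arrives within finalizeWait, the election ends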
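A brief walk-through of the election:

Suppose there are 5 servers with ids 0, 1, 2, 3, and 4, started one after another with the same epoch and zxid. The election proceeds as follows:

  • Server 0 votes for itself and sends out its vote. No other server is running yet, so it stays in the LOOKING state
  • Server 1 votes for itself and exchanges votes with server 0. Its id is larger than 0, so server 1 wins the comparison, but the number of participating servers is not more than half of the ensemble, so both keep waiting
  • Server 2 votes for itself and exchanges votes with servers 0 and 1. Server 2 wins, and three of five servers now agree, so server 2 becomes the leader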
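  • Server 3 votes for itself and exchanges votes with servers 0, 1, and 2. A leader has already been elected, so server 3 accepts server 2 as leader and becomes a follower even though its own id is larger
  • Server 4 votes for itself and exchanges votes with servers 0, 1, 2, and 3. For the same reason it also becomes a follower of server 2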
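
The point at which a quorum first forms in this walk-through can be replayed with a small simulation. It is only a sketch under the same assumptions as the example (servers start in id order with equal epoch and zxid, so the largest id wins every comparison, and a simple majority decides); StaggeredStartSketch and electWhenQuorumForms are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical simulation of the staggered start; not ZooKeeper code.
class StaggeredStartSketch {
    // Starts servers 0..ensembleSize-1 one at a time and returns the id of the
    // leader chosen as soon as more than half of the ensemble agrees,
    // or -1 if that never happens.
    static long electWhenQuorumForms(int ensembleSize) {
        // votes.get(i) is the leader id that running server i currently proposes.
        List<Long> votes = new ArrayList<>();
        for (long sid = 0; sid < ensembleSize; sid++) {
            votes.add(sid); // the new server votes for itself
            // With equal epoch and zxid, the highest id wins the comparison.
            long best = Collections.max(votes);
            // After exchanging notifications, every running server adopts it.
            for (int i = 0; i < votes.size(); i++) {
                votes.set(i, best);
            }
            if (votes.size() > ensembleSize / 2) {
                return best;
            }
        }
        return -1;
    }
}
```

For the five-server example, StaggeredStartSketch.electWhenQuorumForms(5) returns 2: the election is decided when the third server starts, because three servers are the first group larger than half of five.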