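ZooKeeper Leader Election Process - litter-fish/ReadSource GitHub Wiki
At startup every server creates a QuorumCnxManager, which handles the low-level network communication between servers during leader election.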
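// Sender map, keyed by SID: each SendWorker corresponds to one remote ZK server and sends messages to it.
final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
// Outgoing message queues, keyed by SID: every server in the ensemble gets its own queue of messages waiting to be sent.
final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
// The most recent message sent to each SID
final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
// Incoming message queue: holds the messages received from other servers.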
public final ArrayBlockingQueue<Message> recvQueue;
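When QuorumCnxManager starts, its Listener thread creates a ServerSocket that listens on the leader-election port. Once the port is open, the server can keep accepting "create connection" requests from other servers.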
public QuorumCnxManager(final long mySid,
Map<Long,QuorumPeer.QuorumServer> view,
QuorumAuthServer authServer,
QuorumAuthLearner authLearner,
int socketTimeout,
boolean listenOnAllIPs,
int quorumCnxnThreadsSize,
boolean quorumSaslAuthEnabled,
ConcurrentHashMap<Long, SendWorker> senderWorkerMap) {
this.senderWorkerMap = senderWorkerMap;
// ... other field initialization omitted
// Create the Listener thread; its run() method opens a ServerSocket on the election port
listener = new Listener();
}
The Listener thread accepts "create connection" requests from other servers:
public class Listener extends ZooKeeperThread {
public void run() {
int numRetries = 0;
InetSocketAddress addr;
while((!shutdown) && (numRetries < 3)){
try {
ss = new ServerSocket();
ss.setReuseAddress(true);
if (listenOnAllIPs) {
int port = view.get(QuorumCnxManager.this.mySid)
.electionAddr.getPort();
addr = new InetSocketAddress(port);
} else {
addr = view.get(QuorumCnxManager.this.mySid)
.electionAddr;
}
LOG.info("My election bind port: " + addr.toString());
setName(view.get(QuorumCnxManager.this.mySid)
.electionAddr.toString());
// Bind to the election port
ss.bind(addr);
while (!shutdown) {
Socket client = ss.accept();
setSockOpts(client);
LOG.info("Received connection request "
+ client.getRemoteSocketAddress());
if (quorumSaslAuthEnabled) {
receiveConnectionAsync(client);
} else {
// Handle the connection request from the other server
receiveConnection(client);
}
numRetries = 0;
}
} catch (IOException e) {
// ... rest omitted (logs the error and counts the failed attempt in numRetries)
}
}
LOG.info("Leaving listener");
if (!shutdown) {
LOG.error("As I'm leaving the listener thread, "
+ "I won't be able to participate in leader "
+ "election any longer: "
+ view.get(QuorumCnxManager.this.mySid).electionAddr);
}
}
// ... rest omitted
}
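Handling a connection request from another server: read the SID from the request and compare it with our own SID. If the remote SID is larger, keep the connection and start a SendWorker and a RecvWorker for it. If it is smaller, close the socket and connect back to that server ourselves (connectOne). Either way, only a connection initiated by the server with the larger SID survives, so no pair of servers ends up with duplicate connections.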
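The SID rule described above can be checked with a tiny simulation. This is a minimal sketch, not ZooKeeper code. `ConnectionRule`, `keepIncoming` and `survivingConnections` are hypothetical names, and sockets, SendWorker and RecvWorker are left out. Every server dials every other server, and the receiving side applies the same `sid < mySid` test as handleConnection:

```java
import java.util.ArrayList;
import java.util.List;

class ConnectionRule {
    // Same test as handleConnection: an incoming connection from a smaller SID
    // is closed (the receiver dials back instead); otherwise it is kept.
    static boolean keepIncoming(long remoteSid, long mySid) {
        return remoteSid >= mySid;
    }

    // Every server dials every other server; returns the connections that
    // survive, as "initiator->receiver" strings.
    static List<String> survivingConnections(long[] sids) {
        List<String> kept = new ArrayList<>();
        for (long initiator : sids) {
            for (long receiver : sids) {
                if (initiator == receiver) {
                    continue; // no self-connections
                }
                if (keepIncoming(initiator, receiver)) {
                    kept.add(initiator + "->" + receiver);
                }
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Prints [2->1, 3->1, 3->2]
        System.out.println(survivingConnections(new long[] {1, 2, 3}));
    }
}
```

For SIDs 1, 2 and 3 this leaves one connection per pair, always initiated by the larger SID. The dial-back that connectOne performs is itself a larger-to-smaller connection, so it is already among the pairs the loop tries.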
public void receiveConnection(final Socket sock) {
DataInputStream din = null;
try {
// Wrap the socket's input stream for reading
din = new DataInputStream(
new BufferedInputStream(sock.getInputStream()));
handleConnection(sock, din);
} catch (IOException e) {
LOG.error("Exception handling connection, addr: {}, closing server connection",
sock.getRemoteSocketAddress());
closeSocket(sock);
}
}
private void handleConnection(Socket sock, DataInputStream din)
throws IOException {
Long sid = null;
// Read the SID
try {
// Read server id
sid = din.readLong();
if (sid < 0) { // this is not a server id but a protocol version (see ZOOKEEPER-1633)
sid = din.readLong();
// next comes the #bytes in the remainder of the message
// note that 0 bytes is fine (old servers)
int num_remaining_bytes = din.readInt();
if (num_remaining_bytes < 0 || num_remaining_bytes > maxBuffer) {
LOG.error("Unreasonable buffer length: {}", num_remaining_bytes);
closeSocket(sock);
return;
}
byte[] b = new byte[num_remaining_bytes];
// remove the remainder of the message from din
int num_read = din.read(b);
if (num_read != num_remaining_bytes) {
LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
}
}
if (sid == QuorumPeer.OBSERVER_ID) {
/*
* Choose identifier at random. We need a value to identify
* the connection.
*/
sid = observerCounter.getAndDecrement();
LOG.info("Setting arbitrary identifier to observer: " + sid);
}
} catch (IOException e) {
closeSocket(sock);
LOG.warn("Exception reading or writing challenge: " + e.toString());
return;
}
// do authenticating learner
LOG.debug("Authenticating learner server.id: {}", sid);
authServer.authenticate(sock, din);
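// If the received SID is smaller than ours, stop any existing SendWorker for it, close this socket, and connect back to that server ourselves.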
if (sid < this.mySid) {
/*
* This replica might still believe that the connection to sid is
* up, so we have to shut down the workers before trying to open a
* new connection.
*/
SendWorker sw = senderWorkerMap.get(sid);
if (sw != null) {
sw.finish();
}
/*
* Now we start a new connection
*/
LOG.debug("Create new connection to server: " + sid);
closeSocket(sock);
connectOne(sid);
// Otherwise start worker threads to receive data.
} else {
// Create the message sender (SendWorker) and receiver (RecvWorker)
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if(vsw != null)
vsw.finish();
// Add the SendWorker to the sender map under this SID
senderWorkerMap.put(sid, sw);
// Create a send queue for this SID if it does not have one yet
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
// Start the send and receive threads
sw.start();
rw.start();
return;
}
}
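The header parsing at the top of handleConnection can be exercised without a network by using in-memory streams. The header is a negative protocol-version long, then the sender's SID, then a length-prefixed remainder. Older servers send only the SID. This is a minimal sketch under stated assumptions. `ConnectionHeader`, `encode`, `readSid` and `MAX_BUFFER` are hypothetical names. The 512 KB limit and the -65536L version value are illustrative placeholders: any negative first long is treated as a protocol version, as in the check above. The payload content is arbitrary.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

class ConnectionHeader {
    // Assumed upper bound on the remainder length (plays the role of maxBuffer).
    static final int MAX_BUFFER = 512 * 1024;

    // Writes the newer header format: protocol version, SID, length-prefixed payload.
    static byte[] encode(long protocolVersion, long sid, byte[] payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeLong(protocolVersion);
        out.writeLong(sid);
        out.writeInt(payload.length);
        out.write(payload);
        out.flush();
        return bytes.toByteArray();
    }

    // Returns the sender's SID, branching the same way as handleConnection.
    static long readSid(DataInputStream din) throws IOException {
        long sid = din.readLong();
        if (sid < 0) { // negative first long = protocol version; the real SID follows
            sid = din.readLong();
            int remaining = din.readInt();
            if (remaining < 0 || remaining > MAX_BUFFER) {
                throw new IOException("Unreasonable buffer length: " + remaining);
            }
            // readFully blocks until every byte has arrived (or throws EOFException)
            din.readFully(new byte[remaining]);
        }
        return sid;
    }

    public static void main(String[] args) throws IOException {
        byte[] msg = encode(-65536L, 3L, "host:3888".getBytes(StandardCharsets.UTF_8));
        DataInputStream din = new DataInputStream(new ByteArrayInputStream(msg));
        System.out.println(readSid(din)); // prints 3
    }
}
```

One deliberate difference from the excerpt: the sketch uses `DataInputStream.readFully`, which waits for the whole remainder. The excerpt calls `read(b)` once and only logs a short read.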
// --FastLeaderElection
// Vote send queue: votes waiting to be sent
LinkedBlockingQueue<ToSend> sendqueue;
// Vote receive queue: votes received from other servers
LinkedBlockingQueue<Notification> recvqueue;
Figure: relationship between FastLeaderElection and QuorumCnxManager
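- Each server casts a vote. Initially every server votes for itself as leader; a vote has the form (myid, zxid).
- Receive the votes of the other servers and check that each one is valid: it must belong to the current election round and come from a server in the LOOKING state.
- Process the votes by comparing (PK) them according to the rules below. If the external vote wins, update our own vote and send it out again. If our vote wins, keep the current vote.
  - First compare the epoch (peerEpoch); the higher epoch wins.
  - If the epochs are equal, compare zxid; the higher zxid wins.
  - If the zxids are also equal, compare myid; the higher myid wins.
- Tally the votes: after each vote, the server counts the votes it has received in this round. If more than half of the servers have cast the same vote, the server named by that vote becomes the leader and voting stops.
- Change server state: each server switches according to its role; followers switch to FOLLOWING and the leader switches to LEADING.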
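- Change server state: every non-observer server switches to LOOKING.
- Each server builds a vote from its current zxid (in the first round every server votes for its own myid) and sends it to all servers.
- Receive votes from the other servers.
- Compare (PK) the votes.
- Tally the votes and check for a majority.
- Change server state.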
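The steps listed above can be simulated in a single process. This is a deliberately simplified, synchronous sketch, not ZooKeeper's implementation. `ElectionSim`, `Vote`, `Msg`, `broadcast` and `elect` are hypothetical names. Each server starts by voting for itself. It adopts any better vote using the same epoch, then zxid, then myid order as totalOrderPredicate (shown later in this page), and rebroadcasts when its vote changes. Election rounds (logicalclock), finalizeWait, timeouts, observers, and servers that are already FOLLOWING or LEADING are left out. Instead of waiting finalizeWait, the sketch tallies only after no messages are left in flight.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ElectionSim {
    // A vote names a proposed leader together with that leader's zxid and epoch.
    static final class Vote {
        final long leader, zxid, epoch;
        Vote(long leader, long zxid, long epoch) {
            this.leader = leader; this.zxid = zxid; this.epoch = epoch;
        }
        // Same order as totalOrderPredicate: epoch first, then zxid, then server id.
        boolean beats(Vote o) {
            if (epoch != o.epoch) return epoch > o.epoch;
            if (zxid != o.zxid) return zxid > o.zxid;
            return leader > o.leader;
        }
        boolean sameAs(Vote o) {
            return leader == o.leader && zxid == o.zxid && epoch == o.epoch;
        }
    }

    // A vote in flight between two servers (indices into the ids array).
    static final class Msg {
        final int from, to;
        final Vote vote;
        Msg(int from, int to, Vote vote) { this.from = from; this.to = to; this.vote = vote; }
    }

    // Queue the vote for every server, including the sender itself.
    static void broadcast(int from, Vote vote, int n, Deque<Msg> network) {
        for (int to = 0; to < n; to++) network.add(new Msg(from, to, vote));
    }

    // Runs one simplified election round; returns the agreed leader id, or -1.
    static long elect(long[] ids, long[] zxids, long[] epochs) {
        int n = ids.length;
        Vote[] proposal = new Vote[n];
        List<Map<Integer, Vote>> recvset = new ArrayList<>();
        Deque<Msg> network = new ArrayDeque<>();
        for (int s = 0; s < n; s++) {                // step 1: everyone votes for itself
            proposal[s] = new Vote(ids[s], zxids[s], epochs[s]);
            recvset.add(new HashMap<Integer, Vote>());
            broadcast(s, proposal[s], n, network);
        }
        while (!network.isEmpty()) {                 // step 2: receive the next vote
            Msg m = network.poll();
            if (m.vote.beats(proposal[m.to])) {      // step 3: PK; adopt and rebroadcast a better vote
                proposal[m.to] = m.vote;
                broadcast(m.to, m.vote, n, network);
            }
            recvset.get(m.to).put(m.from, m.vote);   // keep the latest vote per sender
        }
        long leader = -1;
        for (int s = 0; s < n; s++) {                // step 4: each server tallies its recvset
            int agree = 0;
            for (Vote v : recvset.get(s).values()) {
                if (v.sameAs(proposal[s])) agree++;
            }
            if (agree <= n / 2) return -1;           // no majority for this server's vote
            if (leader == -1) leader = proposal[s].leader;
            else if (leader != proposal[s].leader) return -1;
        }
        return leader;
    }

    public static void main(String[] args) {
        long[] ids = {1, 2, 3};
        long leader = elect(ids, new long[] {5, 7, 7}, new long[] {1, 1, 1});
        for (long id : ids) {                        // step 5: switch state based on the result
            System.out.println(id + (id == leader ? " LEADING" : " FOLLOWING"));
        }
    }
}
```

With zxids 5, 7 and 7, servers 2 and 3 tie on zxid and server 3 wins on myid, so `main` prints server 3 as LEADING and servers 1 and 2 as FOLLOWING. Because the comparison is a strict total order, the best vote is never displaced once adopted. That is why every server eventually holds the same vote.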
Vote structure
public class Vote {
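// SID of the proposed leader (the server this vote is for)
final private long id;
// Latest zxid of the proposed leader
final private long zxid;
// Election round (logical clock) of the server casting the vote
final private long electionEpoch;
// Epoch of the proposed leader
final private long peerEpoch;
// State of the server casting the vote
final private ServerState state;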
}
Main loop (QuorumPeer.run): check the server state on every iteration
// --QuorumPeer
public void run() {
try {
/*
* Main loop
*/
while (running) {
switch (getPeerState()) {
case LOOKING:
LOG.info("LOOKING");
if (Boolean.getBoolean("readonlymode.enabled")) {
// ... other code omitted
} else {
try {
setBCVote(null);
// Run the leader election
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
}
break;
// ... other code omitted
}
}
} finally {
// ... other code omitted
}
}
Main election flow (FastLeaderElection.lookForLeader)
public Vote lookForLeader() throws InterruptedException {
// ... other code omitted
try {
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized(this) {
// 1. Increment the election round (logicalclock); all valid votes must belong to the same round
logicalclock++;
// 2. Initialize our vote: propose ourselves
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info("New election. My id = " + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
// 3. Send the initial vote
sendNotifications();
// Loop until a leader is elected
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
// 4. Take the next external vote from recvqueue
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
if(n == null){
if(manager.haveDelivered()){ // 4.1 Check whether we are connected
sendNotifications(); // Connected: resend our own vote
} else {
// 4.2 Not connected: reconnect to all servers
manager.connectAll();
}
/*
* Exponential backoff
*/
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
}
else if(self.getVotingView().containsKey(n.sid)) {
/*
* Only proceed if the vote comes from a replica in the
* voting view.
*/
switch (n.state) {
case LOOKING:
// The external vote comes from a newer election round than ours
if (n.electionEpoch > logicalclock) {
// Adopt the external round as our own
logicalclock = n.electionEpoch;
// Discard the votes collected in the old round
recvset.clear();
// Compare the external vote with our initial vote
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
// The external vote wins: update our vote to match it
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
// Our initial vote wins: keep proposing ourselves
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
// Broadcast the updated vote
sendNotifications();
// The external vote comes from an older round: ignore it
} else if (n.electionEpoch < logicalclock) {
if(LOG.isDebugEnabled()){
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock));
}
break;
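// Same round: compare the votes. If the external vote wins, adopt it and broadcast it. If ours wins, nothing needs to be sent, because our current vote was already broadcast when we adopted it.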
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
if(LOG.isDebugEnabled()){
LOG.debug("Adding vote: from=" + n.sid +
", proposed leader=" + n.leader +
", proposed zxid=0x" + Long.toHexString(n.zxid) +
", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
}
// Archive the vote: record the latest vote from each server in this round, keyed by SID
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
// Tally: check whether more than half of the servers support our current vote
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock, proposedEpoch))) {
// Wait up to finalizeWait (200 ms) to see whether a better vote arrives
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
}
// No better vote arrived within 200 ms: update the server state
if (n == null) {
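// LEADING if we are the proposed leader, otherwise the learner state (FOLLOWING, or OBSERVING for an observer)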
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
// Build the final vote for the elected leader
Vote endVote = new Vote(proposedLeader,
proposedZxid,
logicalclock,
proposedEpoch);
// Clear the vote receive queue
leaveInstance(endVote);
return endVote;
}
}
break;
// ... other code omitted
} else {
LOG.warn("Ignoring notification from non-cluster member " + n.sid);
}
}
return null;
} finally {
// ... other code omitted
}
}
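When a poll of recvqueue times out, lookForLeader doubles notTimeout, capped at maxNotificationInterval. A minimal sketch of the resulting schedule follows. It assumes the initial value is finalizeWait = 200 ms (matching the 200 ms mentioned above) and the cap is 60000 ms. Treat both numbers as assumed defaults and check them against your ZooKeeper version. `NotificationBackoff`, `next` and `schedule` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class NotificationBackoff {
    // Initial poll timeout in ms (assumed finalizeWait default).
    static final int FINALIZE_WAIT = 200;
    // Upper bound for the poll timeout in ms (assumed maxNotificationInterval default).
    static final int MAX_NOTIFICATION_INTERVAL = 60000;

    // Same update as in lookForLeader: double the timeout, but never exceed the cap.
    static int next(int notTimeout) {
        int tmpTimeOut = notTimeout * 2;
        return tmpTimeOut < MAX_NOTIFICATION_INTERVAL ? tmpTimeOut : MAX_NOTIFICATION_INTERVAL;
    }

    // Timeouts used for the first `polls` polls when every poll comes back empty.
    static List<Integer> schedule(int polls) {
        List<Integer> out = new ArrayList<>();
        int t = FINALIZE_WAIT;
        for (int i = 0; i < polls; i++) {
            out.add(t);
            t = next(t);
        }
        return out;
    }

    public static void main(String[] args) {
        // [200, 400, 800, 1600, 3200, 6400, 12800, 25600, 51200, 60000, 60000]
        System.out.println(schedule(11));
    }
}
```

Under these assumptions, an isolated server reaches the cap after nine empty polls and then retries about once a minute. Each empty poll also triggers either a resend of its vote or a connectAll attempt.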
Sending votes (FastLeaderElection.sendNotifications)
private void sendNotifications() {
for (QuorumServer server : self.getVotingView().values()) {
long sid = server.id;
// Build the vote message for this recipient
ToSend notmsg = new ToSend(ToSend.mType.notification,
proposedLeader,
proposedZxid,
logicalclock,
QuorumPeer.ServerState.LOOKING,
sid,
proposedEpoch);
if(LOG.isDebugEnabled()){
LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock) +
" (n.round), " + sid + " (recipient), " + self.getId() +
" (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
}
// Put the vote on sendqueue (a LinkedBlockingQueue<ToSend>); the WorkerSender thread sends it
sendqueue.offer(notmsg);
}
}
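sendNotifications only enqueues. A separate WorkerSender thread drains sendqueue and, roughly speaking, hands each message to QuorumCnxManager for delivery. That producer/consumer split can be sketched with a `LinkedBlockingQueue` and one consumer thread. The sketch makes several assumptions. `NotificationPipeline`, `ToSendSketch` and the `delivered` list are hypothetical stand-ins, and recording into `delivered` replaces real network delivery. Draining the queue before shutdown is a choice made here so the result is deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class NotificationPipeline {
    // Hypothetical, trimmed-down ToSend: recipient SID plus the proposed leader.
    static final class ToSendSketch {
        final long sid, proposedLeader;
        ToSendSketch(long sid, long proposedLeader) {
            this.sid = sid;
            this.proposedLeader = proposedLeader;
        }
    }

    final LinkedBlockingQueue<ToSendSketch> sendqueue = new LinkedBlockingQueue<>();
    // Stand-in for the network: records "sid:leader" for every message "sent".
    final List<String> delivered = Collections.synchronizedList(new ArrayList<String>());
    private volatile boolean stop = false;

    // Producer side, like sendNotifications: one message per voting member.
    void sendNotifications(long[] votingView, long proposedLeader) {
        for (long sid : votingView) {
            sendqueue.offer(new ToSendSketch(sid, proposedLeader));
        }
    }

    // Consumer side, like WorkerSender: poll with a timeout so `stop` is noticed,
    // and keep going until the queue is empty after a stop request.
    Thread startWorkerSender() {
        Thread t = new Thread(() -> {
            try {
                while (!stop || !sendqueue.isEmpty()) {
                    ToSendSketch m = sendqueue.poll(100, TimeUnit.MILLISECONDS);
                    if (m != null) {
                        delivered.add(m.sid + ":" + m.proposedLeader);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "WorkerSender");
        t.start();
        return t;
    }

    void shutdown(Thread worker) throws InterruptedException {
        stop = true;
        worker.join();
    }

    public static void main(String[] args) throws InterruptedException {
        NotificationPipeline p = new NotificationPipeline();
        Thread worker = p.startWorkerSender();
        p.sendNotifications(new long[] {1, 2, 3}, 3);
        p.shutdown(worker);
        System.out.println(p.delivered); // [1:3, 2:3, 3:3]
    }
}
```

Because the election thread never blocks on the network, lookForLeader can keep polling recvqueue while slow or unreachable peers are handled on the sender side.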
Vote comparison (PK)
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
if(self.getQuorumVerifier().getWeight(newId) == 0){
return false;
}
/*
* We return true if one of the following three cases hold:
* 1- New epoch is higher
* 2- New epoch is the same as current epoch, but new zxid is higher
* 3- New epoch is the same as current epoch, new zxid is the same
* as current zxid, but server id is higher.
*/
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
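The comparison can be lifted out of FastLeaderElection and tested on its own. In this sketch, a hypothetical `Map<Long, Long>` of weights replaces the weight lookup `self.getQuorumVerifier().getWeight(newId)`. It assumes weight 1 for any server not listed, as with a plain majority quorum, so the zero-weight check only matters when a server has been given weight 0. `VoteOrder` and `weights` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

class VoteOrder {
    // Hypothetical weight table standing in for QuorumVerifier.getWeight(sid).
    final Map<Long, Long> weights = new HashMap<>();

    long weight(long sid) {
        Long w = weights.get(sid);
        return w == null ? 1L : w; // assumed default: weight 1, as with a majority quorum
    }

    // True if the new vote (newId, newZxid, newEpoch) should replace the current one.
    boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
                                long curId, long curZxid, long curEpoch) {
        if (weight(newId) == 0) {
            return false; // a zero-weight server is never preferred
        }
        return (newEpoch > curEpoch)
                || (newEpoch == curEpoch
                    && (newZxid > curZxid || (newZxid == curZxid && newId > curId)));
    }

    public static void main(String[] args) {
        VoteOrder order = new VoteOrder();
        // A higher epoch wins even with a lower zxid.
        System.out.println(order.totalOrderPredicate(1, 0x100, 2, 3, 0x200, 1)); // true
        // Same epoch and zxid: the larger server id wins, so 2 does not replace 3.
        System.out.println(order.totalOrderPredicate(2, 0x200, 1, 3, 0x200, 1)); // false
    }
}
```

The predicate is strict: two identical votes never replace each other. That is why the same-round branch of lookForLeader only resends when the external vote is strictly better.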
Vote tallying: check whether more than half of the votes already support our current vote
protected boolean termPredicate(
HashMap<Long, Vote> votes, // external votes received, keyed by sender SID
Vote vote) { // the vote we currently support
HashSet<Long> set = new HashSet<Long>();
for (Map.Entry<Long,Vote> entry : votes.entrySet()) {
if (vote.equals(entry.getValue())){
set.add(entry.getKey());
}
}
// Check whether the SIDs supporting this vote form a quorum (more than half)
return self.getQuorumVerifier().containsQuorum(set);
}
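With a plain majority quorum, containsQuorum amounts to checking that the supporting SIDs number more than half of the voting members. A sketch of termPredicate under that assumption follows. The voter count is passed in explicitly. `QuorumTally` is a hypothetical name, and its `Vote` is a simplified value type whose equality covers leader, zxid, election epoch and peer epoch. It is not ZooKeeper's Vote class.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

class QuorumTally {
    // Simplified vote; equals/hashCode cover all four fields.
    static final class Vote {
        final long leader, zxid, electionEpoch, peerEpoch;
        Vote(long leader, long zxid, long electionEpoch, long peerEpoch) {
            this.leader = leader;
            this.zxid = zxid;
            this.electionEpoch = electionEpoch;
            this.peerEpoch = peerEpoch;
        }
        @Override public boolean equals(Object o) {
            if (!(o instanceof Vote)) return false;
            Vote v = (Vote) o;
            return leader == v.leader && zxid == v.zxid
                    && electionEpoch == v.electionEpoch && peerEpoch == v.peerEpoch;
        }
        @Override public int hashCode() {
            return Objects.hash(leader, zxid, electionEpoch, peerEpoch);
        }
    }

    // Majority rule: strictly more than half of the voting members.
    static boolean containsQuorum(Set<Long> sids, int votingMembers) {
        return sids.size() > votingMembers / 2;
    }

    // Collect the SIDs whose latest vote equals `vote`, then test for a quorum.
    static boolean termPredicate(Map<Long, Vote> votes, Vote vote, int votingMembers) {
        Set<Long> set = new HashSet<>();
        for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
            if (vote.equals(entry.getValue())) {
                set.add(entry.getKey());
            }
        }
        return containsQuorum(set, votingMembers);
    }

    public static void main(String[] args) {
        Vote forThree = new Vote(3, 0x200, 1, 1);
        Map<Long, Vote> recvset = new HashMap<>();
        recvset.put(1L, new Vote(1, 0x100, 1, 1));
        recvset.put(3L, forThree);
        System.out.println(termPredicate(recvset, forThree, 5)); // false: 1 of 5
        recvset.put(2L, forThree);
        recvset.put(1L, forThree); // server 1 switched its vote
        System.out.println(termPredicate(recvset, forThree, 5)); // true: 3 of 5
    }
}
```

For an ensemble of 4, `4 / 2 = 2`, so 3 matching votes are needed. This is why a fourth server tolerates no more failures than three.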