ZooKeeper Data Synchronization - litter-fish/ReadSource GitHub Wiki
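As the figure shows, data synchronization comes in four forms: direct differential sync (DIFF), rollback followed by differential sync (TRUNC + DIFF), rollback only (TRUNC), and full sync (SNAP). The Leader chooses among them by comparing the following values: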
// Last ZXID processed by this Learner
peerLastZxid = ss.getLastZxid();
// Largest ZXID in the Leader's cache queue CommittedLog
final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
// Smallest ZXID in the Leader's cache queue CommittedLog
final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
// Proposal cache queue
LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
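How these values select a sync mode is spelled out in the `if`/`else` chain of `LearnerHandler.run()` in the source analysis below. The following is a minimal Java model of just that branching, assuming the cache queue can be summarized by whether it is empty plus its min/max ZXIDs. The class `SyncBranch` and its enum are hypothetical names for this sketch, not ZooKeeper API.

```java
// Hypothetical model of the branch LearnerHandler.run() takes; not ZooKeeper API.
class SyncBranch {
    enum Branch { EMPTY_DIFF, SCAN_PROPOSALS, TRUNC, SNAP }

    static Branch choose(long peerLastZxid, long leaderLastProcessedZxid, boolean proposalsEmpty,
                         long minCommittedLog, long maxCommittedLog) {
        if (peerLastZxid == leaderLastProcessedZxid) {
            return Branch.EMPTY_DIFF; // already in sync: an empty DIFF is sent
        }
        if (!proposalsEmpty) {
            if (minCommittedLog <= peerLastZxid && peerLastZxid <= maxCommittedLog) {
                return Branch.SCAN_PROPOSALS; // DIFF or TRUNC + DIFF, decided while scanning
            }
            if (peerLastZxid > maxCommittedLog) {
                return Branch.TRUNC; // roll back to maxCommittedLog only
            }
        }
        // peerLastZxid < minCommittedLog, or no proposal cache: the default SNAP is kept
        return Branch.SNAP;
    }

    public static void main(String[] args) {
        // Learner B from the TRUNC + DIFF example: its last ZXID lies inside the Leader's cache
        System.out.println(choose(0x500000003L, 0x600000002L, false, 0x500000001L, 0x600000002L));
    }
}
```

In the in-range case the model deliberately stops at `SCAN_PROPOSALS`: whether the result is a plain DIFF or TRUNC + DIFF is only known once the Leader walks the proposals, as the scenarios below show.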
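Scenario: peerLastZxid lies between minCommittedLog and maxCommittedLog, and is itself one of the proposals in the Leader's cache queue (direct differential sync, DIFF)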
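- Leader: it first sends a DIFF instruction telling the Learner to enter the differential sync phase. For each Proposal to be synced it sends a PROPOSAL packet followed by a COMMIT packet. It then sends a NEWLEADER instruction, telling the Learner that every Proposal in the proposal cache queue has been sent, and waits for the Learner's ACK confirming that sync is complete.
- Learner: when it receives a sync instruction (DIFF, TRUNC or SNAP), it knows it is entering the sync process and processes the sync packets that follow in order. When it reaches the final NEWLEADER packet, it replies to the Leader with an ACK saying sync is complete.
- When the Leader receives the Learner's ACK, it checks whether more than half of the servers have acknowledged; once they have, it sends an UPTODATE instruction to each Learner that has finished syncing, telling it that sync is complete.
- On receiving UPTODATE, the Learner ends the sync process and sends one more ACK back to the Leader.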
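The majority check in the third step can be sketched as a set of acknowledged server IDs. This is a simplification under stated assumptions: the real logic lives in `Leader.waitForNewLeaderAck` (called in the source below), while `NewLeaderAckTracker` and its methods are hypothetical and treat "more than half" as a strict majority over voting server IDs.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper: tracks which voting servers have ACKed NEWLEADER.
class NewLeaderAckTracker {
    private final Set<Long> votingMembers;
    private final Set<Long> acked = new HashSet<>();

    NewLeaderAckTracker(Set<Long> votingMembers) {
        this.votingMembers = new HashSet<>(votingMembers);
    }

    // Record an ACK from server sid; ACKs from non-voting servers are ignored.
    synchronized void ack(long sid) {
        if (votingMembers.contains(sid)) {
            acked.add(sid);
        }
    }

    // True once more than half of the voting members have ACKed.
    synchronized boolean hasQuorum() {
        return acked.size() > votingMembers.size() / 2;
    }

    public static void main(String[] args) {
        NewLeaderAckTracker tracker = new NewLeaderAckTracker(Set.of(1L, 2L, 3L));
        tracker.ack(1L); // e.g. the Leader's own ACK
        System.out.println(tracker.hasQuorum()); // false: 1 of 3
        tracker.ack(3L);
        System.out.println(tracker.hasQuorum()); // true: 2 of 3
    }
}
```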
Example: the Leader's proposal cache queue is [0x500000001, 0x500000002, 0x500000003, 0x500000004, 0x500000005] and the last ZXID the Learner processed is 0x500000003. During sync, the Leader sends the two proposals 0x500000004 and 0x500000005 to the Learner, in that order.
Packet sending order:
| Order | Packet type | ZXID |
|---|---|---|
| 1 | DIFF | 0x500000005 |
| 2 | PROPOSAL | 0x500000004 | 
| 3 | COMMIT | 0x500000004 | 
| 4 | PROPOSAL | 0x500000005 | 
| 5 | COMMIT | 0x500000005 | 
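The packet sequence in the table can be reproduced by a small Java model of the `minCommittedLog <= peerLastZxid <= maxCommittedLog` branch of `LearnerHandler.run()`. It is a sketch under simplifying assumptions: packets are plain strings, the cache queue is a sorted list of ZXIDs, and the NEWLEADER packet that follows is left out. `DiffPacketBuilder` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the in-range branch of LearnerHandler.run().
class DiffPacketBuilder {

    static List<String> build(List<Long> committedLog, long peerLastZxid) {
        long minCommittedLog = committedLog.get(0);
        long maxCommittedLog = committedLog.get(committedLog.size() - 1);
        if (peerLastZxid < minCommittedLog || peerLastZxid > maxCommittedLog) {
            throw new IllegalArgumentException("peerLastZxid outside [min, max]");
        }
        String packetToSend = "DIFF";
        long zxidToSend = maxCommittedLog; // DIFF carries maxCommittedLog
        long prevProposalZxid = minCommittedLog;
        boolean firstPacket = true;
        List<String> queued = new ArrayList<>();
        for (long zxid : committedLog) {
            if (zxid <= peerLastZxid) {
                prevProposalZxid = zxid; // the Learner already has this proposal
                continue;
            }
            if (firstPacket) {
                firstPacket = false;
                // The Learner's last ZXID is not in the Leader's log: roll back first
                if (prevProposalZxid < peerLastZxid) {
                    packetToSend = "TRUNC";
                    zxidToSend = prevProposalZxid;
                }
            }
            queued.add("PROPOSAL " + hex(zxid));
            queued.add("COMMIT " + hex(zxid));
        }
        List<String> wire = new ArrayList<>();
        wire.add(packetToSend + " " + hex(zxidToSend)); // the sync instruction goes out first
        wire.addAll(queued);
        return wire;
    }

    static String hex(long zxid) {
        return "0x" + Long.toHexString(zxid);
    }

    public static void main(String[] args) {
        List<Long> log = List.of(0x500000001L, 0x500000002L, 0x500000003L, 0x500000004L, 0x500000005L);
        build(log, 0x500000003L).forEach(System.out::println);
    }
}
```

Passing the Leader's queue from the TRUNC + DIFF scenario below ([0x500000001, 0x500000002, 0x600000001, 0x600000002] with peerLastZxid = 0x500000003) makes the same loop emit `TRUNC 0x500000002` instead of DIFF, because the Learner's last ZXID does not appear in the Leader's log.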
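Scenario: roll back first, then differential sync (TRUNC + DIFF). Suppose there are three servers A, B and C. Initially B is the Leader, with epoch 5; it has processed proposals 0x500000001 and 0x500000002 and committed them to the Learners.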
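B then writes a new proposal 0x500000003 to its local transaction log, but crashes before the proposal is sent out. A and C hold a new election and A becomes the new Leader, with epoch 6; it processes proposals 0x600000001 and 0x600000002. Server B then restarts and must sync with the new Leader.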
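Comparisons such as `peerLastZxid > maxCommittedLog` work across epochs because a ZXID packs the epoch into its high 32 bits and a per-epoch counter into its low 32 bits, so 0x500000003 is epoch 5, counter 3. The helpers below should match what `ZxidUtils.makeZxid` and `ZxidUtils.getEpochFromZxid` (both used in the source further down) compute; the class and method names are local to this sketch.

```java
// A ZXID is 64 bits: epoch in the high 32 bits, counter in the low 32 bits.
class ZxidDemo {
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >> 32L;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        long fromB = 0x500000003L; // B's uncommitted proposal
        long fromA = 0x600000001L; // first proposal of the new Leader A
        System.out.println(epochOf(fromB) + " " + counterOf(fromB)); // 5 3
        System.out.println(fromA > fromB); // true: the newer epoch wins regardless of counter
    }
}
```

This is also why B's 0x500000003 sorts below every epoch-6 ZXID even though its counter is larger.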
At this point peerLastZxid = 0x500000003, maxCommittedLog = 0x600000002, minCommittedLog = 0x500000001, and proposals = [0x500000001, 0x500000002, 0x600000001, 0x600000002].
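Scanning the queue, the Leader skips 0x500000001 and 0x500000002 (both ≤ peerLastZxid), so prevProposalZxid = 0x500000002. The first newer proposal is 0x600000001, and because prevProposalZxid < peerLastZxid the Leader switches to TRUNC and assigns zxidToSend = 0x500000002 and updates = 0x500000002.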
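The Leader first sends a TRUNC instruction whose zxid is 0x500000002: the last proposal that B processed before crashing and that the new Leader also holds. It then sends each remaining proposal in the cache queue, each one followed by a COMMIT instruction.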
Packet sending order:
| Order | Packet type | ZXID |
|---|---|---|
| 1 | TRUNC | 0x500000002 | 
| 2 | PROPOSAL | 0x600000001 | 
| 3 | COMMIT | 0x600000001 | 
| 4 | PROPOSAL | 0x600000002 | 
| 5 | COMMIT | 0x600000002 | 
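From B's side, the effect of this packet sequence can be modeled by treating its state as a plain list of applied ZXIDs: TRUNC drops everything after 0x500000002 (the uncommitted 0x500000003), and each PROPOSAL/COMMIT pair appends one transaction. This is a hypothetical simplification (`LearnerReplay` is not ZooKeeper code); the real Learner truncates its log files and reloads its database, as the source further down shows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: a Learner's state is just the list of ZXIDs it has applied.
class LearnerReplay {
    static List<Long> replay(List<Long> localLog, long truncZxid, List<Long> syncedZxids) {
        List<Long> log = new ArrayList<>(localLog);
        // TRUNC: discard every transaction after the rollback point
        log.removeIf(z -> z > truncZxid);
        // Each PROPOSAL + COMMIT pair from the Leader appends one transaction
        log.addAll(syncedZxids);
        return log;
    }

    public static void main(String[] args) {
        List<Long> serverB = List.of(0x500000001L, 0x500000002L, 0x500000003L);
        List<Long> after = replay(serverB, 0x500000002L, List.of(0x600000001L, 0x600000002L));
        after.forEach(z -> System.out.println("0x" + Long.toHexString(z)));
    }
}
```

After the replay, B's history matches the new Leader's proposal cache queue exactly.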
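Scenario: peerLastZxid is greater than maxCommittedLog (rollback only, TRUNC). The Learner holds transactions newer than anything in the Leader's proposal cache queue, so the Leader sends a single TRUNC instruction with zxid = maxCommittedLog, and the Learner truncates its transaction log back to that ZXID.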
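Scenario (full sync, SNAP): either (1) peerLastZxid is less than minCommittedLog, or (2) the Leader has no proposal cache queue and peerLastZxid differs from the largest ZXID the Leader obtained when it recovered its data.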
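The Leader first sends a SNAP instruction to tell the Learner that a full sync is coming. It then takes all data nodes and the session-timeout records from its in-memory database, serializes them and streams them to the Learner, which deserializes them and loads them into its own in-memory database.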
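The shape of that stream (snapshot data followed by the `"BenWasHere"` signature that the Learner checks, as in the source below) can be sketched with plain `java.io` streams. This swaps in a technique: ZooKeeper itself serializes through its jute `OutputArchive` in `serializeSnapshot`, whereas this sketch uses `DataOutputStream` and flat maps for nodes and session timeouts. `SnapStreamDemo` is a hypothetical name; only the signature string comes from the source.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

class SnapStreamDemo {
    static final String SIGNATURE = "BenWasHere"; // trailer string used by the source

    // Leader side: session timeouts, then data nodes, then the signature.
    static byte[] serialize(Map<String, String> nodes, Map<Long, Integer> sessionTimeouts) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(sessionTimeouts.size());
            for (Map.Entry<Long, Integer> e : sessionTimeouts.entrySet()) {
                out.writeLong(e.getKey());
                out.writeInt(e.getValue());
            }
            out.writeInt(nodes.size());
            for (Map.Entry<String, String> e : nodes.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeUTF(e.getValue());
            }
            out.writeUTF(SIGNATURE);
        }
        return bytes.toByteArray();
    }

    // Learner side: read everything back and refuse the snapshot if the signature is wrong.
    static Map<String, String> deserialize(byte[] data, Map<Long, Integer> sessionsOut) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int sessions = in.readInt();
            for (int i = 0; i < sessions; i++) {
                long id = in.readLong();
                sessionsOut.put(id, in.readInt());
            }
            int count = in.readInt();
            Map<String, String> nodes = new LinkedHashMap<>();
            for (int i = 0; i < count; i++) {
                String path = in.readUTF();
                nodes.put(path, in.readUTF());
            }
            String signature = in.readUTF();
            if (!SIGNATURE.equals(signature)) {
                throw new IOException("Missing signature");
            }
            return nodes;
        }
    }

    public static void main(String[] args) throws IOException {
        Map<String, String> nodes = new LinkedHashMap<>();
        nodes.put("/app", "v1");
        Map<Long, Integer> sessions = new LinkedHashMap<>();
        sessions.put(0x100000001L, 30000);
        System.out.println(deserialize(serialize(nodes, sessions), new LinkedHashMap<>())); // {/app=v1}
    }
}
```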
Source analysis: the Leader's main data-sync flow
// org/apache/zookeeper/server/quorum/LearnerHandler.java
public void run() {
    try {
        // (Leader/Learner state exchange omitted)
        // Last ZXID processed by this Learner
        peerLastZxid = ss.getLastZxid();
        /* the default to send to the follower */
        int packetToSend = Leader.SNAP;
        long zxidToSend = 0;
        long leaderLastZxid = 0;
        /** the packets that the follower needs to get updates from **/
        long updates = peerLastZxid;
        ////////////// Data synchronization between the Leader and the Learner
        /* we are sending the diff check if we have proposals in memory to be able to
         * send a diff to the
         */
        ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
        ReadLock rl = lock.readLock();
        try {
            rl.lock();
            // Largest ZXID in the Leader's cache queue CommittedLog
            final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
            // Smallest ZXID in the Leader's cache queue CommittedLog
            final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
            LOG.info("Synchronizing with Follower sid: " + sid
                    +" maxCommittedLog=0x"+Long.toHexString(maxCommittedLog)
                    +" minCommittedLog=0x"+Long.toHexString(minCommittedLog)
                    +" peerLastZxid=0x"+Long.toHexString(peerLastZxid));
            // Proposal cache queue
            LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
            // If the Leader's last processed ZXID equals the Learner's, no sync is needed
            if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
                // Follower is already sync with us, send empty diff
                LOG.info("leader and follower are in sync, zxid=0x{}",
                        Long.toHexString(peerLastZxid));
                packetToSend = Leader.DIFF;
                zxidToSend = peerLastZxid;
            } else if (proposals.size() != 0) {
                LOG.debug("proposal size is {}", proposals.size());
                // peerLastZxid lies between minCommittedLog and maxCommittedLog
                if ((maxCommittedLog >= peerLastZxid)
                        && (minCommittedLog <= peerLastZxid)) {
                    LOG.debug("Sending proposals to follower");
                    // as we look through proposals, this variable keeps track of previous
                    // proposal Id.
                    long prevProposalZxid = minCommittedLog;
                    // Keep track of whether we are about to send the first packet.
                    // Before sending the first packet, we have to tell the learner
                    // whether to expect a trunc or a diff
                    boolean firstPacket=true;
                    packetToSend = Leader.DIFF;
                    zxidToSend = maxCommittedLog;
                    for (Proposal propose: proposals) {
                        // Skip proposals the Learner has already processed
                        if (propose.packet.getZxid() <= peerLastZxid) {
                            prevProposalZxid = propose.packet.getZxid();
                            continue;
                        } else {
                            if (firstPacket) {
                                firstPacket = false;
                                // TRUNC + DIFF: decide the rollback point before the first packet
                                if (prevProposalZxid < peerLastZxid) {
                                    packetToSend = Leader.TRUNC;
                                    zxidToSend = prevProposalZxid;
                                    updates = zxidToSend;
                                }
                            }
                            // Queue the Proposal packet for sending
                            queuePacket(propose.packet);
                            // Queue the Proposal's COMMIT packet for sending
                            QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
                                    null, null);
                            queuePacket(qcommit);
                        }
                    }
                // The Learner's last ZXID is larger than anything in the Leader's cache: roll back only (TRUNC)
                } else if (peerLastZxid > maxCommittedLog) {
                    LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}",
                            Long.toHexString(maxCommittedLog),
                            Long.toHexString(updates));
                    packetToSend = Leader.TRUNC;
                    zxidToSend = maxCommittedLog;
                    updates = zxidToSend;
                } else {
                    LOG.warn("Unhandled proposal scenario");
                }
            } else {
                // just let the state transfer happen
                LOG.debug("proposals is empty");
            }
            LOG.info("Sending " + Leader.getPacketType(packetToSend));
            leaderLastZxid = leader.startForwarding(this, updates);
        } finally {
            rl.unlock();
        }
        QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                ZxidUtils.makeZxid(newEpoch, 0), null, null);
        if (getVersion() < 0x10000) {
            oa.writeRecord(newLeaderQP, "packet");
        } else {
            // Queue NEWLEADER to tell the Learner that every proposal in the cache queue has been sent to it
            queuedPackets.add(newLeaderQP);
        }
        bufferedOutput.flush();
        //Need to set the zxidToSend to the latest zxid
        if (packetToSend == Leader.SNAP) {
            zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
        }
        // Send the sync instruction (DIFF, TRUNC or SNAP) to the Learner first; the packet carries zxidToSend
        oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
        bufferedOutput.flush();
        /* if we are not truncating or sending a diff just send a snapshot */
        // Full sync
        if (packetToSend == Leader.SNAP) {
            // Serialize all data nodes and the session-timeout records from the in-memory database
            leader.zk.getZKDatabase().serializeSnapshot(oa);
            oa.writeString("BenWasHere", "signature");
        }
        bufferedOutput.flush();
        // Start sending packets
        // Create and start a new thread that sends the queued packets (Proposals, COMMITs, NEWLEADER)
        new Thread() {
            public void run() {
                Thread.currentThread().setName(
                        "Sender-" + sock.getRemoteSocketAddress());
                try {
                    sendPackets();
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected interruption",e);
                }
            }
        }.start();
        qp = new QuorumPacket();
        // Wait for the Learner's ACK of NEWLEADER
        ia.readRecord(qp, "packet");
        if(qp.getType() != Leader.ACK){
            LOG.error("Next packet was supposed to be an ACK");
            return;
        }
        LOG.info("Received NEWLEADER-ACK message from " + getSid());
        // Block until more than half of the servers have ACKed NEWLEADER
        leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());
        syncLimitCheck.start();
        sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);
        synchronized(leader.zk){
            while(!leader.zk.isRunning() && !this.isInterrupted()){
                leader.zk.wait(20);
            }
        }
        // Queue UPTODATE to tell the Learner that data sync is finished
        queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
        // (rest of the method omitted)
    } // catch
}
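One detail of `run()` worth isolating is ordering. In the ZAB 1.0 path, the Proposals, COMMITs and NEWLEADER are put on `queuedPackets` first, yet the DIFF/TRUNC/SNAP instruction (plus any snapshot) is written straight to `oa` and still reaches the Learner first, because the thread that calls `sendPackets()` is started only afterwards. The sketch below reproduces that with a `LinkedBlockingQueue` and a list standing in for the socket; the `STOP` end marker, the class name and the omission of the ACK wait are all assumptions of this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

class SenderOrderDemo {
    static final String STOP = "STOP"; // hypothetical end marker for this sketch only

    static List<String> run() throws InterruptedException {
        List<String> wire = Collections.synchronizedList(new ArrayList<>()); // stands in for the socket
        LinkedBlockingQueue<String> queuedPackets = new LinkedBlockingQueue<>();

        // 1. Under the read lock the Leader only queues packets...
        queuedPackets.add("PROPOSAL 0x500000004");
        queuedPackets.add("COMMIT 0x500000004");
        queuedPackets.add("NEWLEADER");

        // 2. ...then writes the sync instruction directly
        wire.add("DIFF 0x500000004");

        // 3. Only now does the sender thread start draining the queue in FIFO order
        Thread sender = new Thread(() -> {
            try {
                while (true) {
                    String packet = queuedPackets.take();
                    if (STOP.equals(packet)) {
                        return;
                    }
                    wire.add(packet);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Sender-demo");
        sender.start();

        // 4. UPTODATE goes through the same queue (the quorum ACK wait is not modeled)
        queuedPackets.add("UPTODATE");
        queuedPackets.add(STOP);
        sender.join();
        return new ArrayList<>(wire);
    }

    public static void main(String[] args) throws InterruptedException {
        run().forEach(System.out::println);
    }
}
```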
Source analysis: the Learner's main flow for handling data sync
// org/apache/zookeeper/server/quorum/Learner.java
protected void syncWithLeader(long newLeaderZxid) throws IOException, InterruptedException{
    QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
    QuorumPacket qp = new QuorumPacket();
    long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
    boolean snapshotNeeded = true;
    // Block until the Leader's first sync packet arrives
    readPacket(qp);
    LinkedList<Long> packetsCommitted = new LinkedList<Long>();
    LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
    synchronized (zk) {
        // Differential sync (DIFF)
        if (qp.getType() == Leader.DIFF) {
            LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
            snapshotNeeded = false;
        }
        // Full sync (SNAP)
        else if (qp.getType() == Leader.SNAP) {
            LOG.info("Getting a snapshot from leader 0x" + Long.toHexString(qp.getZxid()));
            // Clear the in-memory database
            zk.getZKDatabase().clear();
            // Deserialize the snapshot sent by the Leader
            zk.getZKDatabase().deserializeSnapshot(leaderIs);
            String signature = leaderIs.readString("signature");
            if (!signature.equals("BenWasHere")) {
                LOG.error("Missing signature. Got " + signature);
                throw new IOException("Missing signature");                   
            }
            zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
        // Rollback (TRUNC)
        } else if (qp.getType() == Leader.TRUNC) {
            //we need to truncate the log to the lastzxid of the leader
            LOG.warn("Truncating log to get in sync with the leader 0x"
                    + Long.toHexString(qp.getZxid()));
            // Truncate the transaction log
            boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
            if (!truncated) {
                // not able to truncate the log
                LOG.error("Not able to truncate the log "
                        + Long.toHexString(qp.getZxid()));
                System.exit(13);
            }
            zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
        }
        else {
            LOG.error("Got unexpected packet from leader "
                    + qp.getType() + " exiting ... " );
            System.exit(13);
        }
        zk.createSessionTracker();
        long lastQueued = 0;
        boolean isPreZAB1_0 = true;
        boolean writeToTxnLog = !snapshotNeeded;
        // we are now going to start getting transactions to apply followed by an UPTODATE
        outerLoop:
        while (self.isRunning()) {
            // Block until the next packet is read
            readPacket(qp);
            switch(qp.getType()) {
            case Leader.PROPOSAL: // proposal handling
                PacketInFlight pif = new PacketInFlight();
                pif.hdr = new TxnHeader();
                pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);
                if (pif.hdr.getZxid() != lastQueued + 1) {
                    LOG.warn("Got zxid 0x"
                        + Long.toHexString(pif.hdr.getZxid())
                        + " expected 0x"
                        + Long.toHexString(lastQueued + 1));
                }
                lastQueued = pif.hdr.getZxid();
                packetsNotCommitted.add(pif);
                break;
            case Leader.COMMIT:
                if (!writeToTxnLog) {
                    pif = packetsNotCommitted.peekFirst();
                    if (pif.hdr.getZxid() != qp.getZxid()) {
                        LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
                    } else {
                        zk.processTxn(pif.hdr, pif.rec);
                        packetsNotCommitted.remove();
                    }
                } else {
                    packetsCommitted.add(qp.getZxid());
                }
                break;
            case Leader.INFORM:
                PacketInFlight packet = new PacketInFlight();
                packet.hdr = new TxnHeader();
                packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr);
                // Log warning message if txn comes out-of-order
                if (packet.hdr.getZxid() != lastQueued + 1) {
                    LOG.warn("Got zxid 0x"
                            + Long.toHexString(packet.hdr.getZxid())
                            + " expected 0x"
                            + Long.toHexString(lastQueued + 1));
                }
                lastQueued = packet.hdr.getZxid();
                if (!writeToTxnLog) {
                    // Apply to db directly if we haven't taken the snapshot
                    zk.processTxn(packet.hdr, packet.rec);
                } else {
                    packetsNotCommitted.add(packet);
                    packetsCommitted.add(qp.getZxid());
                }
                break;
            case Leader.UPTODATE: // UPTODATE: the Leader says data sync is finished
                if (isPreZAB1_0) {
                    zk.takeSnapshot();
                    self.setCurrentEpoch(newEpoch);
                }
                self.cnxnFactory.setZooKeeperServer(zk);                
                break outerLoop;
            // NEWLEADER: the Leader has sent every Proposal in its proposal cache queue
            case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery 
                File updating = new File(self.getTxnFactory().getSnapDir(),
                                    QuorumPeer.UPDATING_EPOCH_FILENAME);
                if (!updating.exists() && !updating.createNewFile()) {
                    throw new IOException("Failed to create " +
                                          updating.toString());
                }
                if (snapshotNeeded) {
                    zk.takeSnapshot();
                }
                self.setCurrentEpoch(newEpoch);
                if (!updating.delete()) {
                    throw new IOException("Failed to delete " +
                                          updating.toString());
                }
                writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory
                isPreZAB1_0 = false;
                // ACK the NEWLEADER instruction to the Leader
                writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                break;
            }
        }
    }
    ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
    // ACK the UPTODATE instruction to the Leader
    writePacket(ack, true);
    sock.setSoTimeout(self.tickTime * self.syncLimit);
    // Start the Learner's server
    zk.startup();
    // (other code omitted)
}
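The `writeToTxnLog` flag is the subtle part of this loop. It starts as `!snapshotNeeded`, so after DIFF the committed ZXIDs are collected for the transaction log, while after SNAP or TRUNC they are applied straight to memory until NEWLEADER (where a snapshot is taken) flips the flag. A minimal model of the PROPOSAL/COMMIT/NEWLEADER cases, with hypothetical names and bare ZXIDs standing in for `PacketInFlight`:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified model of the PROPOSAL / COMMIT / NEWLEADER cases in syncWithLeader().
class LearnerSyncLoop {
    final Deque<Long> packetsNotCommitted = new ArrayDeque<>();
    final List<Long> packetsCommitted = new ArrayList<>();
    final List<Long> appliedToMemory = new ArrayList<>(); // stands in for zk.processTxn
    boolean writeToTxnLog;

    LearnerSyncLoop(boolean snapshotNeeded) {
        this.writeToTxnLog = !snapshotNeeded; // DIFF: true, SNAP/TRUNC: false
    }

    void onProposal(long zxid) {
        packetsNotCommitted.add(zxid);
    }

    void onCommit(long zxid) {
        if (!writeToTxnLog) {
            Long head = packetsNotCommitted.peekFirst();
            // Apply only if the COMMIT matches the oldest pending proposal (the real code logs a warning otherwise)
            if (head != null && head == zxid) {
                appliedToMemory.add(packetsNotCommitted.removeFirst());
            }
        } else {
            packetsCommitted.add(zxid);
        }
    }

    void onNewLeader() {
        writeToTxnLog = true; // from here on, everything goes through the transaction log
    }

    public static void main(String[] args) {
        LearnerSyncLoop afterTrunc = new LearnerSyncLoop(true); // TRUNC or SNAP
        afterTrunc.onProposal(0x600000001L);
        afterTrunc.onCommit(0x600000001L);
        afterTrunc.onNewLeader();
        afterTrunc.onProposal(0x600000002L);
        afterTrunc.onCommit(0x600000002L);
        System.out.println(afterTrunc.appliedToMemory.size() + " applied, "
                + afterTrunc.packetsCommitted.size() + " for the txn log");
    }
}
```

Whatever is left in `packetsNotCommitted` and `packetsCommitted` when UPTODATE arrives is presumably handled after the loop, in the part of `syncWithLeader` the excerpt omits.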
Log rollback (ZKDatabase): clear the in-memory database, truncate the log, then reload the data
// org/apache/zookeeper/server/ZKDatabase.java
public boolean truncateLog(long zxid) throws IOException {
    clear();
    // truncate the log
    boolean truncated = snapLog.truncateLog(zxid);
    if (!truncated) {
        return false;
    }
    loadDataBase();
    return true;
}
Truncating the log (FileTxnSnapLog)
// org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
public boolean truncateLog(long zxid) throws IOException {
    // close the existing txnLog and snapLog
    close();
    // truncate it
    FileTxnLog truncLog = new FileTxnLog(dataDir);
    boolean truncated = truncLog.truncate(zxid);
    truncLog.close();
    txnLog = new FileTxnLog(dataDir);
    snapLog = new FileSnap(snapDir);
    return truncated;
}
Truncating the transaction log files (FileTxnLog)
// org/apache/zookeeper/server/persistence/FileTxnLog.java
public boolean truncate(long zxid) throws IOException {
    FileTxnIterator itr = null;
    try {
        itr = new FileTxnIterator(this.logDir, zxid);
        PositionInputStream input = itr.inputStream;
        // Position the iterator reached in the current log file; everything after it is cut off
        long pos = input.getPosition();
        // now, truncate at the current position
        RandomAccessFile raf = new RandomAccessFile(itr.logFile, "rw");
        raf.setLength(pos);
        raf.close();
        // Delete every later log file
        while (itr.goToNextLog()) {
            if (!itr.logFile.delete()) {
                LOG.warn("Unable to truncate {}", itr.logFile);
            }
        }
    } finally {
        close(itr);
    }
    return true;
}
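At the file level, `truncate` cuts the log file that contains `zxid` at the iterator's position and deletes every later log file. The sketch below shows that effect on an in-memory model where each inner list is one non-empty log file of ascending ZXIDs. It assumes, as the TRUNC semantics imply (the Learner then sets its lastProcessedZxid to that ZXID), that the transaction equal to `zxid` itself is kept; `TxnLogTruncateDemo` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Each inner list models one transaction log file holding ascending ZXIDs.
class TxnLogTruncateDemo {
    static List<List<Long>> truncate(List<List<Long>> logFiles, long zxid) {
        List<List<Long>> result = new ArrayList<>();
        for (List<Long> file : logFiles) {
            if (file.get(0) > zxid) {
                break; // this file and every later one start after zxid: delete them
            }
            List<Long> kept = new ArrayList<>();
            for (long z : file) {
                if (z > zxid) {
                    break; // cut the file right after the target transaction
                }
                kept.add(z);
            }
            result.add(kept);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Long>> files = List.of(
                List.of(0x500000001L, 0x500000002L),
                List.of(0x500000003L, 0x500000004L),
                List.of(0x500000005L));
        System.out.println(truncate(files, 0x500000003L).size() + " files left");
    }
}
```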