zookeeper 请求处理 - litter-fish/ReadSource GitHub Wiki

会话创建请求

流程图: 会话创建

请求接收

  1. I/O层接收来自客户端的请求,NIOServerCnxn 负责维护客户端的连接
// -- NIOServerCnxn
// Excerpt: handles a selector event for this client connection. On a readable
// key it reads from the socket into incomingBuffer; once that buffer is full it
// either decodes the length prefix (when incomingBuffer is lenBuffer) or treats
// the data as a continuation of the payload, and then calls readPayload().
// NOTE(review): excerpt only — the write path and the try's catch/finally
// clauses are omitted, so this snippet does not compile as shown.
void doIO(SelectionKey k) throws InterruptedException {
    try {
        // Refuse to do I/O on a socket that is no longer open.
        if (isSocketOpen() == false) {
            LOG.warn("trying to do i/o on a null socket for session:0x"
                     + Long.toHexString(sessionId));

            return;
        }
        if (k.isReadable()) {
            int rc = sock.read(incomingBuffer);
            // A negative read count means end of stream: likely the client closed the socket.
            if (rc < 0) {
                throw new EndOfStreamException(
                        "Unable to read additional data from client sessionid 0x"
                        + Long.toHexString(sessionId)
                        + ", likely client has closed socket");
            }
            // Act only once the current buffer has been filled completely.
            if (incomingBuffer.remaining() == 0) {
                boolean isPayload;
                if (incomingBuffer == lenBuffer) { // start of next request
                    incomingBuffer.flip();
                    // readLength(k) reports whether a payload should be read now.
                    isPayload = readLength(k);
                    incomingBuffer.clear();
                } else {
                    // continuation
                    isPayload = true;
                }
                if (isPayload) {
                    readPayload();
                }
                else {
                    return;
                }
            }
        }
        // (other code omitted)
    }
}
  1. 判断是否是客户端“会话创建”请求
// --NIOServerCnxn
// Completes reading a request body into incomingBuffer. When the body is
// complete, dispatches it (connect request vs. regular request) and then resets
// incomingBuffer to lenBuffer so the next read starts with a new length prefix.
private void readPayload() throws IOException, InterruptedException {
    if (incomingBuffer.remaining() != 0) { // have we read length bytes?
        // (other code omitted — presumably reads the remaining body bytes from the socket)
    }

    if (incomingBuffer.remaining() == 0) { // have we read length bytes?
        packetReceived();
        incomingBuffer.flip();
        // While the connection is not yet initialized, the packet is the connect
        // (session creation) request.
        if (!initialized) { 
            // Handle the ConnectRequest
            readConnectRequest();
        } else {
            readRequest();
        }
        // Reset so the next read starts with a new length prefix.
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }
}

处理 ConnectRequest 请求

// --NIOServerCnxn
/**
 * Handles the first packet on a connection, which is the ConnectRequest.
 * If the ZooKeeper server is running, delegates to
 * ZooKeeperServer#processConnectRequest and then marks the connection as
 * initialized, so later packets go through readRequest(). Otherwise fails
 * with an IOException.
 */
private void readConnectRequest() throws IOException, InterruptedException {
    boolean serverUp = isZKServerRunning();
    if (serverUp) {
        zkServer.processConnectRequest(this, incomingBuffer);
        initialized = true;
        return;
    }
    throw new IOException("ZooKeeperServer not running");
}
  1. 反序列化 ConnectRequest 请求
BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
ConnectRequest connReq = new ConnectRequest();
// Deserialize the ConnectRequest from the incoming buffer
connReq.deserialize(bia, "connect");
  1. 判断只读模式:如果服务端以只读模式运行(ReadOnlyZooKeeperServer),非只读客户端将被拒绝连接
boolean readOnly = false;
try {
    // Read the client's readOnly flag
    readOnly = bia.readBool("readOnly");
    cnxn.isOldClient = false;
} catch (IOException e) {
    // Deliberately ignored: if the flag cannot be read, readOnly stays false and
    // cnxn.isOldClient is left unchanged.
    // NOTE(review): presumably older clients do not send this field — confirm.
}
// If this server runs in read-only mode (ReadOnlyZooKeeperServer), refuse
// clients that did not ask for a read-only session.
if (readOnly == false && this instanceof ReadOnlyZooKeeperServer) {
    String msg = "Refusing session request for not-read-only client "
        + cnxn.getRemoteSocketAddress();
    LOG.info(msg);
    throw new CloseRequestException(msg);
}
  1. 检测客户端 zxid:客户端已见过的 zxid(lastZxidSeen)不能大于服务端最后处理的 zxid,否则拒绝连接,客户端需要尝试连接其他服务器
// Refuse the client if it has already seen a newer zxid than this server's last
// processed zxid; the client must try another server.
if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
    String msg = "Refusing session request for client "
        + cnxn.getRemoteSocketAddress()
        + " as it has seen zxid 0x"
        + Long.toHexString(connReq.getLastZxidSeen())
        + " our last zxid is 0x"
        + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
        + " client must try another server";

    LOG.info(msg);
    throw new CloseRequestException(msg);
}
  1. 协商 sessionTimeout:获取客户端连接时传入的超时时间,与服务端的超时时间限制比较,确保最终的超时时间落在 [minSessionTimeout, maxSessionTimeout] 区间内(默认分别为 2 * tickTime 和 20 * tickTime)
// Negotiate sessionTimeout: clamp the client-requested value into
// [minSessionTimeout, maxSessionTimeout], then store it on cnxn.
int sessionTimeout = connReq.getTimeOut();
byte passwd[] = connReq.getPasswd();
int minSessionTimeout = getMinSessionTimeout();
if (sessionTimeout < minSessionTimeout) {
    sessionTimeout = minSessionTimeout;
}
int maxSessionTimeout = getMaxSessionTimeout();
if (sessionTimeout > maxSessionTimeout) {
    sessionTimeout = maxSessionTimeout;
}
cnxn.setSessionTimeout(sessionTimeout);
  1. 获取客户端传入的 sessionID:若不为 0,则重连(恢复)已有会话;否则创建新会话
// Read the client's sessionID: 0 means a brand-new connection (create a
// session); any other value means the client is reconnecting to an existing session.
long sessionId = connReq.getSessionId();
if (sessionId != 0) {
    long clientSessionId = connReq.getSessionId();
    LOG.info("Client attempting to renew session 0x"
            + Long.toHexString(clientSessionId)
            + " at " + cnxn.getRemoteSocketAddress());
    // NOTE(review): presumably closes any connection still bound to this session — confirm.
    serverCnxnFactory.closeSession(sessionId);
    cnxn.setSessionId(sessionId);
    // Reopen (reconnect to) the existing session
    reopenSession(cnxn, sessionId, passwd, sessionTimeout);
} else {
    LOG.info("Client attempting to establish new session at "
            + cnxn.getRemoteSocketAddress());
    // Create a new session
    createSession(cnxn, passwd, sessionTimeout);
}

完整代码如下:

// -- ZooKeeperServer
/**
 * Handles a client's ConnectRequest (session establishment).
 * Deserializes the request and reads the optional readOnly flag. Refuses
 * clients this server must not serve. Negotiates the session timeout. Then
 * either reopens an existing session (non-zero sessionId) or creates a new one.
 *
 * @param cnxn           the client connection the request arrived on
 * @param incomingBuffer buffer holding the serialized ConnectRequest
 * @throws CloseRequestException if the connection is refused: this server is
 *         read-only and the client is not, or the client has seen a newer zxid
 * @throws IOException e.g. if deserializing the request fails
 */
public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
    BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
    ConnectRequest connReq = new ConnectRequest();
    // Deserialize the ConnectRequest
    connReq.deserialize(bia, "connect");
    if (LOG.isDebugEnabled()) {
        LOG.debug("Session establishment request from client "
                + cnxn.getRemoteSocketAddress()
                + " client's lastZxid is 0x"
                + Long.toHexString(connReq.getLastZxidSeen()));
    }
    boolean readOnly = false;
    try {
        // Read the client's readOnly flag
        readOnly = bia.readBool("readOnly");
        cnxn.isOldClient = false;
    } catch (IOException e) {
        // Deliberately ignored: readOnly stays false and cnxn.isOldClient is unchanged.
        // NOTE(review): presumably older clients do not send this field — confirm.
    }
    // If this server runs in read-only mode (ReadOnlyZooKeeperServer), refuse
    // clients that did not ask for a read-only session.
    if (readOnly == false && this instanceof ReadOnlyZooKeeperServer) {
        String msg = "Refusing session request for not-read-only client "
            + cnxn.getRemoteSocketAddress();
        LOG.info(msg);
        throw new CloseRequestException(msg);
    }
    // Compare the client's zxid with the server's: a client that has seen a
    // newer zxid than this server's last processed one is refused.
    if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
        String msg = "Refusing session request for client "
            + cnxn.getRemoteSocketAddress()
            + " as it has seen zxid 0x"
            + Long.toHexString(connReq.getLastZxidSeen())
            + " our last zxid is 0x"
            + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
            + " client must try another server";

        LOG.info(msg);
        throw new CloseRequestException(msg);
    }

    // Negotiate sessionTimeout by clamping it into [min, max], and set it on cnxn
    int sessionTimeout = connReq.getTimeOut();
    byte passwd[] = connReq.getPasswd();
    int minSessionTimeout = getMinSessionTimeout();
    if (sessionTimeout < minSessionTimeout) {
        sessionTimeout = minSessionTimeout;
    }
    int maxSessionTimeout = getMaxSessionTimeout();
    if (sessionTimeout > maxSessionTimeout) {
        sessionTimeout = maxSessionTimeout;
    }
    cnxn.setSessionTimeout(sessionTimeout);


    // We don't want to receive any packets until we are sure that the
    // session is setup
    cnxn.disableRecv();
    // Read the client's sessionID: 0 means a new connection, otherwise reconnect
    long sessionId = connReq.getSessionId();
    if (sessionId != 0) {
        long clientSessionId = connReq.getSessionId();
        LOG.info("Client attempting to renew session 0x"
                + Long.toHexString(clientSessionId)
                + " at " + cnxn.getRemoteSocketAddress());
        // NOTE(review): presumably closes any connection still bound to this session — confirm.
        serverCnxnFactory.closeSession(sessionId);
        cnxn.setSessionId(sessionId);
        // Reopen (reconnect to) the existing session
        reopenSession(cnxn, sessionId, passwd, sessionTimeout);
    } else {
        LOG.info("Client attempting to establish new session at "
                + cnxn.getRemoteSocketAddress());
        // Create a new session
        createSession(cnxn, passwd, sessionTimeout);
    }
}

会话创建

  1. 为客户端生成 sessionID。每个 ZooKeeper 服务器启动时,都会初始化一个会话管理器(SessionTracker),同时初始化一个基准 sessionID;之后每来一个客户端,只需在这个 sessionID 的基础上递增即可。
// --ZooKeeperServer
// Excerpt (first half of createSession): allocate a sessionID for the new
// client from the session tracker. The rest of the method is omitted here.
long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
    // Step 8: generate the sessionID for the client
    long sessionId = sessionTracker.createSession(timeout);

    // (other code omitted)
}

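sessionID 必须在整个集群中唯一:集群层面的唯一性由初始 sessionID 保证(其高 8 位是服务器 ID,即 sid,不同服务器生成的 sessionID 不会冲突);单个服务器内部则通过 synchronized 关键字保证"读取并递增"操作在多线程下的原子性,避免分配出重复的 sessionID。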

// -- SessionTrackerImpl
/**
 * Allocates the next sessionID, registers it with the given timeout and
 * returns it. The method is synchronized, so concurrent callers can never be
 * handed the same ID.
 */
synchronized public long createSession(int sessionTimeout) {
    final long allocated = nextSessionId;
    addSession(allocated, sessionTimeout);
    nextSessionId = allocated + 1;
    return allocated;
}
  1. 注册会话
  2. 会话激活
// --SessionTrackerImpl
// Registers a session and activates it:
//   1. records the timeout for the id, overwriting any previous value;
//   2. creates a SessionImpl only if the id is not already tracked;
//   3. touches the session.
synchronized public void addSession(long id, int sessionTimeout) {
    // Step 9: register the session
    // sessionsWithTimeout maps sessionID -> session timeout
    sessionsWithTimeout.put(id, sessionTimeout);
    if (sessionsById.get(id) == null) {
        // Create the session instance
        SessionImpl s = new SessionImpl(id, sessionTimeout, 0);
        // sessionsById maps sessionID -> session instance
        sessionsById.put(id, s);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                    "SessionTrackerImpl --- Adding session 0x"
                    + Long.toHexString(id) + " " + sessionTimeout);
        }
    } else {
        // Already tracked (e.g. registered again while the createSession request
        // is processed further down the chain): keep the existing instance.
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                    "SessionTrackerImpl --- Existing session 0x"
                    + Long.toHexString(id) + " " + sessionTimeout);
        }
    }
    // Step 10: activate the session
    touchSession(id, sessionTimeout);
}
  1. 生成会话密码
// --ZooKeeperServer
// Excerpt (second half of createSession). Derives the session password, binds
// the sessionID to the connection, and submits a createSession request whose
// payload is the negotiated timeout.
// NOTE(review): sessionId is the local assigned in the omitted first half.
long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
    // (other code omitted)

    // Step 11: generate the session password. The Random is seeded with
    // sessionId ^ superSecret, so the bytes are deterministic for a given session.
    Random r = new Random(sessionId ^ superSecret);
    r.nextBytes(passwd);

    // 4-byte payload carrying the session timeout. It is not flipped here; the
    // reader rewinds the buffer before calling getInt().
    ByteBuffer to = ByteBuffer.allocate(4);
    to.putInt(timeout);
    cnxn.setSessionId(sessionId);

    // Step 12: hand the session-creation request to PrepRequestProcessor
    submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);
    return sessionId;
}

预处理

  1. 将会话创建请求交给 PrepRequestProcessor 处理
// --ZooKeeperServer
/**
 * Convenience overload: packs the raw request fields into a Request and
 * forwards it to submitRequest(Request).
 */
private void submitRequest(ServerCnxn cnxn, long sessionId, int type,
        int xid, ByteBuffer bb, List<Id> authInfo) {
    submitRequest(new Request(cnxn, sessionId, xid, type, bb, authInfo));
}
// --ZooKeeperServer
// Entry point of the request-processor chain. Touches the session, then hands
// valid requests to firstProcessor. Requests of an unknown type go to an
// UnimplementedRequestProcessor instead.
public void submitRequest(Request si) {
    if (firstProcessor == null) {
        // (other code omitted)
    }
    try {
        // Touch the session again so it stays active
        touch(si.cnxn);
        // Validate the request type
        boolean validpacket = Request.isValid(si.type);
        if (validpacket) {
            // Hand the request to the first processor (PrepRequestProcessor).
            // PrepRequestProcessor runs on its own thread: submitted requests are put
            // on its submittedRequests blocking queue and taken off by that thread.
            firstProcessor.processRequest(si);
            if (si.cnxn != null) {
                incInProcess();
            }
        } else {
            LOG.warn("Received packet at server of unknown type " + si.type);
            new UnimplementedRequestProcessor().processRequest(si);
        }
    } catch (MissingSessionException e) {
        // The request is dropped and only debug-logged.
        // NOTE(review): presumably thrown by touch() for an unknown session — confirm.
        if (LOG.isDebugEnabled()) {
            LOG.debug("Dropping request: " + e.getMessage());
        }
    } catch (RequestProcessorException e) {
        LOG.error("Unable to process request:" + e.getMessage(), e);
    }
}

加入 submittedRequests 阻塞队列,等待线程处理请求

// --PrepRequestProcessor
// Enqueue only: the actual processing happens on the processor thread (run()).
public void processRequest(Request request) {
    // Add to the submittedRequests blocking queue; the processor thread takes it from there
    submittedRequests.add(request);
}

PrepRequestProcessor 处理器处理请求

// --PrepRequestProcessor
// Processor thread loop: blocks on submittedRequests and pre-processes each
// request via pRequest(). Any exception ends the loop and is passed to handleException.
public void run() {
    try {
        while (true) {
            // Blocking take: waits for as long as the queue is empty
            Request request = submittedRequests.take();

            // (other unrelated code omitted)

            // Process the request
            pRequest(request);
        }
    } catch (RequestProcessorException e) {
        // An XidRolloverException cause is additionally logged at info level
        if (e.getCause() instanceof XidRolloverException) {
            LOG.info(e.getCause().getMessage());
        }
        handleException(this.getName(), e);
    } catch (Exception e) {
        handleException(this.getName(), e);
    }
    LOG.info("PrepRequestProcessor exited loop!");
}

根据不同类型的请求创建不同的请求

// --PrepRequestProcessor
// Builds the transaction header/body for a request according to its type,
// sets request.zxid, and passes the request on to the next processor.
// NOTE(review): excerpt — other cases and the catch bodies are omitted, and the
// switch's closing brace is missing, so this does not compile as shown.
protected void pRequest(Request request) throws RequestProcessorException {
    // Clear any header/txn left on the request
    request.hdr = null;
    request.txn = null;

    try {
        // Build the transaction according to the request type
        switch (request.type) {

            // (other request types omitted)

        case OpCode.createSession:
        case OpCode.closeSession:
            // Create or close a session: build the txn with the zxid from zks.getNextZxid()
            pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
            break;

        // (other request types omitted)

    } catch (KeeperException e) {
        // (handling omitted in this excerpt)
    } catch (Exception e) {
        // (handling omitted in this excerpt)
    }
    request.zxid = zks.getZxid();
    // Hand over to the next processor
    nextProcessor.processRequest(request);
}
  1. 创建请求事务头
// Transaction header record (excerpt: fields only; constructors, accessors and
// serialization are omitted).
public class TxnHeader implements Record {
	// Client ID: the session the request belongs to
	private long clientId;
	// Client-side operation sequence number (cxid)
	private int cxid;
	// ZXID assigned to this transaction
	private long zxid;
	// Time at which the server started processing the request
	private long time;
	// Transaction type (OpCode)
	private int type;
}
// --PrepRequestProcessor
// Excerpt: the first thing pRequest2Txn does is create the transaction header.
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
    throws KeeperException, IOException, RequestProcessorException
{
    // Create the transaction header: (clientId = sessionId, cxid, zxid, time, type)
    request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
                                zks.getTime(), type);

    // (other code omitted)
}
  1. 创建请求事务体
  2. 注册与激活会话
// --PrepRequestProcessor.java
// Excerpt focused on OpCode.createSession. Builds the header and a
// CreateSessionTxn carrying the timeout, then registers/activates the session
// and records its owner.
// NOTE(review): this excerpt timestamps with Time.currentWallTime() while the
// earlier excerpt uses zks.getTime(); they likely come from different ZooKeeper versions.
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
    throws KeeperException, IOException, RequestProcessorException
{
    // Create the transaction header
    request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
                                Time.currentWallTime(), type);

    switch (type) {
        // (handling of other transaction types omitted)
        case OpCode.createSession:
            // The payload is the session timeout int; rewind before reading it and
            // again afterwards so the buffer can be read from the start later.
            request.request.rewind();
            int to = request.request.getInt();
            // Create the transaction body
            request.txn = new CreateSessionTxn(to);
            request.request.rewind();
            // Register and activate the session (this also covers requests forwarded
            // by non-leader servers)
            zks.sessionTracker.addSession(request.sessionId, to);
            zks.setOwner(request.sessionId, request.getOwner());
            break;
        // (handling of other transaction types omitted)
    }
}

事务处理

  1. 将请求交给 ProposalRequestProcessor 处理器,请求会进入三个子流程进行处理:Sync 流程、Proposal 流程和 Commit 流程(流程图: 事务处理)
// --ProposalRequestProcessor.java
// Leader side. LearnerSyncRequests are handed to the leader directly. Every
// other request is first passed to the next processor (CommitProcessor).
// Transactional requests (hdr != null) are also proposed to the quorum and then
// written to the log by syncProcessor.
public void processRequest(Request request) throws RequestProcessorException {
    if(request instanceof LearnerSyncRequest){
        zks.getLeader().processSync((LearnerSyncRequest)request);
    } else {
        // Hand the request to the next processor (CommitProcessor)
        nextProcessor.processRequest(request);
        if (request.hdr != null) { // extra handling for transactional requests
            // We need to sync and get consensus on any transactions
            try {
                // Have the leader build a proposal and send it to the other servers
                zks.getLeader().propose(request);
            } catch (XidRolloverException e) {
                throw new RequestProcessorException(e.getMessage(), e);
            }
            // Write the transaction to the transaction log
            syncProcessor.processRequest(request);
        }
    }
}

Proposal 流程:每个事务请求都需要得到过半服务器的同意,才能应用到内存数据库中

  1. 发起投票
// --ProposalRequestProcessor.java
// Excerpt of processRequest() focusing on the Proposal step.
// NOTE(review): excerpt — the surrounding if/else and try are elided, so the
// braces below do not balance as shown.
public void processRequest(Request request) throws RequestProcessorException {

            // ........

            // Have the leader build a proposal and send it to the other servers
            zks.getLeader().propose(request);
            
            // ........
        }
    }
}
  1. 生成提议 Proposal
// --Leader
// Turns a transactional request into a PROPOSAL packet, records it as
// outstanding (keyed by zxid) and broadcasts it to the followers. If the lower
// 32 bits of the zxid are all set, the leader shuts down to force a re-election instead.
public Proposal propose(Request request) throws XidRolloverException {
    /**
     * Address the rollover issue. All lower 32bits set indicate a new leader
     * election. Force a re-election instead. See ZOOKEEPER-1277
     */
    if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
        String msg =
                "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
        shutdown(msg);
        throw new XidRolloverException(msg);
    }

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
    try {
        // Serialize the transaction header
        request.hdr.serialize(boa, "hdr");
        if (request.txn != null) {
            // Serialize the transaction body
            request.txn.serialize(boa, "txn");
        }
        baos.close();
    } catch (IOException e) {
        LOG.warn("This really should be impossible", e);
    }
    // Build the PROPOSAL packet
    QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
            baos.toByteArray(), null);

    Proposal p = new Proposal();
    p.packet = pp;
    p.request = request;
    synchronized (this) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Proposing:: " + request);
        }

        lastProposed = p.packet.getZxid();
        // Put the proposal into outstandingProposals (the "ballot box"), keyed by zxid
        outstandingProposals.put(lastProposed, p);
        // Broadcast the proposal
        sendPacket(pp);
    }
    return p;
}
  1. 广播提议
// --Leader
// Broadcasts a packet to every follower in forwardingFollowers by queueing it
// on the corresponding LearnerHandler. The follower set is locked while iterating.
void sendPacket(QuorumPacket qp) {
    synchronized (forwardingFollowers) {
        // Queue the packet for each follower
        for (LearnerHandler f : forwardingFollowers) {
            f.queuePacket(qp);
        }
    }
}
  1. 收集提议:Follower 服务器接收到 Leader 发来的提议后,会进入 Sync 流程记录事务日志;一旦记录完成,就会发送一个 ACK 消息给 Leader。Leader 统计 ACK 消息是否过半,过半后再广播 Commit。

Follower 服务器的 SendAckRequestProcessor 处理器反馈ACK消息

// --SendAckRequestProcessor (follower side)
// After a request has been logged, sends an ACK carrying its zxid back to the
// leader. sync requests are not acknowledged here.
public void processRequest(Request si) {
    if(si.type != OpCode.sync){
        // Build the ACK packet for this transaction's zxid
        QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null, null);
        try {
            // Send the ACK to the leader
            learner.writePacket(qp, false);
        } catch (IOException e) {
            // NOTE(review): error handling is omitted in this excerpt; as shown the
            // IOException is silently swallowed.
        }
    }
}

Leader 收集投票:LearnerHandler 负责 Leader 与 Learner 服务器之间的交互

// --LearnerHandler
// Excerpt of the leader-side loop that reads packets from one learner. ACK
// packets are forwarded to Leader.processAck for quorum counting.
// NOTE(review): excerpt — elided parts (including the literal "......" line and
// the exception handling) mean this does not compile as shown.
public void run() {
    try {
        // (other logic omitted)

        while (true) {
            qp = new QuorumPacket();
            // Blocking read of the next packet from the learner (e.g. the ACK
            // confirming the UPTODATE instruction)
            ia.readRecord(qp, "packet");

            ......

            switch (qp.getType()) {
            case Leader.ACK:
                if (this.learnerType == LearnerType.OBSERVER) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Received ACK from Observer  " + this.sid);
                    }
                }
                syncLimitCheck.updateAck(qp.getZxid());
                // Hand the ACK to the leader, which counts it towards the proposal's quorum
                leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
                break;
            //.....
        }
    } catch (IOException e) {
    } catch (InterruptedException e) {
    } finally {
    }
}

收集提议反馈,判断是否过半

// -- Leader: collect ACKs for a proposal and check whether a quorum has been reached.
// Once a quorum has acknowledged the proposal, the method:
//   - removes it from outstandingProposals and queues it on toBeApplied;
//   - broadcasts COMMIT to the followers and INFORM to the observers;
//   - hands the request to the leader's CommitProcessor;
//   - releases any pending syncs for that zxid.
synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
    if (LOG.isTraceEnabled()) {
        LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
        for (Proposal p : outstandingProposals.values()) {
            long packetZxid = p.packet.getZxid();
            LOG.trace("outstanding proposal: 0x{}",
                    Long.toHexString(packetZxid));
        }
        LOG.trace("outstanding proposals all");
    }

    // Lower 32 bits zero: not an ordinary proposal zxid — ignored.
    // NOTE(review): presumably the ACK of a new-epoch (NEWLEADER) packet — confirm.
    if ((zxid & 0xffffffffL) == 0) {
        return;
    }

    if (outstandingProposals.size() == 0) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("outstanding is 0");
        }
        return;
    }
    // Ignore ACKs for proposals that are already committed (lastCommitted >= zxid)
    if (lastCommitted >= zxid) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
                    Long.toHexString(lastCommitted), Long.toHexString(zxid));
        }
        // The proposal has already been committed
        return;
    }
    // Look up the proposal for this zxid in the ballot box
    Proposal p = outstandingProposals.get(zxid);
    if (p == null) {
        LOG.warn("Trying to commit future proposal: zxid 0x{} from {}",
                Long.toHexString(zxid), followerAddr);
        return;
    }

    p.ackSet.add(sid);
    if (LOG.isDebugEnabled()) {
        LOG.debug("Count for zxid: 0x{} is {}",
                Long.toHexString(zxid), p.ackSet.size());
    }
    // Check whether the ACK set now forms a quorum
    if (self.getQuorumVerifier().containsQuorum(p.ackSet)){
        if (zxid != lastCommitted+1) {
            LOG.warn("Commiting zxid 0x{} from {} not first!",
                    Long.toHexString(zxid), followerAddr);
            LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
        }
        // Remove the proposal from the ballot box
        outstandingProposals.remove(zxid);
        if (p.request != null) {
            // Queue it on toBeApplied
            toBeApplied.add(p);
        }

        if (p.request == null) {
            LOG.warn("Going to commmit null request for proposal: {}", p);
        }
        // Quorum reached: broadcast COMMIT to the followers
        commit(zxid);
        // Observers do not vote, so a bare COMMIT (zxid only) is not enough for
        // them; send INFORM, which carries the proposal data.
        // NOTE(review): inform() dereferences p.request, so the null-request case
        // warned about above would fail here — confirm.
        inform(p);
        // Hand the committed request to the leader's CommitProcessor
        zk.commitProcessor.commit(p.request);
        if(pendingSyncs.containsKey(zxid)){
            for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
                sendSync(r);
            }
        }
    }
}
  1. 将请求放入 toBeApplied 队列
 // Remove the proposal from the ballot box (outstandingProposals)
outstandingProposals.remove(zxid);
if (p.request != null) {
    // Queue it on toBeApplied
    toBeApplied.add(p);
}
  1. 广播 Commit 消息:向 Follower 广播
/**
 * Records zxid as the last committed transaction, then broadcasts a COMMIT
 * packet for it to the followers.
 */
public void commit(long zxid) {
    synchronized (this) {
        lastCommitted = zxid;
    }
    sendPacket(new QuorumPacket(Leader.COMMIT, zxid, null, null));
}

向 Observer 广播(INFORM 消息,携带完整的提议数据)

/**
 * Tells the observers about a committed proposal. Observers do not vote, so
 * instead of a bare COMMIT they receive an INFORM packet carrying the full
 * proposal data.
 */
public void inform(Proposal proposal) {
    long committedZxid = proposal.request.zxid;
    byte[] payload = proposal.packet.getData();
    QuorumPacket informPacket = new QuorumPacket(Leader.INFORM, committedZxid, payload, null);
    sendObserverPacket(informPacket);
}

Commit 流程

  1. 将请求交给 CommitProcessor 处理器处理,将事务放到 queuedRequests 队列中
// --CommitProcessor.java
// Enqueues a request for the CommitProcessor thread and wakes that thread up.
// Requests arriving after the processor has finished are ignored.
synchronized public void processRequest(Request request) {
    if (!finished) {
        // Put the request on the queuedRequests queue
        queuedRequests.add(request);
        // Wake up the processing thread (run() wait()s on this monitor)
        notifyAll();
    }
}
  1. 处理 queuedRequests 队列:CommitProcessor 处理器使用一个独立的线程处理上一步提交的请求

  2. 标记nextPending,作用:

  • 确保事务请求的顺序性
  • 便于 CommitProcessor 处理器检测集群中是否有正在投票的事务
// --CommitProcessor
// Excerpt of run(), "part 4": drains queuedRequests until the first
// transactional request is found. That request becomes nextPending and must
// wait for its commit. Non-transactional requests seen before it go straight to toProcess.
public void run() {
    try {
        Request nextPending = null;
        while (!finished) {

            // (omitted)

            // Part 4: while no transactional request is pending and the request queue
            // is not empty, keep draining the queue until the first transactional
            // request appears or the queue is exhausted. Every non-transactional
            // request seen meanwhile goes to toProcess, i.e. it can be handed to the
            // next processor directly.
            synchronized (this) {
                // Process the next requests in the queuedRequests
                while (nextPending == null && queuedRequests.size() > 0) {
                    Request request = queuedRequests.remove();
                    switch (request.type) {
                    case OpCode.create:
                    case OpCode.delete:
                    case OpCode.setData:
                    case OpCode.multi:
                    case OpCode.setACL:
                    case OpCode.createSession:
                    case OpCode.closeSession:
                        // Transactional request: make it nextPending, then break
                        nextPending = request;
                        break;
                    case OpCode.sync:
                        // matchSyncs is true on learners: the sync must wait for the leader's reply
                        if (matchSyncs) {
                            nextPending = request;
                        } else {
                            // No need to wait: add it straight to the to-process list
                            toProcess.add(request);
                        }
                        // matchSyncs is false on the leader; only learners wait for the leader's reply
                        break;
                    default:
                        // Non-transactional requests go straight to the to-process list
                        toProcess.add(request);
                    }
                }
            }
        }
    } catch (InterruptedException e) {
        LOG.warn("Interrupted exception while waiting", e);
    } catch (Throwable e) {
        LOG.error("Unexpected exception causing CommitProcessor to exit", e);
    }
    LOG.info("CommitProcessor exited loop!");
}
  1. 等待 Proposal 投票
// --CommitProcessor
// Excerpt of run(): the wait step. The thread blocks while nothing can make
// progress, i.e. there are no committed requests and either the queue is empty
// or a transactional request is already pending.
// NOTE(review): excerpt — the braces below do not balance as shown.
public void run() {
    try {
        Request nextPending = null;
        while (!finished) {

            // ................................

            /**
             * Once the request queue has been drained, or a transactional request
             * has been found:
             * if there is no committed request, wait;
             * if there is a committed request, take it and check whether it matches
             * the recorded nextPending request.
             *   On a match it goes to toProcess and nextPending is cleared.
             *   Otherwise (essentially only when nextPending is null; a non-null,
             *   non-matching nextPending should not happen) it goes to toProcess as is.
             */
            // Clear the to-process list
            toProcess.clear();
            synchronized (this) {
                // Either the queue has been fully drained, or a transactional request was
                // found and assigned to nextPending; size > 0 with nextPending == null is not allowed here
                if ((queuedRequests.size() == 0 || nextPending != null)
                        && committedRequests.size() == 0) { // no committed transactions
                    // Blocks here right after the chain starts, or while nothing is ready to process
                    wait();
                    continue;
                }
                // .......
            }

            // .......
            }
        }
    } catch (InterruptedException e) {
        LOG.warn("Interrupted exception while waiting", e);
    } catch (Throwable e) {
        LOG.error("Unexpected exception causing CommitProcessor to exit", e);
    }
    LOG.info("CommitProcessor exited loop!");
}
  1. 投票通过

  2. 提交请求

// --CommitProcessor
// Excerpt of run(): the commit-matching step. A committed request is matched
// against nextPending (same sessionId and cxid). On a match, the pending
// request receives the committed hdr/txn/zxid and is scheduled for processing.
// Otherwise the committed request itself is scheduled.
public void run() {
    try {
        Request nextPending = null;
        while (!finished) {

            /**
             * Once the request queue has been drained, or a transactional request
             * has been found:
             * if there is no committed request, wait;
             * if there is a committed request, take it and check whether it matches
             * the recorded nextPending request.
             *   On a match it goes to toProcess and nextPending is cleared.
             *   Otherwise (essentially only when nextPending is null; a non-null,
             *   non-matching nextPending should not happen) it goes to toProcess as is.
             */
            // Clear the to-process list
            toProcess.clear();
            synchronized (this) {
                // Either the queue has been fully drained, or a transactional request was found and assigned to nextPending
               
                // size > 0 with nextPending == null is not allowed here
                if ((queuedRequests.size() == 0 || nextPending != null)
                        && committedRequests.size() > 0) { // there is at least one committed request
                    Request r = committedRequests.remove();
                    if (nextPending != null
                            && nextPending.sessionId == r.sessionId
                            && nextPending.cxid == r.cxid) { // matches nextPending
                        nextPending.hdr = r.hdr;
                        nextPending.txn = r.txn;
                        nextPending.zxid = r.zxid;
                        // Add to the to-process list
                        toProcess.add(nextPending);
                        // Clear nextPending
                        nextPending = null;
                    } else {
                        // this request came from someone else so just
                        // send the commit packet
                        // Happens when nextPending has not been set yet (nextPending == null;
                        // the if/else could be split more finely). Reaching this branch with
                        // nextPending != null would be abnormal.
                        toProcess.add(r);
                    }
                }
            }

        }
    } catch (InterruptedException e) {
        LOG.warn("Interrupted exception while waiting", e);
    } catch (Throwable e) {
        LOG.error("Unexpected exception causing CommitProcessor to exit", e);
    }
    LOG.info("CommitProcessor exited loop!");
}

Sync 流程:主要是 SyncRequestProcessor 处理器的处理操作,负责记录事务日志,完成后向 Leader 反馈 ACK 消息。这些 ACK 就是 Proposal 流程中被统计的投票:每个事务需要得到过半机器的通过,才能真正应用到内存数据库中。

事务应用

  1. 请求交给FinalRequestProcessor 处理器
  2. 应用事务到内存数据库中
/**
 * Applies a transaction to the in-memory database. For session transactions it
 * also updates session tracking: createSession registers the session and
 * closeSession removes it.
 * @param hdr transaction header
 * @param txn transaction body
 * @return the result of applying the transaction to the ZK database
 */
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
    ProcessTxnResult rc;
    int opCode = hdr.getType();
    long sessionId = hdr.getClientId();
    // Apply the transaction to the in-memory database
    rc = getZKDatabase().processTxn(hdr, txn);
    if (opCode == OpCode.createSession) {
        if (txn instanceof CreateSessionTxn) {
            CreateSessionTxn cst = (CreateSessionTxn) txn;
            // Register the session again (addSession keeps an existing SessionImpl)
            sessionTracker.addSession(sessionId, cst
                    .getTimeOut());
        } else {
            // Unexpected body type for a createSession txn: only logged
            LOG.warn("*****>>>>> Got "
                    + txn.getClass() + " "
                    + txn.toString());
        }
    } else if (opCode == OpCode.closeSession) {
        // Remove the session
        sessionTracker.removeSession(sessionId);
    }
    return rc;
}
  1. 将事务请求加入 committedLog 队列中,以便集群之间进行数据同步
// --FinalRequestProcessor.java
// Excerpt: applies a committed request. Under the outstandingChanges lock it:
//   - retires pending change records up to this request's zxid;
//   - applies the transaction (if any) to the in-memory database;
//   - records quorum requests as committed proposals.
public void processRequest(Request request) {
    // (other code omitted)

    ProcessTxnResult rc = null;
    synchronized (zks.outstandingChanges) {
        // Retire outstanding change records with zxid <= this request's zxid;
        // records strictly older than it are unexpected and logged as warnings.
        while (!zks.outstandingChanges.isEmpty()
                && zks.outstandingChanges.get(0).zxid <= request.zxid) {
            ChangeRecord cr = zks.outstandingChanges.remove(0);
            if (cr.zxid < request.zxid) {
                LOG.warn("Zxid outstanding "
                        + cr.zxid
                        + " is less than current " + request.zxid);
            }
            // Drop the per-path entry only if it still points at this record
            if (zks.outstandingChangesForPath.get(cr.path) == cr) {
                zks.outstandingChangesForPath.remove(cr.path);
            }
        }
        if (request.hdr != null) {
           TxnHeader hdr = request.hdr;
           Record txn = request.txn;
           // Apply the transaction to the in-memory database
           rc = zks.processTxn(hdr, txn);
        }
        // do not add non quorum packets to the queue.
        if (Request.isQuorum(request.type)) {
            // Add the request to the committedLog so it can be used for data
            // synchronization between servers in the cluster
            zks.getZKDatabase().addCommittedProposal(request);
        }
    }

    // (other code omitted)
}

会话响应

  1. 统计处理
// --FinalRequestProcessor.java
// Excerpt: the createSession case. Updates latency/connection statistics, then
// finishes session initialization (which sends the ConnectResponse).
// NOTE(review): excerpt — the try's catch clause is omitted and `cnxn` is a
// local defined in omitted code (presumably request.cnxn).
public void processRequest(Request request) {
    // (other code omitted)
    try {
        // (other code omitted)
        switch (request.type) {
        // (other code omitted)
        case OpCode.createSession: {
            // Statistics:
            // update the server latency stats based on the request's creation time
            zks.serverStats().updateLatency(request.createTime);

            lastOp = "SESS";
            cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
                    request.createTime, System.currentTimeMillis());

            // Build and send the ConnectResponse for the new session
            zks.finishSessionInit(request.cnxn, true);
            return;
        }
        // (other code omitted)
    }
}
  1. 创建 ConnectResponse
  2. 序列化 ConnectResponse
  3. I/O层发送响应给客户端
// --ZooKeeperServer.java
// Completes session setup. Registers a valid connection with JMX, then builds,
// serializes and sends the ConnectResponse. If the session is invalid, the
// timeout, sessionId and password in the response are all zero. On any failure
// while building/sending the response the connection is closed.
public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
    // register with JMX
    try {
        if (valid) {
            serverCnxnFactory.registerConnection(cnxn);
        }
    } catch (Exception e) {
            // Best effort: a JMX registration failure is logged and setup continues
            LOG.warn("Failed to register with JMX", e);
    }

    try {
        // Build the ConnectResponse
        ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()
                : 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no
                        // longer valid
                        valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
        // Serialize the ConnectResponse, preceded by a placeholder length (-1)
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
        bos.writeInt(-1, "len");
        rsp.serialize(bos, "connect");
        // Echo the readOnly flag only to clients that sent it (not old clients)
        if (!cnxn.isOldClient) {
            bos.writeBool(
                    this instanceof ReadOnlyZooKeeperServer, "readOnly");
        }
        baos.close();
        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
        // Overwrite the placeholder with the real length: all bytes minus the 4-byte length field
        bb.putInt(bb.remaining() - 4).rewind();
        // Hand the response to the I/O layer to send to the client
        cnxn.sendBuffer(bb);    

        // (other code omitted)
            
    } catch (Exception e) {
        LOG.warn("Exception while establishing session, closing", e);
        cnxn.close();
    }
}

SetData 请求

流程图: SetData 请求

预处理

  1. I/O层接收来自客户端的请求,NIOServerCnxn 负责维护客户端的连接

  2. 判断是否是客户端“会话创建”请求;SetData 等非会话创建请求会进入 readRequest 分支

// --NIOServerCnxn.java
// Once the request body has been read completely, dispatches it. Connect
// requests and regular requests (such as SetData) take different paths.
// incomingBuffer is then reset to lenBuffer for the next request.
private void readPayload() throws IOException, InterruptedException {
    
	// (other code omitted)

    if (incomingBuffer.remaining() == 0) { // have we read length bytes?
        packetReceived();
        incomingBuffer.flip();
        if (!initialized) { // Step 2: not initialized yet -> this is the session-creation request
            // Step 3: deserialize and handle the ConnectRequest
            readConnectRequest();
        } else {
            // Step 2: not a session-creation request (e.g. SetData)
            readRequest();
        }
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }
}

// --NIOServerCnxn.java
// Handles a regular (post-connect) request: passes the complete packet in
// incomingBuffer to the ZooKeeper server.
private void readRequest() throws IOException {
	// Read and process the client's request
    zkServer.processPacket(this, incomingBuffer);
}

读取客户端请求

// -- ZooKeeperServer.java
// Decodes a regular client packet. The RequestHeader (xid, type) is read
// first; the remaining bytes become the request body. Non-auth/non-sasl
// requests are wrapped in a Request and submitted to the processor chain.
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
    // We have the request, now process and setup for next
    InputStream bais = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
    RequestHeader h = new RequestHeader();
    h.deserialize(bia, "header");
    // Through the magic of byte buffers, txn will not be
    // pointing
    // to the start of the txn
    incomingBuffer = incomingBuffer.slice();
    if (h.getType() == OpCode.auth) {
       // (auth handling omitted)
    } else {
        if (h.getType() == OpCode.sasl) {
            // (sasl handling omitted)
        }
        else {
            Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
              h.getType(), incomingBuffer, cnxn.getAuthInfo());
            si.setOwner(ServerCnxn.me);
            // Submit the request to the server's request-processor chain
            submitRequest(si);
        }
    }
    cnxn.incrOutstandingRequests(h);
}
  1. 将请求(此处为 SetData 请求)交给 PrepRequestProcessor 处理
/**
 * Submits a request to the head of the request-processor chain.
 * If firstProcessor is not set yet, this blocks (in wait(1000) slices) while the server
 * state is INITIAL. It then throws RuntimeException("Not started") unless the chain
 * exists and the state is RUNNING. Request types rejected by Request.isValid are routed
 * to an UnimplementedRequestProcessor. A MissingSessionException is logged at debug
 * level (if enabled). A RequestProcessorException is logged as an error.
 */
public void submitRequest(Request si) {
    // Has the first processor of the chain been set up yet?
    if (firstProcessor == null) {
        synchronized (this) {
            try {
                // Since all requests are passed to the request
                // processor it should wait for setting up the request
                // processor chain. The state will be updated to RUNNING
                // after the setup.
                while (state == State.INITIAL) {
                    wait(1000);
                }
            } catch (InterruptedException e) {
                // NOTE(review): the thread's interrupt status is not restored here;
                // execution falls through to the "Not started" check below.
                LOG.warn("Unexpected interruption", e);
            }
            if (firstProcessor == null || state != State.RUNNING) {
                throw new RuntimeException("Not started");
            }
        }
    }
    try {
        // Re-activate the session before processing (touch() body not shown here)
        touch(si.cnxn);
        boolean validpacket = Request.isValid(si.type);
        if (validpacket) {
            // Hand the request to the first processor.
            // Per the original author: PrepRequestProcessor runs on its own thread;
            // processRequest() enqueues the request on the submittedRequests blocking
            // queue, and that thread takes requests from the queue to process them.
            firstProcessor.processRequest(si);
            if (si.cnxn != null) {
                incInProcess();
            }
        } else {
            LOG.warn("Received packet at server of unknown type " + si.type);
            new UnimplementedRequestProcessor().processRequest(si);
        }
    } catch (MissingSessionException e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Dropping request: " + e.getMessage());
        }
    } catch (RequestProcessorException e) {
        LOG.error("Unable to process request:" + e.getMessage(), e);
    }
}
  1. 创建请求事务头

  2. 会话检查

// -- SessionTrackerImpl.java
/**
 * Verifies that a session may issue requests through the given owner.
 * An unknown or closing session is reported as expired. A session without an
 * owner is claimed by {@code owner}. A session already owned by a different
 * object is reported as moved.
 */
synchronized public void checkSession(long sessionId, Object owner)
        throws KeeperException.SessionExpiredException, KeeperException.SessionMovedException {
    final SessionImpl tracked = sessionsById.get(sessionId);
    boolean expired = tracked == null || tracked.isClosing();
    if (expired) {
        throw new KeeperException.SessionExpiredException();
    }
    // No owner recorded yet: the caller claims the session.
    if (tracked.owner == null) {
        tracked.owner = owner;
        return;
    }
    // Owned by someone else (identity comparison): the session has migrated.
    if (tracked.owner != owner) {
        throw new KeeperException.SessionMovedException();
    }
}
  1. 反序列化请求,创建 SetDataRequest 实例,生成 ChangeRecord 对象,最终将被保存到 outstandingChanges 队列中
SetDataRequest setDataRequest = (SetDataRequest)record;
if(deserialize)
    // Deserialize the raw request bytes (request.request) into setDataRequest
    ByteBufferInputStream.byteBuffer2Record(request.request, setDataRequest);
  1. ACL 检查
/**
 * Checks that at least one of the caller's identities is granted {@code perm} by {@code acl}.
 * The check passes immediately when skipACL is set, when the ACL is null or empty,
 * or when any caller identity uses the "super" scheme. Otherwise an ACL entry
 * that grants any requested bit passes the check in two cases: it is world:anyone,
 * or its scheme's AuthenticationProvider matches one of the caller's ids with the
 * same scheme.
 *
 * @throws KeeperException.NoAuthException if no entry authorizes the caller
 */
static void checkACL(ZooKeeperServer zks, List<ACL> acl, int perm,
        List<Id> ids) throws KeeperException.NoAuthException {
    if (skipACL) {
        return;
    }
    if (acl == null || acl.isEmpty()) {
        return;
    }
    for (Id caller : ids) {
        if (caller.getScheme().equals("super")) {
            return;
        }
    }
    for (ACL entry : acl) {
        Id granted = entry.getId();
        // Skip entries that grant none of the requested permission bits.
        if ((entry.getPerms() & perm) == 0) {
            continue;
        }
        boolean worldAnyone = granted.getScheme().equals("world")
                && granted.getId().equals("anyone");
        if (worldAnyone) {
            return;
        }
        AuthenticationProvider provider = ProviderRegistry.getProvider(granted.getScheme());
        if (provider == null) {
            continue;
        }
        for (Id caller : ids) {
            boolean sameScheme = caller.getScheme().equals(granted.getScheme());
            if (sameScheme && provider.matches(caller.getId(), granted.getId())) {
                return;
            }
        }
    }
    throw new KeeperException.NoAuthException();
}
  1. 以乐观锁方式进行数据版本校验(客户端传入的 version 为 -1 时跳过校验)
// Data version check (optimistic concurrency): the client's expected version must
// equal the node's current version; an expected version of -1 skips the check.
version = setDataRequest.getVersion();
int currentVersion = nodeRecord.stat.getVersion();
if (version != -1 && version != currentVersion) {
    throw new KeeperException.BadVersionException(path);
}
  1. 创建请求事务体 SetDataTxn
/**
 * Transaction body for a setData operation (excerpt; remaining members omitted).
 * In pRequest2Txn it is built from the request's path and data together with the
 * node's new version (current version + 1).
 */
public class SetDataTxn implements Record {
  // path of the znode being updated
  private String path;
  // new data to store at the znode
  private byte[] data;
  // version the node will have after this transaction is applied
  private int version;
  public SetDataTxn() {
  }
  public SetDataTxn(
        String path,
        byte[] data,
        int version) {
    this.path=path;
    this.data=data;
    this.version=version;
  }
  // ....... (remaining members omitted)
}
  1. 保存 ChangeRecord 对象到 outstandingChanges 队列中
/**
 * Records a pending change: appends it to zks.outstandingChanges and indexes it by
 * path in zks.outstandingChangesForPath. Both updates happen while holding the
 * outstandingChanges monitor.
 */
void addChangeRecord(ChangeRecord c) {
    final Object guard = zks.outstandingChanges;
    synchronized (guard) {
        zks.outstandingChanges.add(c);
        zks.outstandingChangesForPath.put(c.path, c);
    }
}

整体源码:

// --PrepRequestProcessor.java
/**
 * Builds the transaction header and body for a write request (only the setData
 * case is shown in this excerpt). For setData the steps are:
 * - check the session;
 * - deserialize the request if asked;
 * - validate the path and check WRITE permission;
 * - verify the expected version;
 * - set request.txn to a SetDataTxn carrying the next version;
 * - register a ChangeRecord with that version as an outstanding change.
 */
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
    throws KeeperException, IOException, RequestProcessorException
{
    // Create the transaction header
    request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
                                zks.getTime(), type);

    switch (type) {
        // (other cases omitted)
        case OpCode.setData:
            // Session check
            zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
            SetDataRequest setDataRequest = (SetDataRequest)record;
            if(deserialize) // Deserialize the request into the SetDataRequest instance
                ByteBufferInputStream.byteBuffer2Record(request.request, setDataRequest);
            path = setDataRequest.getPath();
            validatePath(path, request.sessionId);
            nodeRecord = getRecordForPath(path);
            // ACL check: the caller needs WRITE permission on the node
            checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE,
                    request.authInfo);

            // Optimistic-concurrency version check (-1 means "any version")
            version = setDataRequest.getVersion();
            int currentVersion = nodeRecord.stat.getVersion();
            if (version != -1 && version != currentVersion) {
                throw new KeeperException.BadVersionException(path);
            }
            
            version = currentVersion + 1;
            // Create the transaction body SetDataTxn carrying the new version
            request.txn = new SetDataTxn(path, setDataRequest.getData(), version);
            // NOTE(review): duplicate(zxid) presumably copies the record so the
            // version bump below does not mutate the existing record — confirm.
            nodeRecord = nodeRecord.duplicate(request.hdr.getZxid());
            nodeRecord.stat.setVersion(version);
            // Save the ChangeRecord to the outstandingChanges queue
            addChangeRecord(nodeRecord);
            break;
        // (other cases omitted)
    }
}

事务处理

同上,事务请求交给 ProposalRequestProcessor 处理器,随后会进入三个子流程进行处理:Sync流程,Proposal流程,commit流程

事务应用

  1. 将请求交给 FinalRequestProcessor 处理器处理
  2. 应用事务到内存数据库中
  3. 将事务请求加入 committedLog 队列中,以便集群之间进行数据同步

请求响应

  1. 统计处理
  2. 创建响应体 SetDataResponse
  3. 创建响应头
/**
 * FinalRequestProcessor excerpt: builds the reply header from the last processed
 * zxid and the error code, then updates latency and per-connection statistics.
 * err, lastOp and cnxn are set in the omitted code.
 */
public void processRequest(Request request) {

	// (other code omitted)

	long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
    // Create the reply header
    ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());

    zks.serverStats().updateLatency(request.createTime);
    cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp,
            request.createTime, System.currentTimeMillis());

    // (other code omitted)
}
  1. 序列化响应
  2. I/O底层发送数据
// -- NIOServerCnxn.java
/**
 * Serializes a reply as [length prefix][header][optional body] and queues it for sending.
 * For replies with xid > 0 it also decrements outstandingRequests. It re-enables
 * reading on this connection when the server's in-process count is below
 * outstandingLimit or no requests remain outstanding. Any unexpected exception is
 * logged and swallowed.
 *
 * @param h   the reply header (always written)
 * @param r   the reply body, or null for a header-only reply
 * @param tag the serialization tag used for the body record
 */
synchronized public void sendResponse(ReplyHeader h, Record r, String tag) {
    try {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
        try {
            // Reserve space for the length prefix; it is overwritten below.
            baos.write(fourBytes);
            bos.writeRecord(h, "header");
            if (r != null) {
                bos.writeRecord(r, tag);
            }
            baos.close();
        } catch (IOException e) {
            // Log the cause so serialization failures can be diagnosed.
            // NOTE(review): execution continues and sends whatever was serialized.
            LOG.error("Error serializing response", e);
        }

        byte[] b = baos.toByteArray();
        ByteBuffer bb = ByteBuffer.wrap(b);
        // Overwrite the placeholder with the payload length (excluding the 4-byte prefix).
        bb.putInt(b.length - 4).rewind();
        sendBuffer(bb);
        if (h.getXid() > 0) {
            // This method is synchronized on this connection already, so no nested lock is needed.
            outstandingRequests--;
            // check throttling
            synchronized (this.factory) {
                if (zkServer.getInProcess() < outstandingLimit
                        || outstandingRequests < 1) {
                    sk.selector().wakeup();
                    enableRecv();
                }
            }
        }
    } catch (Exception e) {
        LOG.warn("Unexpected exception. Destruction averted.", e);
    }
}

GetData 非事务请求处理

处理过程: GetData 非事务请求处理

预处理

  1. I/O层接收来自客户端的请求,NIOServerCnxn 负责维护客户端的连接
  2. 判断是否是客户端“会话创建”请求
  3. 将请求(此处为 GetData 请求)交给 PrepRequestProcessor 处理
  4. 会话检查
// --PrepRequestProcessor.java
/**
 * Pre-processes a request before passing it to the next processor. Read-only and
 * similar requests (sync, exists, getData, getACL, getChildren, getChildren2, ping,
 * setWatches) only get a session check. For them, request.hdr and request.txn stay
 * null. The request is then stamped with the server's current zxid and forwarded.
 */
protected void pRequest(Request request) throws RequestProcessorException {

    request.hdr = null;
    request.txn = null;
    
    try {
        switch (request.type) {
        
        // (other cases omitted)

        //All the rest don't need to create a Txn - just verify session
        case OpCode.sync:
        case OpCode.exists:
        case OpCode.getData:
        case OpCode.getACL:
        case OpCode.getChildren:
        case OpCode.getChildren2:
        case OpCode.ping:
        case OpCode.setWatches:
            // Session check
            zks.sessionTracker.checkSession(request.sessionId,
                    request.getOwner());
            break;
        }
    } catch (KeeperException e) {
        // (other code omitted)
    } catch (Exception e) {
        // (other code omitted)
    }
    request.zxid = zks.getZxid();
    // Pass the request on to the next processor in the chain
    nextProcessor.processRequest(request);
}

非事务处理

  1. 非事务请求的处理:PrepRequestProcessor 将请求提交给后续处理器链后,请求会先后经过 ProposalRequestProcessor、CommitProcessor、ToBeAppliedRequestProcessor、FinalRequestProcessor 等几个处理器,只有 FinalRequestProcessor 才会真正地进行数据处理,其他处理器只起到转发的作用。
  2. 反序列化GetDataRequest请求,获取数据节点
  3. 检查ACL
  4. 获取内容及stat,并注册watch
/**
 * FinalRequestProcessor excerpt for getData.
 * - Deserializes the GetDataRequest and looks up the node (NoNodeException if absent).
 * - Checks READ permission.
 * - Reads the data and Stat, registering cnxn as the watcher when the request asks for a watch.
 * - Builds a ReplyHeader and updates statistics.
 * - Sends the response, plus a close-session message when closeSession is set.
 * A KeeperException is turned into the error code carried by the reply header.
 */
public void processRequest(Request request) {
    // (other code omitted)
    try {
        // (other code omitted)
        switch (request.type) {
	        // (other cases omitted)
	        case OpCode.getData: {
	            lastOp = "GETD";
	            GetDataRequest getDataRequest = new GetDataRequest();
	            // Deserialize the GetDataRequest
	            ByteBufferInputStream.byteBuffer2Record(request.request,
	                    getDataRequest);
	            // Look up the data node
	            DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
	            if (n == null) {
	                throw new KeeperException.NoNodeException();
	            }
	            // ACL check: the caller needs READ permission
	            PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
	                    ZooDefs.Perms.READ,
	                    request.authInfo);
	            Stat stat = new Stat();
	            // Read the content and stat, and register the watch if requested
	            byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
	                    getDataRequest.getWatch() ? cnxn : null); // cnxn is the client-server connection, used here as the watcher
	            rsp = new GetDataResponse(b, stat);
	            break;
	        }
	        // (other cases omitted)
    	}
    } catch (SessionMovedException e) {
       // (other code omitted)
    } catch (KeeperException e) {
        err = e.code();
    } catch (Exception e) {
        // (other code omitted)
    }

    long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
    // Create the reply header
    ReplyHeader hdr =
        new ReplyHeader(request.cxid, lastZxid, err.intValue());

    zks.serverStats().updateLatency(request.createTime);
    cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp,
                request.createTime, System.currentTimeMillis());

    try {
        cnxn.sendResponse(hdr, rsp, "response");
        if (closeSession) {
            cnxn.sendCloseSession();
        }
    } catch (IOException e) {
        // NOTE(review): "FIXMSG" is a placeholder log message in the quoted source.
        LOG.error("FIXMSG",e);
    }
}

ZKDatabase 获取数据

// -- ZKDatabase.java
/**
 * Reads a node's data from the in-memory DataTree, filling {@code stat} and
 * registering {@code watcher} when it is non-null (delegates to DataTree.getData).
 *
 * @throws KeeperException.NoNodeException if no node exists at {@code path}
 */
public byte[] getData(String path, Stat stat, Watcher watcher) 
throws KeeperException.NoNodeException {
    return dataTree.getData(path, stat, watcher);
}

内存树中查询数据,并保存监听信息

// -- DataTree.java
/**
 * Returns the data stored at {@code path} and fills {@code stat} from the node.
 * When {@code watcher} is non-null, it is added to dataWatches for this path.
 * The stat copy, watch registration and data read all happen while holding the
 * node's monitor.
 *
 * @throws KeeperException.NoNodeException if the in-memory tree has no node at {@code path}
 */
public byte[] getData(String path, Stat stat, Watcher watcher)
        throws KeeperException.NoNodeException {
    DataNode node = nodes.get(path);
    if (node == null) {
        throw new KeeperException.NoNodeException();
    }
    final byte[] payload;
    synchronized (node) {
        node.copyStat(stat);
        if (watcher != null) {
            // Register the watch per path in the data-watch manager.
            dataWatches.addWatch(path, watcher);
        }
        payload = node.data;
    }
    return payload;
}

请求响应

  1. 创建响应体 GetDataResponse
  2. 创建响应头
  3. 统计处理
  4. 序列化响应
  5. I/O底层发送数据
⚠️ **GitHub.com Fallback** ⚠️