GetData 非事务请求处理 - litter-fish/ReadSource GitHub Wiki

处理过程:

053a88c66be99280d5d810e8b67344f3.jpeg

预处理

预处理序列图:

7d1209e0d634c35c87f19da5caa5c406.png

  1. I/O层接收来自客户端的请求,NIOServerCnxn 负责维护客户端的连接
  2. 判断是否是客户端“会话创建”请求
// org/apache/zookeeper/server/NIOServerCnxn.java
/**
 * Reads the body of the current packet into {@code incomingBuffer} and, once the
 * buffer has been completely filled, dispatches the packet.
 * (Excerpt: the handling of a negative read result is omitted.)
 */
private void readPayload() throws IOException, InterruptedException {
    if (incomingBuffer.remaining() != 0) { // buffer not full yet: keep reading from the socket
        int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
        if (rc < 0) {
            // (handling omitted in this excerpt)
        }
    }

    if (incomingBuffer.remaining() == 0) { // buffer is full: the expected bytes have all arrived
        // Update the count of received packets
        packetReceived();
        incomingBuffer.flip();
        if (!initialized) {
            // Connection not initialized yet: handle the packet as a connect request
            readConnectRequest();
        } else {
            // Otherwise handle it as an ordinary request
            readRequest();
        }
        // Reset the length buffer and make it the current buffer again for the next packet
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }
}

处理非“会话创建”请求

// org/apache/zookeeper/server/NIOServerCnxn.java
/**
 * Hands the fully read packet in {@code incomingBuffer}, together with this
 * connection, to {@code zkServer.processPacket}.
 */
private void readRequest() throws IOException {
    zkServer.processPacket(this, incomingBuffer);
}

委托ZooKeeperServer处理请求包

/**
 * Decodes the request header from an incoming packet and routes the packet by
 * operation type. (Excerpt: the auth and sasl branches are omitted.)
 *
 * @param cnxn           connection the packet arrived on
 * @param incomingBuffer buffer holding the serialized header followed by the request body
 * @throws IOException declared by the signature; presumably raised when the header
 *                     cannot be deserialized
 */
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
    // We have the request, now process and setup for next
    InputStream bais = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
    RequestHeader h = new RequestHeader();
    // Deserialize the request header
    h.deserialize(bia, "header");
    // Re-slice so the buffer passed on starts at the current position
    // (presumably just past the header consumed through bais -- TODO confirm)
    incomingBuffer = incomingBuffer.slice();
    if (h.getType() == OpCode.auth) {
        // (auth handling omitted in this excerpt)
        return;
    } else {
        if (h.getType() == OpCode.sasl) {
            // (sasl handling omitted in this excerpt)
            return;
        }
        else {
            // Any other operation: wrap the remaining bytes in a Request and submit it
            Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
              h.getType(), incomingBuffer, cnxn.getAuthInfo());
            si.setOwner(ServerCnxn.me);
            submitRequest(si);
        }
    }
    // Only reached for requests that were submitted (the auth/sasl branches return early)
    cnxn.incrOutstandingRequests(h);
}
  1. 将非“会话创建”请求交给 PrepRequestProcessor 处理
/**
 * Submits a request to the head of the processor chain.
 * (Excerpt: the catch clauses of the second try block are omitted.)
 *
 * @param si the request to submit
 * @throws RuntimeException with message "Not started" if, after waiting, there is
 *                          still no first processor or the state is not RUNNING
 */
public void submitRequest(Request si) {
    // Has the first processor been initialized yet?
    if (firstProcessor == null) {
        synchronized (this) {
            try {
                // Wait in 1000 ms slices for as long as the server is still in INITIAL state
                while (state == State.INITIAL) {
                    wait(1000);
                }
            } catch (InterruptedException e) {
                LOG.warn("Unexpected interruption", e);
            }
            if (firstProcessor == null || state != State.RUNNING) {
                throw new RuntimeException("Not started");
            }
        }
    }
    try {
        // Re-activate (touch) the session of this connection
        touch(si.cnxn);
        // Validate the request type
        boolean validpacket = Request.isValid(si.type);
        if (validpacket) {
            // Hand the request to the first processor.
            // PrepRequestProcessor runs as its own thread: the submitted request is added to
            // the submittedRequests blocking queue and that thread takes requests from the queue
            firstProcessor.processRequest(si);
            if (si.cnxn != null) {
                incInProcess();
            }
        } else {
            LOG.warn("Received packet at server of unknown type " + si.type);
            new UnimplementedRequestProcessor().processRequest(si);
        }
    } // catch clauses omitted in this excerpt
}
  1. PrepRequestProcessor 处理器的处理

加入阻塞队列:

// org/apache/zookeeper/server/PrepRequestProcessor.java
/**
 * Enqueues the request on {@code submittedRequests}; the processing itself happens
 * in {@code run()}, which takes requests from that queue.
 *
 * @param request the request to enqueue
 */
public void processRequest(Request request) {
    submittedRequests.add(request);
}

PrepRequestProcessor 处理器线程进行处理

// org/apache/zookeeper/server/PrepRequestProcessor.java
/**
 * Thread body: takes requests from {@code submittedRequests} one at a time and
 * passes each to {@code pRequest}, until the {@code Request.requestOfDeath}
 * sentinel is taken. (Excerpt: the catch clauses are omitted.)
 */
public void run() {
    try {
        while (true) {
            // Blocking take: waits for as long as the queue is empty
            Request request = submittedRequests.take();
            // Pings are traced under their own mask
            long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
            if (request.type == OpCode.ping) {
                traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
            }
            if (LOG.isTraceEnabled()) {
                ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
            }
            // Identity comparison against the sentinel request ends the loop
            if (Request.requestOfDeath == request) {
                break;
            }
            // Process the request
            pRequest(request);
        }
    } // catch clauses omitted in this excerpt
}

处理请求

// org/apache/zookeeper/server/PrepRequestProcessor.java
/**
 * Pre-processes a request and forwards it to the next processor.
 * (Excerpt: only the cases shown below are kept; the other cases and the catch
 * clauses are omitted.)
 *
 * @param request the request to pre-process; its {@code hdr} and {@code txn} are
 *                reset to null and its {@code zxid} is set before forwarding
 * @throws RequestProcessorException declared by the signature
 */
protected void pRequest(Request request) throws RequestProcessorException {

    request.hdr = null;
    request.txn = null;
    
    try {
        switch (request.type) {
        
        // (other cases omitted in this excerpt)
        case OpCode.sync:
        case OpCode.exists:
        case OpCode.getData:
        case OpCode.getACL:
        case OpCode.getChildren:
        case OpCode.getChildren2:
        case OpCode.ping:
        case OpCode.setWatches:
            // For these operation types only a session check is performed here
            zks.sessionTracker.checkSession(request.sessionId,
                    request.getOwner());
            break;
        }
    } // catch clauses omitted in this excerpt
    
    request.zxid = zks.getZxid();
    // Hand the request to the next processor in the chain
    nextProcessor.processRequest(request);
}

会话检查

// org/apache/zookeeper/server/SessionTrackerImpl.java
/**
 * Verifies that the given session is still usable and belongs to the caller.
 *
 * <p>A session with no recorded owner adopts {@code owner}; a session that already
 * has an owner must be presented with that very same object (identity comparison).
 *
 * @param sessionId id used to look the session up in {@code sessionsById}
 * @param owner     object claiming ownership of the session
 * @throws KeeperException.SessionExpiredException if no session is registered under
 *         {@code sessionId} or the session reports that it is closing
 * @throws KeeperException.SessionMovedException if the session already has a
 *         different owner
 */
public synchronized void checkSession(long sessionId, Object owner)
        throws KeeperException.SessionExpiredException,
        KeeperException.SessionMovedException {
    final SessionImpl tracked = sessionsById.get(sessionId);

    // Unknown or closing sessions are reported as expired.
    final boolean unusable = (tracked == null) || tracked.isClosing();
    if (unusable) {
        throw new KeeperException.SessionExpiredException();
    }

    final Object currentOwner = tracked.owner;
    if (currentOwner == null) {
        // First check for this session: remember who owns it.
        tracked.owner = owner;
        return;
    }

    // An owner is already recorded: a different one means the session has moved.
    if (currentOwner != owner) {
        throw new KeeperException.SessionMovedException();
    }
}

非事务处理

非事务请求处理的时序图:

c876f4ac40121422bd8d2ccb907087c0.png

  1. 非事务处理:PrepRequestProcessor 处理器提交请求后,请求会先后经过 ProposalRequestProcessor、CommitProcessor、ToBeAppliedRequestProcessor、FinalRequestProcessor 等几个处理器;只有 FinalRequestProcessor 处理器才会真正进行数据的处理,其他处理器只起到转发的作用。

ProposalRequestProcessor 处理器的处理

// org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
/**
 * Routes a request: a {@code LearnerSyncRequest} goes to the leader's
 * {@code processSync}; anything else is forwarded to the next processor.
 * (Excerpt: the extra handling for transactional requests is omitted.)
 *
 * @param request the request to route
 * @throws RequestProcessorException declared by the signature
 */
public void processRequest(Request request) throws RequestProcessorException {
    
    if(request instanceof LearnerSyncRequest){
        zks.getLeader().processSync((LearnerSyncRequest)request);
    } else {
        // Forward to the next processor: CommitProcessor
        nextProcessor.processRequest(request);
        // (extra handling for transactional requests omitted in this excerpt)
    }
}

CommitProcessor 处理器的处理

/**
 * Queues a request for the CommitProcessor thread and wakes up any thread waiting
 * on this object's monitor. Does nothing once {@code finished} is set.
 *
 * @param request the request to queue
 */
public synchronized void processRequest(Request request) {
    if (finished) {
        return;
    }
    // Enqueue first, then signal, so a woken thread finds the request in the queue.
    queuedRequests.add(request);
    notifyAll();
}

CommitProcessor线程的处理

// org/apache/zookeeper/server/quorum/CommitProcessor.java
/**
 * Thread body of the commit processor. Each loop iteration forwards everything in
 * {@code toProcess}, then either waits for work or moves queued requests into
 * {@code toProcess}. (Excerpt: the committed-request branch, the transactional
 * switch cases and the catch clauses are omitted.)
 */
public void run() {
    try {
        Request nextPending = null;            
        while (!finished) {

            // Part 1: walk the toProcess list (non-transactional requests or already
            // committed transactional requests), hand each to the next processor, then clear it
            int len = toProcess.size();
            for (int i = 0; i < len; i++) {
                // Forward pending requests to the next processor, in order
                nextProcessor.processRequest(toProcess.get(i));
            }

            // Clear the list
            toProcess.clear();
            // Part 2
            synchronized (this) {
                // At this point either queuedRequests has been drained, or a transactional
                // request was found in it and assigned to nextPending; size > 0 together with
                // nextPending == null is not expected here
                if ((queuedRequests.size() == 0 || nextPending != null)
                        && committedRequests.size() == 0) { // no committed transactions
                    // Blocks here right after the chain has started, or when there are
                    // no requests waiting to be processed
                    wait();
                    continue;
                }
                // size > 0 together with nextPending == null is not expected here
                if ((queuedRequests.size() == 0 || nextPending != null)
                        && committedRequests.size() > 0) { // there are committed requests
                    // Non-transactional requests never enter this branch; its code is omitted
                }
            }
            // Part 3
            if (nextPending != null) {
                continue;
            }

            // Part 4
            synchronized (this) {
                while (nextPending == null && queuedRequests.size() > 0) {
                    Request request = queuedRequests.remove();
                    switch (request.type) {
                    // (case branches omitted in this excerpt)
                    default:
                        // Non-transactional requests go straight into the to-process list
                        toProcess.add(request);
                    }
                }
            }
        }
    } // catch clauses omitted in this excerpt
}

线程启动后,首先执行部分1代码,此时toProcess队列为空;接着执行部分2代码,此时queuedRequests、committedRequests队列都为空,且nextPending也为null,因此线程阻塞在部分2代码的wait()处。当有一个非事务请求经过前一个处理器到达CommitProcessor处理器时,请求会被加入queuedRequests队列,同时唤醒阻塞的线程;线程被唤醒后通过continue重新进入循环,再次经过部分1(toProcess仍为空)和部分2(queuedRequests已不为空,不再阻塞),然后经过部分3,到达部分4,在此步骤中非事务请求被加入待处理队列toProcess;接着进入下一次循环,在部分1中遍历toProcess队列,将请求交给下一个处理器处理。

  1. 反序列化GetDataRequest请求,获取数据节点

FinalRequestProcessor处理器处理序列图:

3db97524f2abf139bb5677d3a672b52f.png

/**
 * Final processor in the chain. This excerpt keeps only the {@code getData} case:
 * it decodes the request, looks up the node, checks the READ permission, reads the
 * data and stat (optionally registering a watch) and builds the response body.
 * The transactional handling that precedes the switch, the other cases, the catch
 * clauses and the code that sends the response are omitted; {@code lastOp},
 * {@code rsp} and {@code cnxn} are declared in the omitted code.
 *
 * @param request the request to process
 */
public void processRequest(Request request) {
    // (preceding transactional handling and unrelated code omitted in this excerpt)
    try {
        switch (request.type) {
         // (other cases omitted in this excerpt)
        case OpCode.getData: {
            lastOp = "GETD";
            // Deserialize the GetDataRequest
            GetDataRequest getDataRequest = new GetDataRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request,
                    getDataRequest);
            // Look up the data node
            DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
            if (n == null) {
                throw new KeeperException.NoNodeException();
            }

            // ACL check: the caller needs the READ permission on the node
            PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
                    ZooDefs.Perms.READ,
                    request.authInfo);

            Stat stat = new Stat();
            // Fetch the data, fill in stat, and register a watcher only if the request asked for one
            byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                    getDataRequest.getWatch() ? cnxn : null);
            // Build the response body
            rsp = new GetDataResponse(b, stat);
            break;
        }
        // (other cases omitted in this excerpt)
        }
    } // catch clauses omitted in this excerpt

    // (sending of the response omitted in this excerpt)
}

反序列化请求

// org/apache/zookeeper/server/ByteBufferInputStream.java
/**
 * Populates {@code record} by deserializing it from the bytes of {@code bb}.
 *
 * @param bb     buffer that holds the serialized record
 * @param record record instance to fill in; deserialized under the tag "request"
 * @throws IOException if deserialization fails
 */
public static void byteBuffer2Record(ByteBuffer bb, Record record) throws IOException {
    final ByteBufferInputStream source = new ByteBufferInputStream(bb);
    final BinaryInputArchive archive = BinaryInputArchive.getArchive(source);
    record.deserialize(archive, "request");
}

反序列化 GetDataRequest

// org/apache/zookeeper/proto/GetDataRequest.java
/**
 * Reads this request's fields from the archive: the string "path" first, then the
 * boolean "watch", bracketed by the start/end record markers for {@code tag}.
 *
 * @param a_  archive to read from
 * @param tag record tag passed to {@code startRecord} and {@code endRecord}
 * @throws java.io.IOException if reading from the archive fails
 */
public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
    a_.startRecord(tag);
    // Fields are read in this fixed order.
    this.path = a_.readString("path");
    this.watch = a_.readBool("watch");
    a_.endRecord(tag);
}

通过路径名称获取数据节点

// org/apache/zookeeper/server/ZKDatabase.java
/**
 * Returns the data node for {@code path} by delegating to {@code dataTree.getNode}.
 *
 * @param path path of the node to look up
 * @return whatever {@code dataTree.getNode(path)} returns
 */
public DataNode getNode(String path) {
  return dataTree.getNode(path);
}

从内存中获取数据:ZK在启动的时候,会依据数据库文件还原数据,并在内存中以路径为key、DataNode为value,存储在ConcurrentHashMap集合中;获取数据时,直接从nodes集合中返回即可。

/**
 * Excerpt of the data tree: a map from path to {@link DataNode}.
 */
public class DataTree {
    // Holds all data nodes of the ZooKeeper server, keyed by path
    private final ConcurrentHashMap<String, DataNode> nodes =
        new ConcurrentHashMap<String, DataNode>();

    /**
     * Looks a node up by path.
     *
     * @param path path of the node
     * @return the node stored under {@code path}, or null if the map has no entry for it
     */
    public DataNode getNode(String path) {
        return nodes.get(path);
    }
}

检查ACL

// org/apache/zookeeper/server/PrepRequestProcessor.java
/**
 * Checks whether any of the caller's identities is granted {@code perm} by the
 * node's ACL list, returning normally when access is allowed.
 *
 * <p>Access is allowed when ACL checking is skipped, when the ACL list is null or
 * empty, when one of the identities uses the "super" scheme, or when an ACL entry
 * carrying the requested permission either is world:anyone or is matched by an
 * identity of the same scheme through that scheme's authentication provider.
 *
 * @param zks  the server (not used by the code shown here)
 * @param acl  ACL entries of the node
 * @param perm permission bit(s) being requested
 * @param ids  identities attached to the request
 * @throws KeeperException.NoAuthException if none of the rules above allows access
 */
static void checkACL(ZooKeeperServer zks, List<ACL> acl, int perm,
        List<Id> ids) throws KeeperException.NoAuthException {
    if (skipACL || acl == null || acl.size() == 0) {
        return;
    }

    // A "super" identity passes regardless of the node's ACL entries.
    for (Id credential : ids) {
        if (credential.getScheme().equals("super")) {
            return;
        }
    }

    // Walk the node's ACL entries looking for one that grants the permission.
    for (ACL entry : acl) {
        Id aclId = entry.getId();
        if ((entry.getPerms() & perm) == 0) {
            // This entry does not carry the requested permission.
            continue;
        }

        boolean openToEveryone = aclId.getScheme().equals("world")
                && aclId.getId().equals("anyone");
        if (openToEveryone) {
            return;
        }

        AuthenticationProvider provider =
                ProviderRegistry.getProvider(aclId.getScheme());
        if (provider == null) {
            continue;
        }
        for (Id credential : ids) {
            boolean sameScheme = credential.getScheme().equals(aclId.getScheme());
            if (sameScheme && provider.matches(credential.getId(), aclId.getId())) {
                return;
            }
        }
    }

    throw new KeeperException.NoAuthException();
}

通过路径名称获取内容及stat,并注册watch

/**
 * Fetches a node's data and stat by path, optionally registering a watcher, by
 * delegating to {@code dataTree.getData}.
 *
 * @param path    path of the node
 * @param stat    stat object passed through to the data tree
 * @param watcher watcher passed through to the data tree; may be null
 * @return whatever {@code dataTree.getData} returns
 * @throws KeeperException.NoNodeException propagated from the data tree
 */
public byte[] getData(String path, Stat stat, Watcher watcher) 
        throws KeeperException.NoNodeException {
    return dataTree.getData(path, stat, watcher);
}

获取内容及stat,并注册watch

// org/apache/zookeeper/server/DataTree.java
/**
 * Returns the data stored at {@code path}, copies the node's stat into
 * {@code stat} and, when a watcher is supplied, registers it as a data watch on
 * the path.
 *
 * @param path    path of the node to read
 * @param stat    receives a copy of the node's stat
 * @param watcher watcher to register for the path, or null to register none
 * @return the node's {@code data} field
 * @throws KeeperException.NoNodeException if no node is stored under {@code path}
 */
public byte[] getData(String path, Stat stat, Watcher watcher)
        throws KeeperException.NoNodeException {
    final DataNode node = nodes.get(path);
    if (node == null) {
        throw new KeeperException.NoNodeException();
    }

    final byte[] payload;
    // Stat copy, watch registration and data read all happen under the node's monitor.
    synchronized (node) {
        node.copyStat(stat);
        if (watcher != null) {
            dataWatches.addWatch(path, watcher);
        }
        payload = node.data;
    }
    return payload;
}

给stat赋值

// org/apache/zookeeper/server/DataNode.java
/**
 * Copies this node's stat into {@code to} and fills in the values derived from
 * the node itself: data length and number of children.
 *
 * @param to the stat object to populate
 */
public synchronized void copyStat(Stat to) {
    // Derived values: a missing children set or data array counts as zero.
    final int childCount = (this.children == null) ? 0 : children.size();
    final int dataLength = (data == null) ? 0 : data.length;

    // Values copied straight from the stored stat.
    to.setAversion(stat.getAversion());
    to.setCtime(stat.getCtime());
    to.setCzxid(stat.getCzxid());
    to.setMtime(stat.getMtime());
    to.setMzxid(stat.getMzxid());
    to.setPzxid(stat.getPzxid());
    to.setVersion(stat.getVersion());
    to.setEphemeralOwner(stat.getEphemeralOwner());

    to.setDataLength(dataLength);
    // NOTE(review): the reported cversion is computed from the stored cversion and
    // the child count; the reason for this formula is not visible here.
    to.setCversion(stat.getCversion() * 2 - childCount);
    to.setNumChildren(childCount);
}

请求响应

请求响应的时序图:

65a23b5c18f26c68b29e66aff4f0fdd8.png

  1. 创建响应体 GetDataResponse
/**
 * Creates the response body for a getData request.
 *
 * @param data the node's data; stored as-is, without copying
 * @param stat the node's stat
 */
public GetDataResponse(
        byte[] data,
        org.apache.zookeeper.data.Stat stat) {
    this.data=data;
    this.stat=stat;
}
  1. 创建响应头
/**
 * Creates a reply header.
 *
 * @param xid  value stored in the {@code xid} field
 * @param zxid value stored in the {@code zxid} field
 * @param err  value stored in the {@code err} field
 */
public ReplyHeader(
        int xid,
        long zxid,
        int err) {
    this.xid=xid;
    this.zxid=zxid;
    this.err=err;
}
  1. 统计处理
/**
 * Records the latency of one request: adds it to the running total, bumps the
 * request count and widens the min/max bounds when needed.
 *
 * @param requestCreateTime creation time of the request, on the same clock as
 *                          {@code Time.currentElapsedTime()}
 */
synchronized void updateLatency(long requestCreateTime) {
    final long elapsed = Time.currentElapsedTime() - requestCreateTime;

    totalLatency += elapsed;
    count++;

    // Keep the smallest and largest latency seen so far.
    minLatency = Math.min(minLatency, elapsed);
    maxLatency = Math.max(maxLatency, elapsed);
}
  1. 序列化响应
// org/apache/zookeeper/server/NIOServerCnxn.java
/**
 * Serializes a reply (header plus optional body) with a four-byte length prefix
 * and passes it to {@code sendBuffer}. (Excerpt: code following the send is omitted.)
 *
 * @param h   reply header, written under the tag "header"
 * @param r   reply body; nothing is written for it when null
 * @param tag tag under which the body is written
 */
synchronized public void sendResponse(ReplyHeader h, Record r, String tag) {
    try {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // Make space for length
        BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
        try {
            // Write four bytes first; they are overwritten with the length below
            baos.write(fourBytes);
            // Write the reply header
            bos.writeRecord(h, "header");
            if (r != null) {
                // Write the reply body
                bos.writeRecord(r, tag);
            }
            baos.close();
        } catch (IOException e) {
            LOG.error("Error serializing response");
        }
        byte b[] = baos.toByteArray();
        // Wrap the bytes in a ByteBuffer
        ByteBuffer bb = ByteBuffer.wrap(b);
        // Put the length (total minus the four prefix bytes) at the start, then rewind to position 0
        bb.putInt(b.length - 4).rewind();
        // Send the ByteBuffer
        sendBuffer(bb);
        // (remaining code omitted in this excerpt)
     } catch(Exception e) {
        LOG.warn("Unexpected exception. Destruction averted.", e);
     }
}
  1. I/O底层发送数据
// org/apache/zookeeper/server/NIOServerCnxn.java
/**
 * Tries to write the buffer straight to the socket. (Excerpt: the handling of
 * buffers that could not be fully written, and of the close-connection marker,
 * is omitted.)
 *
 * @param bb buffer to send; compared by identity against
 *           {@code ServerCnxnFactory.closeConn}
 */
protected void internalSendBuffer(ByteBuffer bb) {
    if (bb != ServerCnxnFactory.closeConn) {
        // Write directly only when the key is valid and OP_WRITE is not among its interest ops
        if(sk.isValid() &&
                ((sk.interestOps() & SelectionKey.OP_WRITE) == 0)) {
            try {
                // Write the data
                sock.write(bb);
            } catch (IOException e) {
                // we are just doing best effort right now
            }
        }
        // if there is nothing left to send, we are done
        if (bb.remaining() == 0) {
            // Update the count of sent packets
            packetSent();
            return;
        }
    }
    // (remaining code omitted in this excerpt)
}
⚠️ **GitHub.com Fallback** ⚠️