SetData 请求处理 - litter-fish/ReadSource GitHub Wiki
过程图:
客户端请求处理时序图:
- I/O层接收来自客户端的请求,NIOServerCnxn 负责维护客户端的连接
- 判断是否是客户端“会话创建”请求
// org/apache/zookeeper/server/NIOServerCnxn.java
private void readPayload() throws IOException, InterruptedException {
if (incomingBuffer.remaining() != 0) { // have we read length bytes?
int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
if (rc < 0) {
//
}
}
if (incomingBuffer.remaining() == 0) { // have we read length bytes?
// 更新接收包数量
packetReceived();
incomingBuffer.flip();
if (!initialized) {
readConnectRequest();
} else {
readRequest();
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
}
处理非“会话创建”请求
// org/apache/zookeeper/server/NIOServerCnxn.java
private void readRequest() throws IOException {
zkServer.processPacket(this, incomingBuffer);
}
委托ZooKeeperServer处理请求包
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
// We have the request, now process and setup for next
InputStream bais = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
RequestHeader h = new RequestHeader();
// 反序列化请求头
h.deserialize(bia, "header");
incomingBuffer = incomingBuffer.slice();
if (h.getType() == OpCode.auth) {
//
return;
} else {
if (h.getType() == OpCode.sasl) {
//
return;
}
else {
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
h.getType(), incomingBuffer, cnxn.getAuthInfo());
si.setOwner(ServerCnxn.me);
submitRequest(si);
}
}
cnxn.incrOutstandingRequests(h);
}
- 将会话创建请求交给 PrepRequestProcessor 处理
public void submitRequest(Request si) {
// 第一个处理器是否已经初始化完成
if (firstProcessor == null) {
synchronized (this) {
try {
while (state == State.INITIAL) {
wait(1000);
}
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption", e);
}
if (firstProcessor == null || state != State.RUNNING) {
throw new RuntimeException("Not started");
}
}
}
try {
// 会话再次激活
touch(si.cnxn);
// 验证请求包合法性
boolean validpacket = Request.isValid(si.type);
if (validpacket) {
// 请求交给第一个处理器处理
// PrepRequestProcessor 是一个独立的线程,请求提交后会加入 submittedRequests 阻塞队列中,线程中获取阻塞队列的请求进行处理
firstProcessor.processRequest(si);
if (si.cnxn != null) {
incInProcess();
}
} else {
LOG.warn("Received packet at server of unknown type " + si.type);
new UnimplementedRequestProcessor().processRequest(si);
}
} // catch
}
- PrepRequestProcessor 处理器的处理
加入阻塞队列:
// org/apache/zookeeper/server/PrepRequestProcessor.java
public void processRequest(Request request) {
submittedRequests.add(request);
}
PrepRequestProcessor 处理器线程处理时序图:
PrepRequestProcessor 处理器线程进行处理
// org/apache/zookeeper/server/PrepRequestProcessor.java
public void run() {
try {
while (true) {
// 阻塞取队列元素,队列为空时将会一直阻塞
Request request = submittedRequests.take();
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
}
if (Request.requestOfDeath == request) {
break;
}
// 处理请求
pRequest(request);
}
} // catch
}
处理请求
// org/apache/zookeeper/server/PrepRequestProcessor.java
protected void pRequest(Request request) throws RequestProcessorException {
request.hdr = null;
request.txn = null;
try {
switch (request.type) {
// 其他case
case OpCode.setData:
// 创建 SetDataRequest 实例
SetDataRequest setDataRequest = new SetDataRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
break;
// 其他case
}
} // catch
request.zxid = zks.getZxid();
// 将请求交给下一个处理链处理
nextProcessor.processRequest(request);
}
- 事务类型处理
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
throws KeeperException, IOException, RequestProcessorException
{
// 创建请求头
request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
Time.currentWallTime(), type);
switch (type) {
// case
case OpCode.setData:
// 会话检查
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
SetDataRequest setDataRequest = (SetDataRequest)record;
if(deserialize)
// 反序列化请求
ByteBufferInputStream.byteBuffer2Record(request.request, setDataRequest);
path = setDataRequest.getPath();
validatePath(path, request.sessionId);
nodeRecord = getRecordForPath(path);
// ACL 检查
checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE,
request.authInfo);
// 数据 version 校验
version = setDataRequest.getVersion();
int currentVersion = nodeRecord.stat.getVersion();
if (version != -1 && version != currentVersion) {
throw new KeeperException.BadVersionException(path);
}
// 版本加一
version = currentVersion + 1;
// 创建请求事务体 SetDataTxn
request.txn = new SetDataTxn(path, setDataRequest.getData(), version);
nodeRecord = nodeRecord.duplicate(request.hdr.getZxid());
nodeRecord.stat.setVersion(version);
// 保存 ChangeRecord 对象到 outstandingChanges 队列中
addChangeRecord(nodeRecord);
break;
// case
default:
LOG.error("Invalid OpCode: {} received by PrepRequestProcessor", type);
}
}
首先创建事务的请求头,接着根据请求数据类型,做不同处理。
创建请求头:
public TxnHeader(
long clientId,
int cxid,
long zxid,
long time,
int type) {
this.clientId=clientId;
this.cxid=cxid;
this.zxid=zxid;
this.time=time;
this.type=type;
}
会话检查,从sessionsById集合(根据sessionID来管理session实体)中获取session,然后判断获取到的对象即可
// org/apache/zookeeper/server/SessionTrackerImpl.java
synchronized public void checkSession(long sessionId, Object owner)
throws KeeperException.SessionExpiredException, KeeperException.SessionMovedException {
SessionImpl session = sessionsById.get(sessionId);
// 会话超时或关闭抛出SessionExpiredException异常
if (session == null || session.isClosing()) {
throw new KeeperException.SessionExpiredException();
}
if (session.owner == null) {
session.owner = owner;
// 会话是否已经进行迁移
} else if (session.owner != owner) {
throw new KeeperException.SessionMovedException();
}
}
反序列化请求,创建 SetDataRequest 实例,生成 ChangeRecord 对象,最终将被保存到 outstandingChanges 队列中
// org/apache/zookeeper/server/ByteBufferInputStream.java
static public void byteBuffer2Record(ByteBuffer bb, Record record)
throws IOException {
BinaryInputArchive ia;
ia = BinaryInputArchive.getArchive(new ByteBufferInputStream(bb));
record.deserialize(ia, "request");
}
真正反序列化的地方
// org/apache/zookeeper/proto/SetDataRequest.java
public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(tag);
path=a_.readString("path");
data=a_.readBuffer("data");
version=a_.readInt("version");
a_.endRecord(tag);
}
验证路径的合法性
// org/apache/zookeeper/server/PrepRequestProcessor.java
private void validatePath(String path, long sessionId) throws BadArgumentsException {
try {
PathUtils.validatePath(path);
} catch(IllegalArgumentException ie) {
LOG.info("Invalid path " + path + " with session 0x" + Long.toHexString(sessionId) +
", reason: " + ie.getMessage());
throw new BadArgumentsException(path);
}
}
路径合法性判断,不能为空字符串、且必须以“/”开头、长度必须大于1、
// org/apache/zookeeper/common/PathUtils.java
public static void validatePath(String path) throws IllegalArgumentException {
if (path == null) {
throw new IllegalArgumentException("Path cannot be null");
}
if (path.length() == 0) {
throw new IllegalArgumentException("Path length must be > 0");
}
if (path.charAt(0) != '/') {
throw new IllegalArgumentException(
"Path must start with / character");
}
if (path.length() == 1) { // done checking - it's the root
return;
}
if (path.charAt(path.length() - 1) == '/') {
throw new IllegalArgumentException(
"Path must not end with / character");
}
String reason = null;
char lastc = '/';
char chars[] = path.toCharArray();
char c;
for (int i = 1; i < chars.length; lastc = chars[i], i++) {
c = chars[i];
if (c == 0) {
reason = "null character not allowed @" + i;
break;
} else if (c == '/' && lastc == '/') {
reason = "empty node name specified @" + i;
break;
} else if (c == '.' && lastc == '.') {
if (chars[i-2] == '/' &&
((i + 1 == chars.length)
|| chars[i+1] == '/')) {
reason = "relative paths not allowed @" + i;
break;
}
} else if (c == '.') {
if (chars[i-1] == '/' &&
((i + 1 == chars.length)
|| chars[i+1] == '/')) {
reason = "relative paths not allowed @" + i;
break;
}
} else if (c > '\u0000' && c < '\u001f'
|| c > '\u007f' && c < '\u009F'
|| c > '\ud800' && c < '\uf8ff'
|| c > '\ufff0' && c < '\uffff') {
reason = "invalid charater @" + i;
break;
}
}
if (reason != null) {
throw new IllegalArgumentException(
"Invalid path string \"" + path + "\" caused by " + reason);
}
}
根据节点路径获取节点,此处会判断路径对应节点是否存在
// org/apache/zookeeper/server/PrepRequestProcessor.java
ChangeRecord getRecordForPath(String path) throws KeeperException.NoNodeException {
ChangeRecord lastChange = null;
synchronized (zks.outstandingChanges) {
lastChange = zks.outstandingChangesForPath.get(path);
if (lastChange == null) {
// 根据路径获取节点对象
DataNode n = zks.getZKDatabase().getNode(path);
if (n != null) {
Set<String> children;
synchronized(n) {
children = n.getChildren();
}
lastChange = new ChangeRecord(-1, path, n.stat, children.size(),
zks.getZKDatabase().aclForNode(n));
}
}
}
// 如果节点不存在则抛出节点不存在异常
if (lastChange == null || lastChange.stat == null) {
throw new KeeperException.NoNodeException(path);
}
return lastChange;
}
ACL 检查
// org/apache/zookeeper/server/PrepRequestProcessor.java
static void checkACL(ZooKeeperServer zks, List<ACL> acl, int perm,
List<Id> ids) throws KeeperException.NoAuthException {
if (skipACL) {
return;
}
if (acl == null || acl.size() == 0) {
return;
}
// 先遍历权限模式集合,如果存在super则直接返回
for (Id authId : ids) {
if (authId.getScheme().equals("super")) {
return;
}
}
// acl为节点的权限集合
for (ACL a : acl) {
Id id = a.getId();
// 检查客户端是否具有权限
if ((a.getPerms() & perm) != 0) {
if (id.getScheme().equals("world")
&& id.getId().equals("anyone")) {
return;
}
AuthenticationProvider ap = ProviderRegistry.getProvider(id
.getScheme());
if (ap != null) {
for (Id authId : ids) {
if (authId.getScheme().equals(id.getScheme())
&& ap.matches(authId.getId(), id.getId())) {
return;
}
}
}
}
}
throw new KeeperException.NoAuthException();
}
数据 version 校验,获取请求中传入的版本号和内存中数据的版本号,如果传入值不等于-1并且如果该版本号与内存中的值不一样则抛出BadVersionException异常
// 数据 version 校验
version = setDataRequest.getVersion();
int currentVersion = nodeRecord.stat.getVersion();
if (version != -1 && version != currentVersion) {
throw new KeeperException.BadVersionException(path);
}
创建请求事务体 SetDataTxn
public SetDataTxn(
String path,
byte[] data,
int version) {
// 节点路径
this.path=path;
// 节点值
this.data=data;
// 节点版本号
this.version=version;
}
创建节点副本
// org/apache/zookeeper/server/ZooKeeperServer#ChangeRecord
ChangeRecord duplicate(long zxid) {
// 创建节点属性副本
StatPersisted stat = new StatPersisted();
if (this.stat != null) {
// 给节点属性副本赋值
DataTree.copyStatPersisted(this.stat, stat);
}
// 创建节点副本
return new ChangeRecord(zxid, path, stat, childCount,
acl == null ? new ArrayList<ACL>() : new ArrayList<ACL>(acl));
}
保存 ChangeRecord 对象到 outstandingChanges 队列中
void addChangeRecord(ChangeRecord c) {
synchronized (zks.outstandingChanges) {
zks.outstandingChanges.add(c);
zks.outstandingChangesForPath.put(c.path, c);
}
}
ProposalRequestProcessor 处理时序图:
同上,事务请求交给 ProposalRequestProcessor 处理器,随后会进入三个子流程进行处理:Sync流程,Proposal流程,commit流程
- ProposalRequestProcessor 处理器的处理
// org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
public void processRequest(Request request) throws RequestProcessorException {
if(request instanceof LearnerSyncRequest){
zks.getLeader().processSync((LearnerSyncRequest)request);
} else {
// 提交给下一个处理器:CommitProcessor
nextProcessor.processRequest(request);
// 事务请求处理器的额外处理逻辑
if (request.hdr != null) {
// We need to sync and get consensus on any transactions
try {
// Leader 服务器发起一个PROPOSAL提议
zks.getLeader().propose(request);
} catch (XidRolloverException e) {
throw new RequestProcessorException(e.getMessage(), e);
}
// SyncRequestProcessor处理器进行事务日志记录
syncProcessor.processRequest(request);
}
}
}
提交给下一个处理器:CommitProcessor
// org/apache/zookeeper/server/quorum/CommitProcessor.java
synchronized public void processRequest(Request request) {
if (!finished) {
// 请求加入阻塞队列
queuedRequests.add(request);
// 唤醒CommitProcessor线程等待的线程
notifyAll();
}
}
Proposal流程
Leader 服务器发起一个PROPOSAL提议
public Proposal propose(Request request) throws XidRolloverException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
try {
// 序列化请求头
request.hdr.serialize(boa, "hdr");
if (request.txn != null) {
// 序列化请求体
request.txn.serialize(boa, "txn");
}
baos.close();
} catch (IOException e) {
LOG.warn("This really should be impossible", e);
}
// 创建 PROPOSAL 对象
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
baos.toByteArray(), null);
Proposal p = new Proposal();
p.packet = pp;
p.request = request;
synchronized (this) {
if (LOG.isDebugEnabled()) {
LOG.debug("Proposing:: " + request);
}
lastProposed = p.packet.getZxid();
// 以ZXID为key,将该提议放入投票箱中
outstandingProposals.put(lastProposed, p);
// 发送提议
sendPacket(pp);
}
return p;
}
发送提议给所有Follower
// org/apache/zookeeper/server/quorum/Leader.java
void sendPacket(QuorumPacket qp) {
synchronized (forwardingFollowers) {
for (LearnerHandler f : forwardingFollowers) {
f.queuePacket(qp);
}
}
}
Sync流程
Leader或Follower 使用SyncRequestProcessor处理器进行事务日志记录,然后返回Leader一个ACK命令
// org/apache/zookeeper/server/SyncRequestProcessor.java
public void processRequest(Request request) {
queuedRequests.add(request);
}
SyncRequestProcessor 主线程
// --SyncRequestProcessor
public void run() {
try {
int logCount = 0;
// 设置服务器的过半提交值,防止所有服务器在同一时刻进行数据快照
setRandRoll(r.nextInt(snapCount/2));
while (true) {
Request si = null;
if (toFlush.isEmpty()) {
// 阻塞出队
si = queuedRequests.take();
} else {
// 返回指定值,如果队列没有请求,返回null
si = queuedRequests.poll();
// 当队列中没有请求的时候,进行事务日志磁盘的刷入操作
if (si == null) {
flush(toFlush);
continue;
}
}
if (si == requestOfDeath) {
break;
}
if (si != null) {
// track the number of records written to the log
// 请求添加至日志文件,只有事务性请求才会返回true
if (zks.getZKDatabase().append(si)) {
// 当前已经记录的事务日志数量 + 1
logCount++;
// 采取“随机过半”策略进行快照,避免所有服务器在同一时刻进行数据快照
if (logCount > (snapCount / 2 + randRoll)) {
// 重新设置下一次的随机数
setRandRoll(r.nextInt(snapCount/2));
// 切换事务日志文件
zks.getZKDatabase().rollLog();
// take a snapshot
// 正在进行快照
if (snapInProcess != null && snapInProcess.isAlive()) {
LOG.warn("Too busy to snap, skipping");
} else {
// 新创建并启动一个线程进行快照,将sessions和datatree保存至snapshot文件
snapInProcess = new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
zks.takeSnapshot();
} catch(Exception e) {
LOG.warn("Unexpected exception", e);
}
}
};
snapInProcess.start();
}
logCount = 0;
}
// 如果是非事务请求(读操作)且toFlush为空
} else if (toFlush.isEmpty()) {
// 对于非事务请求,直接转交给下一个处理器
if (nextProcessor != null) {
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
continue;
}
toFlush.add(si);
// 当提交的事务请求数量大于1000时进行磁盘刷入操作
if (toFlush.size() > 1000) {
flush(toFlush);
}
}
}
} catch (Throwable t) {
handleException(this.getName(), t);
running = false;
}
LOG.info("SyncRequestProcessor exited!");
}
请求添加至日志文件
// org/apache/zookeeper/server/ZKDatabase.java
public boolean append(Request si) throws IOException {
return this.snapLog.append(si);
}
委托事务管理器追加事务到事务日志文件
// org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
public boolean append(Request si) throws IOException {
return txnLog.append(si.hdr, si.txn);
}
追加事务到事务日志文件
// org/apache/zookeeper/server/persistence/FileTxnLog.java
public synchronized boolean append(TxnHeader hdr, Record txn)
throws IOException
{
// 非事务返回false
if (hdr == null) {
return false;
}
if (hdr.getZxid() <= lastZxidSeen) {
LOG.warn("Current zxid " + hdr.getZxid()
+ " is <= " + lastZxidSeen + " for "
+ hdr.getType());
} else {
lastZxidSeen = hdr.getZxid();
}
// 是否有日志文件与Zookeeper服务器关联
if (logStream==null) {
if(LOG.isInfoEnabled()){
LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
}
// 创建一个事务日志文件,文件名为:log.{zxid}
logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
fos = new FileOutputStream(logFileWrite);
logStream=new BufferedOutputStream(fos);
oa = BinaryOutputArchive.getArchive(logStream);
// 写入文件头:魔数、版本号即dbId
FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
fhdr.serialize(oa, "fileheader");
// Make sure that the magic number is written before padding.
// 刷入文件头数据到磁盘中
logStream.flush();
currentSize = fos.getChannel().position();
// 文件流放入 streamsToFlush 集合中,
streamsToFlush.add(fos);
}
// 确定事务文件是否需要扩容
currentSize = padFile(fos.getChannel());
// 事务序列化
byte[] buf = Util.marshallTxnEntry(hdr, txn);
if (buf == null || buf.length == 0) {
throw new IOException("Faulty serialization for header " +
"and txn");
}
// 生成 Checksum
Checksum crc = makeChecksumAlgorithm();
crc.update(buf, 0, buf.length);
// 写入 Checksum 事务日志文件流
oa.writeLong(crc.getValue(), "txnEntryCRC");
// 写入事务到日志文件流
Util.writeTxnBytes(oa, buf);
return true;
}
首先判断是否是事务,非事务则返回false,然后判断是否有日志文件与Zookeeper服务器关联,如果没有则,创建一个文件与之关联, 接着确定事务文件是否需要扩容,最后将事务序列化并生成 Checksum,写入事务日志文件流
判断事务文件是否需要扩容
// org/apache/zookeeper/server/persistence/FileTxnLog.java
private long padFile(FileChannel fileChannel) throws IOException {
// 计算新文件的大小,preAllocSize = 64M
long newFileSize = calculateFileSizeWithPadding(fileChannel.position(), currentSize, preAllocSize);
if (currentSize != newFileSize) {
// 使用“0”填充其他内容
fileChannel.write((ByteBuffer) fill.position(0), newFileSize - fill.remaining());
currentSize = newFileSize;
}
return currentSize;
}
计算新文件的大小,preAllocSize = 65536 * 1024,即64M
// org/apache/zookeeper/server/persistence/FileTxnLog.java
public static long calculateFileSizeWithPadding(long position, long fileSize, long preAllocSize) {
// 检测当前文件的可写空间是否不足4096字节
if (preAllocSize > 0 && position + 4096 >= fileSize) {
if (position > fileSize){
fileSize = position + preAllocSize;
fileSize -= fileSize % preAllocSize;
} else {
fileSize += preAllocSize;
}
}
return fileSize;
}
写入事务到日志文件流
// org/apache/zookeeper/server/persistence/Util.java
public static void writeTxnBytes(OutputArchive oa, byte[] bytes)
throws IOException {
// 写入事务内容,标记为:txnEntry
oa.writeBuffer(bytes, "txnEntry");
// 写入结束标记
oa.writeByte((byte) 0x42, "EOR"); // 'B'
}
如果记录的事务数量已经满足过半条件,且没有正在进行快照,则启动一个独立的线程进行事务日志文件的快照操作
// 新创建并启动一个线程进行快照,将sessions和datatree保存至snapshot文件
snapInProcess = new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
zks.takeSnapshot();
} catch(Exception e) {
LOG.warn("Unexpected exception", e);
}
}
};
snapInProcess.start();
当提交的事务请求数量大于1000时或当队列中没有请求的时,进行磁盘刷入操作
// org/apache/zookeeper/server/SyncRequestProcessor.java
private void flush(LinkedList<Request> toFlush)
throws IOException, RequestProcessorException
{
if (toFlush.isEmpty())
return;
// 事务日志刷到磁盘
zks.getZKDatabase().commit();
// 依次将事务请求转交给下一个处理器进行处理
while (!toFlush.isEmpty()) {
Request i = toFlush.remove();
if (nextProcessor != null) {
nextProcessor.processRequest(i);
}
}
if (nextProcessor != null && nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
刷事务日志到磁盘
// org/apache/zookeeper/server/ZKDatabase.java
public void commit() throws IOException {
this.snapLog.commit();
}
委托事务管理器刷入事务日志
// org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
public void commit() throws IOException {
txnLog.commit();
}
刷入事务日志文件到磁盘
// org/apache/zookeeper/server/persistence/FileTxnLog.java
public synchronized void commit() throws IOException {
if (logStream != null) {
logStream.flush();
}
for (FileOutputStream log : streamsToFlush) {
log.flush();
if (forceSync) {
long startSyncNS = System.nanoTime();
// 强制将文件刷入磁盘
log.getChannel().force(false);
long syncElapsedMS =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
if (syncElapsedMS > fsyncWarningThresholdMS) {
LOG.warn("fsync-ing the write ahead log in "
+ Thread.currentThread().getName()
+ " took " + syncElapsedMS
+ "ms which will adversely effect operation latency. "
+ "See the ZooKeeper troubleshooting guide");
}
}
}
while (streamsToFlush.size() > 1) {
streamsToFlush.removeFirst().close();
}
}
刷新缓冲文件内容
// java/io/BufferedOutputStream.java
public synchronized void flush() throws IOException {
flushBuffer();
out.flush();
}
将事务请求转交给下一个处理器
Leader 服务器的下一个处理器为:AckRequestProcessor:
// org/apache/zookeeper/server/quorum/AckRequestProcessor.java
public void processRequest(Request request) {
QuorumPeer self = leader.self;
if(self != null)
// 向Leader反馈ACK
leader.processAck(self.getId(), request.zxid, null);
else
LOG.error("Null QuorumPeer");
}
Leader反馈ACK时不需要进行网络的通信,即直接调用processAck方法。
Follower 服务器的下一个处理器为:SendAckRequestProcessor:
// org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
public void processRequest(Request si) {
if(si.type != OpCode.sync){
// 构造ACK消息
QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null,
null);
try {
// 发送ACK消息给Leader服务器
learner.writePacket(qp, false);
} // catch
}
}
首先构造ACK消息,接着发送ACK消息给Leader服务器
发送ACK消息给Leader服务器
// org/apache/zookeeper/server/quorum/Learner.java
void writePacket(QuorumPacket pp, boolean flush) throws IOException {
synchronized (leaderOs) {
if (pp != null) {
leaderOs.writeRecord(pp, "packet");
}
if (flush) {
bufferedOutput.flush();
}
}
}
当Learner服务器与Leader服务器建立连接时会创建一个LearnerHandler实例,并启动一个线程专门用于处理Leader的选举的消息的通信, 当接收到Learner发送的ACK消息会进行相关信息处理
// org/apache/zookeeper/server/quorum/LearnerHandler.java
public void run() {
try {
//
while (true) {
qp = new QuorumPacket();
// 读阻塞,等待Learner 反馈 UPTODATE 指令的确认 ACK
ia.readRecord(qp, "packet");
//
switch (qp.getType()) {
case Leader.ACK:
if (this.learnerType == LearnerType.OBSERVER) {
if (LOG.isDebugEnabled()) {
LOG.debug("Received ACK from Observer " + this.sid);
}
}
syncLimitCheck.updateAck(qp.getZxid());
leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
break;
// case
}
}
} // catch
}
ACK消息处理
// org/apache/zookeeper/server/quorum/Leader.java
synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
//
if ((zxid & 0xffffffffL) == 0) {
return;
}
if (outstandingProposals.size() == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("outstanding is 0");
}
return;
}
// 判断是否已经有zxid更大的提议已经被提交
if (lastCommitted >= zxid) {
if (LOG.isDebugEnabled()) {
LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
Long.toHexString(lastCommitted), Long.toHexString(zxid));
}
// The proposal has already been committed
return;
}
// 根据zxid从投票箱提取提议
Proposal p = outstandingProposals.get(zxid);
if (p == null) {
LOG.warn("Trying to commit future proposal: zxid 0x{} from {}",
Long.toHexString(zxid), followerAddr);
return;
}
p.ackSet.add(sid);
if (LOG.isDebugEnabled()) {
LOG.debug("Count for zxid: 0x{} is {}",
Long.toHexString(zxid), p.ackSet.size());
}
// 过半服务器反馈提议判断
if (self.getQuorumVerifier().containsQuorum(p.ackSet)){
if (zxid != lastCommitted+1) {
LOG.warn("Commiting zxid 0x{} from {} not first!",
Long.toHexString(zxid), followerAddr);
LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
}
// 移除投票箱提议
outstandingProposals.remove(zxid);
if (p.request != null) {
// 加入toBeApplied
toBeApplied.add(p);
}
if (p.request == null) {
LOG.warn("Going to commmit null request for proposal: {}", p);
}
// 过半则发送一个Commit命令
commit(zxid);
// 对于Observe 没有参与投票,不能直接发送zxid
inform(p);
// 加入请求到 Leader CommitProcessor 处理器
zk.commitProcessor.commit(p.request);
if(pendingSyncs.containsKey(zxid)){
for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
sendSync(r);
}
}
}
}
首先进行zxid的判断,然后进行过半服务器反馈提议判断,如果超过半数服务器则,移除投票箱提议并将提议加入toBeApplied 然后给Learner发送Commit命令,最后加入请求到 Leader CommitProcessor 处理器
给Follower发送一个Commit命令
// org/apache/zookeeper/server/quorum/Leader.java
public void commit(long zxid) {
synchronized(this){
lastCommitted = zxid;
}
QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
sendPacket(qp);
}
void sendPacket(QuorumPacket qp) {
synchronized (forwardingFollowers) {
for (LearnerHandler f : forwardingFollowers) {
f.queuePacket(qp);
}
}
}
给Observer发送一个Commit命令
// org/apache/zookeeper/server/quorum/Leader.java
public void inform(Proposal proposal) {
QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid,
proposal.packet.getData(), null);
sendObserverPacket(qp);
}
void sendObserverPacket(QuorumPacket qp) {
for (LearnerHandler f : getObservingLearners()) {
f.queuePacket(qp);
}
}
加入请求到 Leader CommitProcessor 处理器
// org/apache/zookeeper/server/quorum/CommitProcessor.java
synchronized public void commit(Request request) {
if (!finished) {
// 加入阻塞队列
committedRequests.add(request);
// 唤醒Commit流程阻塞线程
notifyAll();
}
}
commit流程
Leader服务器在启动时,会初始化处理器链,而CommitProcessor是其一处理器,在启动时会启动一个独立的线程用于处理 已经有过半的服务器反馈的ACK提议。在刚刚初始化时,由于不存在待处理的提议,所以线程将被阻塞。直到当有过半提议被同意 后,将会唤醒Commit流程阻塞的线程。
// org/apache/zookeeper/server/quorum/CommitProcessor.java
public void run() {
try {
Request nextPending = null;
while (!finished) {
// 部分1:遍历toProcess队列(非事务请求或者已经提交的事务请求),交给下一个处理器处理,清空
int len = toProcess.size();
for (int i = 0; i < len; i++) {
// 待处理队列交给下个处理器,按顺序处理
nextProcessor.processRequest(toProcess.get(i));
}
/**
* 在请求队列remove干净或者找到了事务请求的情况下,
* 如果没有提交的请求,就等待。
* 如果有提交的请求,取出来,看和之前记录的下一个pend的请求是否match。
* match的话,进入toProcess队列,nextPending置空
* 不match的话,(基本上是nextPending为null,不会出现不为null且不匹配的情况),进入toProcess处理
*/
// 队列清空
toProcess.clear();
synchronized (this) {
// 要么 请求队列remove干净,要么从中找到一个事务请求,赋值给nextPending, 不允许size>0且nextPending == null的情况
if ((queuedRequests.size() == 0 || nextPending != null)
&& committedRequests.size() == 0) { // 没有已提交事务
// 处理链刚刚启动完成或没哟待处理请求是阻塞在这里
wait();
continue;
}
// 不允许size>0且nextPending == null的情况
if ((queuedRequests.size() == 0 || nextPending != null)
&& committedRequests.size() > 0) { // 如果有 已提交的请求
Request r = committedRequests.remove();
if (nextPending != null
&& nextPending.sessionId == r.sessionId
&& nextPending.cxid == r.cxid) { // 如果和nextPending匹配
nextPending.hdr = r.hdr;
nextPending.txn = r.txn;
nextPending.zxid = r.zxid;
// 加入待处理队列
toProcess.add(nextPending);
// 下一个pend的请求清空
nextPending = null;
} else {
// 这种情况是nextPending还没有来的及设置,nextPending==null的情况(代码应该再细分一下if else),
// 不可能出现nextPending!=null而走到了这里的情况(算异常)
toProcess.add(r);
}
}
}
// 部分3 如果 nextPending非空,就不用再去遍历请求队列,找到下一个事务请求(即4部分),因此continue掉
// 如果还有 未处理的事务请求(不含leader端的sync请求),就continue
if (nextPending != null) {
continue;
}
// 部分4: 只要不存在pend住的事务请求并且请求队列不为空,
// 一直遍历请求队列直到出现第一个事务请求或者队列遍历完,其间所有非事务请求全部加入toProcess队列,代表可以直接交给下一个处理器处理的
synchronized (this) {
// Process the next requests in the queuedRequests
while (nextPending == null && queuedRequests.size() > 0) {
Request request = queuedRequests.remove();
switch (request.type) {
case OpCode.create:
case OpCode.delete:
case OpCode.setData:
case OpCode.multi:
case OpCode.setACL:
case OpCode.createSession:
case OpCode.closeSession:
// 事务请求直接赋给nextPending,然后break
nextPending = request;
break;
case OpCode.sync:
// 如果需要等leader返回,该值learner端为true
if (matchSyncs) {
nextPending = request;
} else {
// 不需要的话,直接加入待处理队列里
toProcess.add(request);
}
// leader端matchSyncs是false,learner端才需要等leader回复,这里也break
break;
default:
// 非事务请求,都直接加入待处理队列
toProcess.add(request);
}
}
}
}
} // catch
LOG.info("CommitProcessor exited loop!");
}
提议通过后将移交给ToBeAppliedRequestProcessor处理器处理
// org/apache/zookeeper/server/quorum/Leader$ToBeAppliedRequestProcessor
public void processRequest(Request request) throws RequestProcessorException {
// 移交下一个处理器处理
next.processRequest(request);
// FinalRequestProcessor 处理器处理完,移除队列中元素
Proposal p = toBeApplied.peek();
if (p != null && p.request != null
&& p.request.zxid == request.zxid) {
toBeApplied.remove();
}
}
- 将请求交给 FinalRequestProcessor 处理器处理
public void processRequest(Request request) {
//
ProcessTxnResult rc = null;
synchronized (zks.outstandingChanges) {
while (!zks.outstandingChanges.isEmpty()
&& zks.outstandingChanges.get(0).zxid <= request.zxid) {
ChangeRecord cr = zks.outstandingChanges.remove(0);
if (cr.zxid < request.zxid) {
LOG.warn("Zxid outstanding "
+ cr.zxid
+ " is less than current " + request.zxid);
}
if (zks.outstandingChangesForPath.get(cr.path) == cr) {
zks.outstandingChangesForPath.remove(cr.path);
}
}
if (request.hdr != null) {
TxnHeader hdr = request.hdr;
Record txn = request.txn;
// 处理事务请求
rc = zks.processTxn(hdr, txn);
}
// do not add non quorum packets to the queue.
if (Request.isQuorum(request.type)) {
// 将事务请求加入 committedLog 队列中
zks.getZKDatabase().addCommittedProposal(request);
}
}
if (request.hdr != null && request.hdr.getType() == OpCode.closeSession) {
ServerCnxnFactory scxn = zks.getServerCnxnFactory();
if (scxn != null && request.cnxn == null) {
scxn.closeSession(request.sessionId);
return;
}
}
if (request.cnxn == null) {
return;
}
//
}
应用事务到内存数据库中
// org/apache/zookeeper/server/ZooKeeperServer.java
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
ProcessTxnResult rc;
int opCode = hdr.getType();
long sessionId = hdr.getClientId();
// 应用事务到内存
rc = getZKDatabase().processTxn(hdr, txn);
if (opCode == OpCode.createSession) {
if (txn instanceof CreateSessionTxn) {
CreateSessionTxn cst = (CreateSessionTxn) txn;
// 再次进行会话注册
sessionTracker.addSession(sessionId, cst
.getTimeOut());
}
} else if (opCode == OpCode.closeSession) {
// 移除会话
sessionTracker.removeSession(sessionId);
}
return rc;
}
应用事务到内存
// org/apache/zookeeper/server/ZKDatabase.java
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
return dataTree.processTxn(hdr, txn);
}
处理事务请求
// org/apache/zookeeper/server/DataTree.java
public ProcessTxnResult processTxn(TxnHeader header, Record txn)
{
ProcessTxnResult rc = new ProcessTxnResult();
try {
rc.clientId = header.getClientId();
rc.cxid = header.getCxid();
rc.zxid = header.getZxid();
rc.type = header.getType();
rc.err = 0;
rc.multiResult = null;
switch (header.getType()) {
// case
case OpCode.setData:
SetDataTxn setDataTxn = (SetDataTxn) txn;
rc.path = setDataTxn.getPath();
rc.stat = setData(setDataTxn.getPath(), setDataTxn
.getData(), setDataTxn.getVersion(), header
.getZxid(), header.getTime());
break;
// case
}
} // catch
if (rc.zxid > lastProcessedZxid) {
lastProcessedZxid = rc.zxid;
}
//
return rc;
}
将事务请求加入 committedLog 队列中,以便集群之间进行数据同步
// org/apache/zookeeper/server/ZKDatabase.java
public void addCommittedProposal(Request request) {
WriteLock wl = logLock.writeLock();
try {
wl.lock();
if (committedLog.size() > commitLogCount) {
committedLog.removeFirst();
minCommittedLog = committedLog.getFirst().packet.getZxid();
}
if (committedLog.size() == 0) {
minCommittedLog = request.zxid;
maxCommittedLog = request.zxid;
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
try {
request.hdr.serialize(boa, "hdr");
if (request.txn != null) {
request.txn.serialize(boa, "txn");
}
baos.close();
} catch (IOException e) {
LOG.error("This really should be impossible", e);
}
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
baos.toByteArray(), null);
Proposal p = new Proposal();
p.packet = pp;
p.request = request;
committedLog.add(p);
maxCommittedLog = p.packet.getZxid();
} finally {
wl.unlock();
}
}
请求响应处理时序图:
- SetData请求的响应
public void processRequest(Request request) {
//
ServerCnxn cnxn = request.cnxn;
String lastOp = "NA";
zks.decInProcess();
Code err = Code.OK;
Record rsp = null;
boolean closeSession = false;
try {
if (request.hdr != null && request.hdr.getType() == OpCode.error) {
throw KeeperException.create(KeeperException.Code.get((
(ErrorTxn) request.txn).getErr()));
}
KeeperException ke = request.getException();
if (ke != null && request.type != OpCode.multi) {
throw ke;
}
if (LOG.isDebugEnabled()) {
LOG.debug("{}",request);
}
switch (request.type) {
// case
case OpCode.setData: {
lastOp = "SETD";
rsp = new SetDataResponse(rc.stat);
err = Code.get(rc.err);
break;
}
// case
}
} // catch
long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
// 响应头
ReplyHeader hdr =
new ReplyHeader(request.cxid, lastZxid, err.intValue());
zks.serverStats().updateLatency(request.createTime);
cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp,
request.createTime, Time.currentElapsedTime());
try {
cnxn.sendResponse(hdr, rsp, "response");
if (closeSession) {
cnxn.sendCloseSession();
}
} catch (IOException e) {
LOG.error("FIXMSG",e);
}
}
统计处理
protected synchronized void updateStatsForResponse(long cxid, long zxid,
String op, long start, long end)
{
// don't overwrite with "special" xids - we're interested
// in the clients last real operation
if (cxid >= 0) {
lastCxid = cxid;
}
lastZxid = zxid;
lastOp = op;
lastResponseTime = end;
long elapsed = end - start;
lastLatency = elapsed;
if (elapsed < minLatency) {
minLatency = elapsed;
}
if (elapsed > maxLatency) {
maxLatency = elapsed;
}
count++;
totalLatency += elapsed;
}
序列化响应
// org/apache/zookeeper/server/NIOServerCnxn.java
synchronized public void sendResponse(ReplyHeader h, Record r, String tag) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
// Make space for length
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
try {
// 写入四个字节
baos.write(fourBytes);
// 写入响应头
bos.writeRecord(h, "header");
if (r != null) {
// 写入响应体
bos.writeRecord(r, tag);
}
baos.close();
} catch (IOException e) {
LOG.error("Error serializing response");
}
byte b[] = baos.toByteArray();
// 转换为ByteBuffer
ByteBuffer bb = ByteBuffer.wrap(b);
bb.putInt(b.length - 4).rewind();
// 发送ByteBuffer
sendBuffer(bb);
if (h.getXid() > 0) {
synchronized(this){
outstandingRequests--;
}
// check throttling
synchronized (this.factory) {
if (zkServer.getInProcess() < outstandingLimit
|| outstandingRequests < 1) {
sk.selector().wakeup();
enableRecv();
}
}
}
} catch(Exception e) {
LOG.warn("Unexpected exception. Destruction averted.", e);
}
}
I/O底层发送数据
// org/apache/zookeeper/server/NIOServerCnxn.java
public void sendBuffer(ByteBuffer bb) {
try {
internalSendBuffer(bb);
} catch(Exception e) {
LOG.error("Unexpected Exception: ", e);
}
}
protected void internalSendBuffer(ByteBuffer bb) {
if (bb != ServerCnxnFactory.closeConn) {
if(sk.isValid() &&
((sk.interestOps() & SelectionKey.OP_WRITE) == 0)) {
try {
// 写入数据
sock.write(bb);
} catch (IOException e) {
// we are just doing best effort right now
}
}
// if there is nothing left to send, we are done
if (bb.remaining() == 0) {
// 更新发送数量
packetSent();
return;
}
}
//
}