ZooKeeper Data and Storage
Transaction log files are stored in the directory specified by the dataLogDir property in the configuration file. Their characteristics:
- Uniform size: every file is preallocated to 64 MB
- The file name suffix is the hexadecimal representation of the zxid of the first transaction written to the file (see the sketch after this list)
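For example, a file named log.30000001 starts at zxid 0x30000001. A minimal sketch of recovering the zxid from such a file name (the same idea as Util.getZxidFromName, which appears later for snapshots):

// Sketch: recover the starting zxid from a transaction log file name.
class LogNameDemo {
    static long zxidFromName(String name, String prefix) {
        String[] parts = name.split("\\.");      // e.g. "log" and "30000001"
        if (parts.length == 2 && parts[0].equals(prefix)) {
            return Long.parseLong(parts[1], 16); // the suffix is the zxid in hex
        }
        return -1L;
    }

    public static void main(String[] args) {
        System.out.println(Long.toHexString(zxidFromName("log.30000001", "log"))); // 30000001
    }
}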
Log format
A transaction log can be dumped in readable form with: java LogFormatter log.30000001 (the tool is org.apache.zookeeper.server.LogFormatter)
Writing the log
The write path comes down to FileTxnLog.append:
// org/apache/zookeeper/server/persistence/FileTxnLog.java
public synchronized boolean append(TxnHeader hdr, Record txn)
    throws IOException
{
    // A null header means this is not a transactional request; nothing to log
    if (hdr == null) {
        return false;
    }
    if (hdr.getZxid() <= lastZxidSeen) {
        LOG.warn("Current zxid " + hdr.getZxid()
                + " is <= " + lastZxidSeen + " for "
                + hdr.getType());
    } else {
        lastZxidSeen = hdr.getZxid();
    }
    // Is there a log file currently associated with this server?
    if (logStream == null) {
        if (LOG.isInfoEnabled()) {
            LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
        }
        // Create a new transaction log file, named log.{zxid in hex}
        logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
        fos = new FileOutputStream(logFileWrite);
        logStream = new BufferedOutputStream(fos);
        oa = BinaryOutputArchive.getArchive(logStream);
        // Write the file header: magic number, version, and dbId
        FileHeader fhdr = new FileHeader(TXNLOG_MAGIC, VERSION, dbId);
        fhdr.serialize(oa, "fileheader");
        // Make sure that the magic number is written before padding.
        logStream.flush();
        currentSize = fos.getChannel().position();
        // Remember the stream; it is flushed (and optionally synced) later by commit()
        streamsToFlush.add(fos);
    }
    // Grow (pad) the file if it is running low on preallocated space
    currentSize = padFile(fos.getChannel());
    // Serialize the transaction header and body into a byte array
    byte[] buf = Util.marshallTxnEntry(hdr, txn);
    if (buf == null || buf.length == 0) {
        throw new IOException("Faulty serialization for header " +
                "and txn");
    }
    // Compute a checksum over the serialized entry
    Checksum crc = makeChecksumAlgorithm();
    crc.update(buf, 0, buf.length);
    // Write the checksum, then the entry itself, to the stream
    oa.writeLong(crc.getValue(), "txnEntryCRC");
    Util.writeTxnBytes(oa, buf);
    return true;
}
Determining whether the transaction log file needs to grow

// org/apache/zookeeper/server/persistence/FileTxnLog.java
private long padFile(FileChannel fileChannel) throws IOException {
    // Compute the padded file size; preAllocSize defaults to 64 MB
    long newFileSize = calculateFileSizeWithPadding(fileChannel.position(), currentSize, preAllocSize);
    if (currentSize != newFileSize) {
        // Extend the file to newFileSize by writing the zero-filled "fill" buffer at its end
        fileChannel.write((ByteBuffer) fill.position(0), newFileSize - fill.remaining());
        currentSize = newFileSize;
    }
    return currentSize;
}
Calculating the new file size

// org/apache/zookeeper/server/persistence/FileTxnLog.java
public static long calculateFileSizeWithPadding(long position, long fileSize, long preAllocSize) {
    // Pad only when less than 4096 bytes of preallocated space remain
    if (preAllocSize > 0 && position + 4096 >= fileSize) {
        if (position > fileSize) {
            // The write position has already passed the recorded size:
            // round up to the next multiple of preAllocSize beyond the position
            fileSize = position + preAllocSize;
            fileSize -= fileSize % preAllocSize;
        } else {
            fileSize += preAllocSize;
        }
    }
    return fileSize;
}
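The arithmetic is easiest to see with concrete numbers. A standalone sketch, using illustrative values with preAllocSize at its 64 MB default:

// Illustrative only: exercises the same arithmetic as calculateFileSizeWithPadding.
public class PaddingDemo {
    static long pad(long position, long fileSize, long preAllocSize) {
        if (preAllocSize > 0 && position + 4096 >= fileSize) {
            if (position > fileSize) {
                fileSize = position + preAllocSize;
                fileSize -= fileSize % preAllocSize;
            } else {
                fileSize += preAllocSize;
            }
        }
        return fileSize;
    }

    public static void main(String[] args) {
        long MB = 1024 * 1024;
        // Plenty of room left: size is unchanged
        System.out.println(pad(10 * MB, 64 * MB, 64 * MB));        // 67108864  (64 MB)
        // Less than 4 KB left: grow by one preAllocSize block
        System.out.println(pad(64 * MB - 1024, 64 * MB, 64 * MB)); // 134217728 (128 MB)
        // Position already past the recorded size: round up to a 64 MB multiple
        System.out.println(pad(65 * MB, 64 * MB, 64 * MB));        // 134217728 (128 MB)
    }
}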
Serializing the transaction

// org/apache/zookeeper/server/persistence/Util.java
public static byte[] marshallTxnEntry(TxnHeader hdr, Record txn)
        throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    OutputArchive boa = BinaryOutputArchive.getArchive(baos);
    // Serialize the header, then the (optional) transaction body, into one buffer
    hdr.serialize(boa, "hdr");
    if (txn != null) {
        txn.serialize(boa, "txn");
    }
    return baos.toByteArray();
}
Writing the serialized transaction header and body to the stream:

// org/apache/zookeeper/server/persistence/Util.java
public static void writeTxnBytes(OutputArchive oa, byte[] bytes)
        throws IOException {
    // Length-prefixed entry, followed by a single end-of-record marker byte
    oa.writeBuffer(bytes, "txnEntry");
    oa.writeByte((byte) 0x42, "EOR"); // 'B'
}
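Putting the pieces together, each record in the log body is: an 8-byte checksum, the length-prefixed entry (header plus body), and the 0x42 end-of-record byte. A minimal sketch of reading one record back, assuming an InputArchive positioned at a record boundary (the Adler32 algorithm matches what FileTxnLog's makeChecksumAlgorithm returns, but this is not the project's actual iterator code):

import org.apache.jute.InputArchive;
import java.io.IOException;
import java.util.zip.Adler32;

// Sketch: read back and verify one record written by append()/writeTxnBytes.
class TxnRecordReader {
    static byte[] readOneTxn(InputArchive ia) throws IOException {
        long crcValue = ia.readLong("txnEntryCRC"); // checksum written before the entry
        byte[] bytes = ia.readBuffer("txnEntry");   // length-prefixed header + body
        Adler32 crc = new Adler32();                // same algorithm the log uses
        crc.update(bytes, 0, bytes.length);
        if (crc.getValue() != crcValue) {
            throw new IOException("CRC mismatch while reading txn entry");
        }
        if (ia.readByte("EOR") != 0x42) {           // 'B' end-of-record marker
            throw new IOException("Last transaction was partial");
        }
        return bytes; // deserialize with TxnHeader plus the matching txn record type
    }
}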
Snapshot files are stored in the directory specified by the dataDir property. Their characteristics:
- The file name suffix is the hexadecimal representation of the zxid of the last transaction applied before the snapshot
Snapshot format
A snapshot can be dumped in readable form with: java SnapshotFormatter snapshot.30000001
Taking a data snapshot
A snapshot is taken through ZooKeeperServer.takeSnapshot:
// org/apache/zookeeper/server/ZooKeeperServer.java
public void takeSnapshot() {
    try {
        txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts());
    } catch (IOException e) {
        // A failed snapshot leaves persistence in an unknown state, so exit immediately
        LOG.error("Severe unrecoverable error, exiting", e);
        System.exit(10);
    }
}
FileTxnSnapLog saves the snapshot

// org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
public void save(DataTree dataTree,
        ConcurrentHashMap<Long, Integer> sessionsWithTimeouts)
        throws IOException {
    long lastZxid = dataTree.lastProcessedZxid;
    // Create the snapshot file, named after the last processed zxid
    File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
    LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid),
            snapshotFile);
    // Serialize the DataTree and the session table into the file
    snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile);
}
Generating the snapshot file name

// org/apache/zookeeper/server/persistence/Util.java
public static String makeSnapshotName(long zxid) {
    // snapshot.{zxid in hex}
    return FileSnap.SNAPSHOT_FILE_PREFIX + "." + Long.toHexString(zxid);
}
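The name round-trips to the zxid with a hex parse. A tiny illustrative example, assuming the standard "snapshot" prefix:

// Sketch: snapshot file name round trip.
class SnapshotNameDemo {
    public static void main(String[] args) {
        long zxid = 0x30000001L;
        String name = "snapshot" + "." + Long.toHexString(zxid);        // snapshot.30000001
        long parsed = Long.parseLong(name.substring(name.indexOf('.') + 1), 16);
        System.out.println(name + " -> 0x" + Long.toHexString(parsed)); // -> 0x30000001
    }
}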
Writing the snapshot contents

// org/apache/zookeeper/server/persistence/FileSnap.java
public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)
        throws IOException {
    if (!close) {
        OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));
        CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32());
        OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);
        // Build the file header: magic number, version, and dbId
        FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
        // Write the header, the session timeouts, and the DataTree
        serialize(dt, sessions, oa, header);
        long val = crcOut.getChecksum().getValue();
        // Write the accumulated checksum, then "/" as the end-of-file marker
        oa.writeLong(val, "val");
        oa.writeString("/", "path");
        sessOS.flush();
        crcOut.close();
        sessOS.close();
    }
}
Serializing the header and body

// org/apache/zookeeper/server/persistence/FileSnap.java
protected void serialize(DataTree dt, Map<Long, Integer> sessions,
        OutputArchive oa, FileHeader header) throws IOException {
    if (header == null)
        throw new IllegalStateException(
                "Snapshot's not open for writing: uninitialized header");
    // Write the file header
    header.serialize(oa, "fileheader");
    // Write the session table and the DataTree
    SerializeUtils.serializeSnapshot(dt, oa, sessions);
}
Serializing the DataTree and sessions

// org/apache/zookeeper/server/util/SerializeUtils.java
public static void serializeSnapshot(DataTree dt, OutputArchive oa,
        Map<Long, Integer> sessions) throws IOException {
    HashMap<Long, Integer> sessSnap = new HashMap<Long, Integer>(sessions);
    // Write the number of sessions
    oa.writeInt(sessSnap.size(), "count");
    // Write each session id and its timeout
    for (Entry<Long, Integer> entry : sessSnap.entrySet()) {
        oa.writeLong(entry.getKey().longValue(), "id");
        oa.writeInt(entry.getValue().intValue(), "timeout");
    }
    // Write the tree itself
    dt.serialize(oa, "tree");
}
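The session table is just a count followed by (id, timeout) pairs. A self-contained round trip through jute's binary archives, using the same tags as above (illustrative; assumes the zookeeper-jute classes are on the classpath):

import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;

// Sketch: encode and decode a session table the way serializeSnapshot does.
class SessionTableDemo {
    public static void main(String[] args) throws Exception {
        Map<Long, Integer> sessions = new HashMap<>();
        sessions.put(0x100000001L, 30000); // session id -> timeout (ms)

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive oa = BinaryOutputArchive.getArchive(baos);
        oa.writeInt(sessions.size(), "count");
        for (Map.Entry<Long, Integer> e : sessions.entrySet()) {
            oa.writeLong(e.getKey(), "id");
            oa.writeInt(e.getValue(), "timeout");
        }

        BinaryInputArchive ia = BinaryInputArchive.getArchive(
                new ByteArrayInputStream(baos.toByteArray()));
        int count = ia.readInt("count");
        while (count-- > 0) {
            System.out.println("0x" + Long.toHexString(ia.readLong("id"))
                    + " timeout=" + ia.readInt("timeout"));
        }
    }
}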
Serializing node contents

// org/apache/zookeeper/server/DataTree.java
public void serialize(OutputArchive oa, String tag) throws IOException {
    scount = 0;
    // Write the ACL cache
    aclCache.serialize(oa);
    // Write the nodes, starting from the root (empty path)
    serializeNode(oa, new StringBuilder(""));
    if (root != null) {
        // Write "/" last as the end-of-tree marker
        oa.writeString("/", "path");
    }
}
Recursively writing nodes, starting from the root

// org/apache/zookeeper/server/DataTree.java
void serializeNode(OutputArchive oa, StringBuilder path) throws IOException {
    String pathString = path.toString();
    // Look up the node for this path
    DataNode node = getNode(pathString);
    if (node == null) {
        return;
    }
    String children[] = null;
    DataNode nodeCopy;
    synchronized (node) {
        scount++;
        StatPersisted statCopy = new StatPersisted();
        copyStatPersisted(node.stat, statCopy);
        // Snapshot a copy of the node so concurrent changes cannot corrupt the write
        nodeCopy = new DataNode(node.parent, node.data, node.acl, statCopy);
        Set<String> childs = node.getChildren();
        children = childs.toArray(new String[childs.size()]);
    }
    // Write the path, then the node record, then recurse into the children
    oa.writeString(pathString, "path");
    oa.writeRecord(nodeCopy, "node");
    path.append('/');
    int off = path.length();
    for (String child : children) {
        // Trim the builder back to this node's prefix before appending the child
        path.delete(off, Integer.MAX_VALUE);
        path.append(child);
        serializeNode(oa, path);
    }
}
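serializeNode is a preorder depth-first walk: a node's path and record are always written before its children, which is what later lets deserialization resolve every parent before its children appear. A minimal sketch of the same traversal order over a toy tree (hypothetical children map, not ZooKeeper's DataNode):

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: the preorder path ordering produced by serializeNode.
class PreorderDemo {
    static Map<String, List<String>> children = new LinkedHashMap<>();

    static void walk(StringBuilder path) {
        String p = path.toString();
        System.out.println(p.isEmpty() ? "(root)" : p); // parent printed before children
        path.append('/');
        int off = path.length();
        for (String child : children.getOrDefault(p, List.of())) {
            path.delete(off, Integer.MAX_VALUE); // trim back to this node's prefix
            path.append(child);
            walk(path);
        }
    }

    public static void main(String[] args) {
        children.put("", List.of("a", "c"));
        children.put("/a", List.of("b"));
        walk(new StringBuilder()); // prints (root), /a, /a/b, /c in that order
    }
}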
Initializing from a snapshot
Loading the data

// org/apache/zookeeper/server/ZKDatabase.java
public long loadDataBase() throws IOException {
    // Restore from the most recent snapshot, then replay newer transaction logs
    long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
    initialized = true;
    return zxid;
}
Restoring from the snapshot and the transaction logs

// org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
public long restore(DataTree dt, Map<Long, Integer> sessions,
        PlayBackListener listener) throws IOException {
    // Deserialize the most recent valid snapshot
    snapLog.deserialize(dt, sessions);
    // Replay transactions logged after the snapshot's zxid
    return fastForwardFromEdits(dt, sessions, listener);
}
Deserializing the snapshot file

// org/apache/zookeeper/server/persistence/FileSnap.java
public long deserialize(DataTree dt, Map<Long, Integer> sessions)
        throws IOException {
    // Find up to the 100 most recent snapshot files, ordered by zxid
    List<File> snapList = findNValidSnapshots(100);
    if (snapList.size() == 0) {
        return -1L;
    }
    File snap = null;
    boolean foundValid = false;
    // Try the candidates newest-first until one deserializes and passes its checksum
    for (int i = 0; i < snapList.size(); i++) {
        snap = snapList.get(i);
        InputStream snapIS = null;
        CheckedInputStream crcIn = null;
        try {
            LOG.info("Reading snapshot " + snap);
            snapIS = new BufferedInputStream(new FileInputStream(snap));
            crcIn = new CheckedInputStream(snapIS, new Adler32());
            InputArchive ia = BinaryInputArchive.getArchive(crcIn);
            // Rebuild the DataTree and the session timeouts from this snapshot
            deserialize(dt, sessions, ia);
            // Compare the recomputed checksum against the one stored in the file
            long checkSum = crcIn.getChecksum().getValue();
            long val = ia.readLong("val");
            if (val != checkSum) {
                throw new IOException("CRC corruption in snapshot : " + snap);
            }
            foundValid = true;
            break;
        } catch (IOException e) {
            LOG.warn("problem reading snap file " + snap, e);
        } finally {
            if (snapIS != null)
                snapIS.close();
            if (crcIn != null)
                crcIn.close();
        }
    }
    if (!foundValid) {
        throw new IOException("Not able to find valid snapshots in " + snapDir);
    }
    // The snapshot's zxid is encoded in its file name
    dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
    return dt.lastProcessedZxid;
}
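findNValidSnapshots orders the candidates by the zxid in the file name, newest first; keeping several candidates guards against a snapshot that was only partially written when the server died. A hedged sketch of that selection step (illustrative only, not the project's actual Util.sortDataDir implementation):

import java.io.File;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch: list snapshot files newest-first by the zxid encoded in their names.
class SnapshotSortDemo {
    static long zxidOf(File f) {
        try {
            return Long.parseLong(f.getName().substring("snapshot.".length()), 16);
        } catch (Exception e) {
            return -1L; // not a well-formed snapshot file name
        }
    }

    static List<File> newestSnapshots(File snapDir, int n) {
        List<File> files = new ArrayList<>();
        File[] listed = snapDir.listFiles();
        if (listed != null) {
            for (File f : listed) {
                if (f.getName().startsWith("snapshot.") && zxidOf(f) >= 0) {
                    files.add(f);
                }
            }
        }
        files.sort(Comparator.comparingLong(SnapshotSortDemo::zxidOf).reversed());
        return files.subList(0, Math.min(n, files.size()));
    }
}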
Deserializing: restoring the DataTree and the session timeouts

// org/apache/zookeeper/server/persistence/FileSnap.java
public void deserialize(DataTree dt, Map<Long, Integer> sessions,
        InputArchive ia) throws IOException {
    FileHeader header = new FileHeader();
    // Deserialize the file header and verify the magic number
    header.deserialize(ia, "fileheader");
    if (header.getMagic() != SNAP_MAGIC) {
        throw new IOException("mismatching magic headers "
                + header.getMagic() +
                " != " + FileSnap.SNAP_MAGIC);
    }
    // Deserialize the session table and the DataTree
    SerializeUtils.deserializeSnapshot(dt, ia, sessions);
}
Deserializing the sessions and the DataTree

// org/apache/zookeeper/server/util/SerializeUtils.java
public static void deserializeSnapshot(DataTree dt, InputArchive ia,
        Map<Long, Integer> sessions) throws IOException {
    // Read the session count, then each (id, timeout) pair
    int count = ia.readInt("count");
    while (count > 0) {
        long id = ia.readLong("id");
        int to = ia.readInt("timeout");
        sessions.put(id, to);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                    "loadData --- session in archive: " + id
                    + " with timeout: " + to);
        }
        count--;
    }
    // Rebuild the DataTree
    dt.deserialize(ia, "tree");
}
Restoring the DataTree

// org/apache/zookeeper/server/DataTree.java
public void deserialize(InputArchive ia, String tag) throws IOException {
    aclCache.deserialize(ia);
    nodes.clear();
    pTrie.clear();
    String path = ia.readString("path");
    // "/" marks the end of the serialized tree
    while (!path.equals("/")) {
        DataNode node = new DataNode();
        // Read the node record for this path
        ia.readRecord(node, "node");
        // Store the node in the in-memory node table
        nodes.put(path, node);
        synchronized (node) {
            // Register the node's ACL with the cache
            aclCache.addUsage(node.acl);
        }
        int lastSlash = path.lastIndexOf('/');
        if (lastSlash == -1) {
            // The empty path is the root node
            root = node;
        } else {
            // Look up the parent, which was deserialized earlier (preorder ordering)
            String parentPath = path.substring(0, lastSlash);
            node.parent = nodes.get(parentPath);
            if (node.parent == null) {
                throw new IOException("Invalid Datatree, unable to find " +
                        "parent " + parentPath + " of path " + path);
            }
            // Register this node as a child of its parent
            node.parent.addChild(path.substring(lastSlash + 1));
            long eowner = node.stat.getEphemeralOwner();
            if (eowner != 0) {
                // Track ephemeral nodes in a per-session set
                HashSet<String> list = ephemerals.get(eowner);
                if (list == null) {
                    list = new HashSet<String>();
                    ephemerals.put(eowner, list);
                }
                list.add(path);
            }
        }
        path = ia.readString("path");
    }
    nodes.put("/", root);
    // Done deserializing the tree: rebuild the quota path trie
    // and refresh the quota stat nodes
    setupQuota();
    aclCache.purgeUnused();
}
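Because the tree was written in preorder, every parent path is already in the nodes map when its children arrive, so the lastIndexOf('/') split always succeeds. A tiny illustration of that split:

// Sketch: how a serialized path is split into parent path and child name.
class PathSplitDemo {
    public static void main(String[] args) {
        String path = "/app/config/db";
        int lastSlash = path.lastIndexOf('/');
        String parentPath = path.substring(0, lastSlash); // "/app/config"
        String childName = path.substring(lastSlash + 1); // "db"
        System.out.println(parentPath + " -> " + childName);
        // For a top-level node like "/a" the parent path is the empty string,
        // which is the key the root node was stored under during deserialization.
    }
}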