Zeppelin Sync 设计 - Qihoo360/zeppelin GitHub Wiki
背景介绍
- Zeppelin Node节点之间的数据同步采取异步复制的方式
- 数据同步方式包括部分同步和全同步
- 部分同步依赖节点间的Binlog同步
- Binlog文件写至固定大小会切换生成新文件,同步点为Binlog文件名+文件内偏移量
保证
- 任何情况下主从Binlog的偏移量一致,即通过给定的file+offset便宜量可以确定的找到对应的命令。
- 宕机情况下尽量保证少丢Binlog数据。
- 为了保证偏移量一直,Binlog中可能填充空白数据。
Binlog格式
如上图所示,一个Binlog文件划分为多个等长的Block,每个Block又包含多个Record,用户的写入请求序列化后写入Record中。Block要求其开头是一条完整的Record的开始,这种安排是为了降低数据损坏的风险,使其最多只丢失一个Block长度的数据。也正是由于这种安排,一条较长的数据可能需要由多个Block共同存储,我们在Record中用Type表示当前的数据是完整数据还是被拆分在不同Block中的其中一条。
Binlog同步
上图可以看出Zeppelin在Binlog的收发两端分别有BinlogSender和BinlogReceiver来进行处理,这两个角色都保证固定的线程数来避免随着Parititon增加,Zeppelin整体线程数的膨胀。
BinlogSender
每当有一个新的主从关系建立,主节点都会封装一个BinlogSendTask任务,并将其加入任务队列,这个Task记录了同步所需要的所有信息:
- 所属的Table,Partiton;
- 要发送到的目的从节点ip,port;
- 当前发送到的Binlog文件号及偏移量。 BInlogSend线程会在空闲的时候从队列中先进先出的取得一个Task进行处理,解析对应的Binlog Record并发送给对方节点,之后修改当前Task的偏移量信息。每个Task有一个固定的时间片,时间片用完后会被重新加入到任务队列,BinlogSend线程则去获得下一个任务进行处理。当前节点不再当Master时需要从任务队列中将对应的Task删除。
BinlogReceiver
由一个BinlogReceiver接受所有的Binlog的请求,进行过滤并按照Partition分发给不同的BgWorker执行。
交互
为了性能考虑BinlogSender到BinlogReceiver之间的数据是单向的,这就导致了Master并不知道Slave的同步是否正常。我们复用了TrySync逻辑,当Slave发生不可恢复的同步错误时,会修改自己的同步状态,并用最新的Binlog Offset重新向Master发起Trysync请求。Master会用新的BinlogSendTask代替之前的Task。之后从新的同步点从新发送Binlog,如下图所示:
同步格式
在BinlogSender向BinlogReceiver发送的内容中,除了Binlog中本身记录的内容外,还需要加入一些额外信息来给接收端进行验证,以保证同步的正确性。同步消息的定义如下:
message SyncRequest {
required SyncType sync_type = 1;
required int64 epoch = 2;
required Node from = 3;
required SyncOffset sync_offset = 4;
optional CmdRequest request = 5;
optional BinlogSkip binlog_skip = 6;
}
enum SyncType {
CMD = 0;
SKIP = 1;
}
message SyncOffset {
required int32 filenum = 1;
required int64 offset = 2;
optional int32 partition = 3;
}
message BinlogSkip {
required string table_name = 1;
required int32 partition_id = 2;
required int64 gap = 3;
}
目前有两种类型的同步消息:
- CMD类型表示正常的命令同步,包含CmdRequest字段存储用户写入请求
- SKIP类型的同步命令携带BinlogSkip信息,通知接收端将Binlog位置向后推移gap长度后再接受后续命令,这种类型是为了处理发送端发生解析错误导致的,主从binlog位置不一致
同步机制完善
1,Binlog检查机制:Binlog发送端携带更多的信息,来给接收端做正确性验证
BinlogSender发送Binlog前会添加如下信息:
- 发送方ip,port
- 当前发送方epoch
- 当前发送的Binlog的Offset
BinlogReceiver接受到同步信息后对上述信息进行检查,不合法的信息会被丢弃。
2,Offset回退机制
Master可以根据Slave发来的Offset决定是否出发Slave将自己的Offset重置为一个更旧的点,这个更旧的点会保证是一个Block的开头。Master和Slave都需要有能力重置Binlog的读写点。这种情况目前发生在两种场景:
- Slave的Offset大于Master:Master将Offset回退到自己成为主之前最近的Block开头;
- Slave的Offset在Master处不合法:Master将Offset回退到Slave Offset之前最近的Block开头;
这个Offset的回退信息会通过TrySync的答复返回个Slave
3,Master触发Slave填充空白Binlog机制
为了保证主从Binlog的同步点的一致,需要一种机制能支持Master主动要求Slave填充空白Binlog的机制。 增加了特殊类型的Binlog同步命令SKIP,Master发生Binlog读取异常时,会向Slave发送改命令和期望Binlog向后推移的长度Gap,Slave收到后在自己的Binlog中当前位置填充Gap长度的空白内容。同时当前Master也需要处理这种空白内容来应对之后的主从切换,增加Empty的Binlog类型
4,Slave主动恢复同步错误机制
为了性能考虑BinlogSender到BinlogReceiver之间的数据是单向的,这就导致了Master并不知道Slave的同步是否正常。我们复用了TrySync逻辑,当Slave发生不可恢复的同步错误时,会修改自己的同步状态,并用最新的Binlog Offset重新向Master发起Trysync请求。Master会用新的BinlogSendTask代替之前的Task。之后从新的同步点从新发送Binlog
5,Master动态更新Slave Lease的机制
Master向Slave的Binlog发送可能由于Master的重启等原因中断,这种情况发生时,希望Slave能主动再次发起Trysync。因此我们要求Master的BinlogSendTask每次获得时间片都要保证给Slave发送消息,如果没有新增的BInlog则发送RenewLease消息。这种消息会携带租约长度。Slave超过当前租约没有收到新的消息便发起Trysync。动态的更新租约时间是由于我们的Binlog Sender线程是与Partition数解耦的,这就导致当Master的BInlog发送任务较多的时候,针对每个Slave的时间间隔会增加。
6,全同步中断的恢复机制
Master收到全同步请求时,会dump db并向Slave发送整个库,这个过程中如果有中断Slave并不会感知,从而停留在WaitOffset状态。我们增加了Slave的重试机制来解决。并由Master来处理同一个Slave同一个Partition可能有多个重复的TrySync的问题。