DIS Kafka Partition Migration - tenji/ks GitHub Wiki

DIS Kafka 分区迁移

后台提供接口,并在ServiceCM提供操作入口,可指定待迁移集群指定通道的指定分区。

一、故障迁移过程

  1. 查询待迁移集群通道,选择需要迁移的通道和分区信息;(ServiceCM)
  2. 在目标集群创建相同分区规格的 Topic;(ServiceMgt)
  3. 计算迁移信息,更新 Hbase 中分区信息,并通知 Gateway 更新缓存信息;(ServiceMgt)
  4. Gateway 刷新缓存路由信息;(Gateway)
  5. 生产数据(Gateway) 直接写入目标集群。
  6. 消费数据时,按照 Offset 管理规则路由到对应的集群和 Offset(Gateway) 若待迁移集群处于故障状态 若选择忽略待迁移集群的数据,直接 resetOffset 从目标集群开始消费 若选择不忽略待迁移集群的数据,直接返回空数据 若待迁移集群处于正常状态 若选择忽略待迁移集群的数据,直接从目标集群开始消费 若选择不忽略待迁移集的数据,Offset 小于迁移 Offset 则从待迁移集群消费,大于迁移 Offset 则按照换算规则从目标集群开始消费
  7. 定时任务检查故障集群状态,并更新待迁移集群最终 Offset

迁移信息包括以下信息,作为分区表新增元数据:

待迁移(故障)集群 ID:originClusterId
目标集群 ID:targetClusterId
待迁移集群最终 Offset:endOffset
迁移状态:status
是否忽略待迁移集群数据:ignoreOrigin

Offset 消费规则:

客户端消费 Offset = 待迁移(故障)集群最终 Offset + 目标集群 Offset

问题: 1、故障集群恢复前后 Offset 跳变的问题,比如恢复前 Offset = 1 消费的是目标集群,恢复后 Offset = 1 消费的是故障集群;

二、平滑迁移过程

  1. 查询待迁移集群通道,选择需要迁移的通道和分区信息;(ServiceCM)
  2. 在目标集群创建相同分区规格的 Topic;(ServiceMgt)
  3. 计算迁移信息,将迁移信息写入 ZooKeeper;(ServiceMgt)
  4. Gateway 在 ZooKeeper 中注册,并读取迁移信息,失败则进程退出;(Gateway)
  5. 所有 Gateway 注册完毕,触发启动迁移事件;(ServiceMgt)
  6. 上传数据(Gateway) 若分区处于迁移中状态,先写入待迁移集群: 若写入数据的 Offset 均小于迁移 Offset,直接返回; 若 Offset 部分小于迁移 Offset,大于迁移 Offset 部分写入目标集群,更新状态; 若 Offset 全部大于 Offset,全部写入目标集群,更新状态; 若分区处于其它状态,直接写入目标集群即可。
  7. 下载数据(Gateway) 若分区处于已老化(Aged)状态: 直接按照 Offset 规则换算成目标集群 Offset 消费; 若分区处于其它状态: 若 Offset 大于迁移 Offset,换算成目标集群 Offset 直接从目标集群消费; 若 Offset 小于迁移 Offset,则从待迁移集群消费,大于迁移 Offset 的部分移除;
  8. 定时任务检查迁移状态,若待迁移集群分区已经被老化,则将迁移状态更新为老化完成并删除 Topic

迁移信息包括以下信息:

待迁移集群 ID:originClusterId
目标集群 ID:targetClusterId
待迁移集群最终 Offset:endOffset
迁移状态:status
    1)迁移中      Migrating
    2)迁移完成    Migrated
    3)老化完成    Aged

Offset 消费规则:

客户端消费 Offset = 待迁移集群最终 Offset + 目标集群 Offset

迁移的 Offset 怎么确定? 根据当前 Offset 和当前上传速率一起计算?比如当前 Offset 为 100000,上传速率为:2000条/秒,那么假设 Gateway 在 5 分钟内可以做好迁移准备,那么迁移 Offset 可以确定为: 100000 + 2000 * 60 * 5 = 700000

问题: 1、迁移的 Offset 怎么确定?根据当前 Offset 和当前上传速率一起计算? 2、当前 Producer 的 ack = 1,若迁移过程分区 Leader 切换,可能导致数据混乱?

三、屏蔽集群

集群不可用时,可直接屏蔽集群,防止由于集群不可用而导致 Gateway 被 Nginx 踢除掉,进而影响其他的集群。