RocketMQ源码分析(六) - 18965050/RocketMQ GitHub Wiki

HA

RocketMQ的集群部署可如下图所示:

cluster-deploy

说明如下:

  • 一个集群包括一个namesrv集群和一个broker集群
  • namesrv集群中的节点是无状态的. 一般来说, 会选取第一个节点作为master namesrv, 当master不可用时, 轮询取后续的节点作为namesrv
  • broker集群可包含多个master节点. 每个master节点的brokerId为0, 但brokerName不能相同. 一般我们会配置brokerClusterName相同, 表明是同一个broker集群(当然, brokerClusterName也可以不同, rocketMQ对broker集群名称不做校验, 但一般我们不这么做).
  • 一个master节点可以有0或多个slave节点. slave节点的brokerId不为0, 但brokerName和master节点相同
  • master节点可做消息发布和消费使用. 但slave节点只能做消费使用
  • 如果master节点及其对应的所有slave节点均不可用, 则此broker上的consumeQueue通道也不可用. 对应此通道的消息则不能消费

流程分析如下:

  • BrokerController初始化时, 判断broker角色为BrokerRole.SLAVE时, 再判断是否配置HaMasterAddress. 如果配置的话, 则直接设置HaService.HaClient master broker地址, 否则设置updateMasterHAServerAddrPeriodically为true, 此会在BrokerController.start()方法中进行broker注册时,通过响应获取maseter broker地址并设置HaService.HaClient master broker地址.
  • 一旦HaService.HaClient masterAddress地址设置, HaService.HaClient 服务线程被唤醒, 其会:
    • 建立和master broker连接(HaService.HaClient.connectMaster())
    • 向master broker汇报自身commitLog当前的max offset(HaService.HaClient.reportSlaveMaxOffset())
    • master broker中HAConnection.ReadSocketService服务线程被激活, 获取到slave broker的max offset,并写入到HAHaConnection.slaveRequestOffset中
    • 一旦HAHaConnection.slaveRequestOffset!=-1,HAConnection.WriteSocketService服务线程被唤醒, 计算从何处进行commitLog同步(如果slaveRequestOffset==0,表明从master broker对应commitLog的maxOffset对应的mapedFile开头开始同步, 否则从master broker中对应slave broker的maxOffset开始同步).
    • 同步数据格式为: sync-data-format 一次最多同步MessageStoreConfig.haTransferBatchSize大小的数据(默认为32K)
    • slave broker接收master broker响应数据,进行commitLog数据追加(HaService.HaClient.processReadEvent()),以及consumeQueue构建
    • 更新currentReportedOffset,并再次向master broker汇报自身commitLog当前的max offset
  • 定时任务触发SlaveSynchronize.syncAll(), 同步topic, consumeOffset,delayOffset,subscriptionGroup等信息

流程图如下: ha-workflow