RocketMQ源码分析(二) - 18965050/RocketMQ GitHub Wiki

Broker

RocketMQ broker的内部实现和Kafka有很大的不同,其数据存储和检索基本区别在于:

  • Kafka中, 每个topic的每个partition都有独立的log文件和index文件 kafka数据存储结构

    log文件中消息是以严格顺序的方式追加的,但由于每条消息的大小不同, 因此通过index文件来方便定位消息在log中的具体位置和大小.具体可参见 Kafka的Log存储解析

    而RockerMQ中, 一个broker中所有topic的所有queue数据放在一起, 通过topic的consumeQueue来进行定位. RocketMQ中, commitLog类似于Kafka的log文件,由多个文件组成commitLog队列,队列中每个文件名即为此文件开始的offset, 每个commitLog默认为1G大小(MessageStoreConfig.mapedFileSizeCommitLog) consumeQueue类似于Kafka的index文件,按topic-queue相互隔离,也由多个文件组成consumeQueue队列,每个consumeQueue默认为(30w*20)个字节大小(MessageStoreConfig.mapedFileSizeConsumeQueue).

    log中每条消息的结构为:

    log-structure

    consumeQueue用于确定消息在log中的具体位置, 其每条记录大小是固定的(20个字节), 格式为:

    consumequeue-structure

    另外, 为了方便通过消息的某个key(比如:订单ID)来快速的检索出某条消息, RocketMQ消息存储模块还包括了index文件,存放在${user.home}/store/index目录下. 和commitLog和consumequeue文件一样,index也是由多个文件组成,每个文件的结构为:

    index-structure

    由此可见, 一个index文件大小为:40+4*5000000+20*(5000000*4).

    其中, Index Header结构为:

    indexheader-structure

    slot中存放的是是key hash的index值, 大小为4个字节, 一共500w项

    每个index结构为:

    slotindex-structure

主要类说明:

  • BrokerStartup: broker启动类
  • BrokerConfig,MessageStoreConfig: broker和messageStore配置类
  • BrokerController:broker生命周期管理类
  • ConsumerOffsetManager: consumer消息情况管理类, 对应${user.home}/store/config/consumerOffset.json文件
  • SubscriptionGroupManager: consumer订购组管理器,对应${user.home}/store/config/subscriptionGroup.json文件
  • TopicConfigManager: topic管理器. 对应${user.home}/store/config/topics.json文件
  • ConsumerManager,ProducerManager: consumer,producer管理类.
  • ClientHousekeepingService: 对client(producer, consumer, filter server)连接的定时检测
  • PullMessageProcessor: consumer消息拉取处理器,处理消息的消费
  • Broker2Client, BrokerOuterAPI: 分别为broker连接client和broker连接namesrv处理器
  • FilterServerManager: broker filter server管理器,里面的filterServerTable用于存储filtersrv连接通道和filtersrv地址的关联关系
  • BrokerStatsManager: broker数据统计器. 统计器按秒, 分钟, 小时级别来采集不同的数据
  • BrokerFastFailure: 清除过期请求
  • DefaultMessageStore: 消息存储器. 管理着commitLog, consumeQueue, index等如何进行数据的存储. 包括一些重要的处理器:
    • AllocateMapedFileService: mapFile分配器
    • flushConsumeQueueService, cleanCommitLogService, cleanConsumeQueueService: consumeQueue刷盘服务, comitLog清理服务, consumeQueue清理服务
    • indexService: 消息中key的索引服务
    • HAService: 未深究, 后面补充
    • ReputMessageService: 重要类, 消息放入commitLog后, 需要在consumeQueue中构建索引, 即在此类中实现
    • ScheduleMessageService: 定时消息服务, 用于producer定时消息发送,会将定时消息转换为真实消息进行存储, 落盘和consumeQueue记录构建