RocketMQ源码分析(三) - 18965050/RocketMQ GitHub Wiki

普通消息发送流程

这里说下大概的流程及注意点, 太细的话会涉及到很多的类

  1. 创建并启动MQClientInstance. MQClientInstance对应着一个client实例(producer, consumer都是client)

  2. 消息发送. 默认采取同步的通讯方式, 即producer发送并等待返回.流程如下:

    1. producer根据topic从namesrv获取所有可用发送消息的messageQueue(DefaultMQProducerImpl.tryToFindTopicPublishInfo()).这个流程对于broker上不存在的topic会走两遍: 第一次通过新建的topic到broker上找不到topic发布路由信息(TopicRouteData), 这样第二次会通过默认的topic(TBW102)到broker上去查询发布路由信息,取回默认的路由信息后再进行相应的修正(defaultTopicQueueNums).
    2. 在取回的路由发布信息集合中选择一条MessageQueue进行消息的发布(DefaultMQProducerImpl.selectOneMessageQueue).默认是选择随机数%messageQueue队列大小取模得到, 后续再发送则进行轮询的方式
    3. 调用sendKernelImpl()方法进行消息组装并发送,默认采用精简字节进行发送(即将SendMessageRequestHeader转换为SendMessageRequestHeaderV2, 这样占用空间更小), 请求代码为RequestCode.SEND_MESSAGE_V2
  3. 选择的MessageQueue对应的broker对消息进行解码, 并转换为SendMessageRequestHeader对象, 进入SendMessageProcessor.sendMessage()方法

  4. 此方法先对请求信息进行必要校验(AbstractSendMessageProcessor.msgCheck()), 在校验过程中, 判断此topic对应的TopicConfig是否存在, 不存在且broker允许(BrokerConfig.autoCreateTopicEnable)则依据默认的topic进行创建TopicConfig, 并修改相应的读写通道, 创建完成会持久化到文件中(${user.home}/store/config/topics.json)

  5. 封装为MessageExtBrokerInner对象, 并调用DefaultMessageStore.putMessage()方法进行消息的存储

  6. DefaultMessageStore委派CommitLog.putMessage()方法进行消息的追加.CommitLog委托最后一个MapedFile的appendMessage()方法进行消息的追加,在此方法中, 最终调用DefaultAppendMessageCallback.doAppend()将消息写入ByteBuf(注意, 如果此MapedFile放不下消息,会申请一个新的MapedFile来放置)

  7. 根据配置是同步刷盘还是异步刷盘, 分别调用GroupCommitService或FlushRealTimeService来将消息刷入到磁盘文件中

  8. DefaultMessageStore中的ReputMessageService是个定时服务(每1ms轮询一次), 其根据刷盘到commitLog中的消息来创建consumerQueue(consumerQueue用于消息的访问), 并依据消息中是否含有key来构建索引文件index

简要流程图见下: normal-producer