RocketMQ源码分析(四) - 18965050/RocketMQ GitHub Wiki
普通PushConsumer接收流程
- 创建并启动MQClientInstance, 如果是集群模式(CLUSTING),则offsetStore为RemoteBrokerOffsetStore(存储在broker上). 启动MQClientInstance时有两个重要的服务线程会被启动, 分别是:
- pullMessageService: 从pullRequestQueue中获取PullRequest消息(此消息由RebalanceService放入队列中), 进行消息的拉取
- RebalanceService: 负载均衡每个client对应的messageQueue,默认的策略为AllocateMessageQueueAveragely,即均匀的对client进行分配messageQueue(举例来说: 比如一个topic的相同consumeGroup有2个client, 而messageQueue一共有5个, 则第一个client分配3个messageQueue, 第二个client分配2个messageQueue)
- 更新topic订购信息(DefaultMQPushConsumerImpl.updateTopicSubscribeInfoWhenSubscriptionChanged()), 其目的是获取此topic在broker集群中的所有messageQueue及其分布情况,供后续rebalance使用
- 调用DefaultMQPushConsumerImpl.sendHeartbeatToAllBrokerWithLock(),建立和所有master broker的连接, 并在broker上注册其连接通道
- 唤醒rebalanceService服务立即执行(默认情况下, RebalanceService会每隔20s进行轮询). 其根据上述2获取的此topic所有messageQueue信息以及对应此topic-consumerGroup的client数量,进行负载均衡, 来确定当前client(consumer)应该消费的messageQueue队列情况
- 调用RebalanceImpl.updateProcessQueueTableInRebalance()方法, 对每个messageQueue, 根据其中的broker信息到broker查询此topic-consumerGroup-queueId当前的消费位置, 组装PullRequest对象, 并将此对象发送给PullMessageService的pullRequestQueue队列
- PullMessageService服务线程被激活, 将请求转发给DefaultMQPushConsumerImpl.pullMessage()方法, 此方法先判断状态, 是否需要限流等, 再创建PullCallback回调钩子对象, 最后委派给PullAPIWrapper.pullKernelImpl()进行消息拉取.一次消息拉取的批次数默认为32(DefaultMQPushConsumer.pullBatchSize)
- PullAPIWrapper.pullKernelImpl()方法中, 构造PullMessageRequestHeader对象, 异步发送给对应的broker进行消息的获取
- broker端, PullMessageProcessor.processRequest()接受到PULL_MESSAGE请求,委托给DefaultMessageStore.getMessage()方法进行消息的拉取, 此方法先根据topic,queueId和consumeQueueOffset定位到对应的consumequeue文件, 获取commitLog的索引信息, 再利用索引信息中消息的物理offset找到对应的消息. 如前所述, 一次消息拉取批次默认为32条
- response返回后, PullCallback回调钩子被调用, onSuccess()钩子函数触发, 如果有消息返回(PullStatus.FOUND), 委派给ConsumeMessageConcurrentlyService.submitConsumeRequest()函数, 此函数默认的消息消费批次(DefaultMQPushConsumer.consumeMessageBatchMaxSize)为1, 即一条一条消费. 拉取的消息被封装为ConsumeRequest对象, 由DefaultMQPushConsumer注册的MessageListener进行拉取消息的消费
- 通过PullMessageService.executePullRequestImmediately()或executePullRequestLater()方法, 不断的往PullMessageService的pullRequestQueue队列中放入PullRequest请求消息, 这样可进行消息的不断拉取
简要流程图见下:
