RocketMQ源码分析(五) - 18965050/RocketMQ GitHub Wiki

相关问答

  • 消息消费时是如何根据Tag来进行过滤的

    消息落盘commitLog后, 会在consumeQueue文件中写入索引. 每个consumeQueue记录为20个字节. 格式为offset(8)+size(4)+tagsCode(8).消息者进行消息拉取时, 会带入订购信息subscriptionData. 由subscriptionData中的codeSet是否包含tagsCode来进行消息的过滤

  • 消息消费广播模式(BROADCASTING)是如何做到的

    广播模式指的是每个consumer client不受同一consumerGroup中的其他cosumer影响, 能独立的对某一topic中的消息进行消费. 其实现的关键是使用LocalFileOffsetStore(集群模式使用的是RemoteBrokerOffsetStore). LocalFileOffsetStore针对每个consumer在${user.home}/.rocketmq_offsets/${clientId}/${consumerGroup}路径下生成offsets.json文件, 指明当前client的消费情况. 而RemoteBrokerOffsetStore是在broker上, 因此一个consumer client对consumeOffset的修改会影响到其他相同consumerGroup中consumer的消费

  • 如何做到消息顺序消费

    rocketmq中, 消息存储在commitLog中不是顺序的, 但在topic的queue中(consumeQueue)中的记录是顺序记录的.消息的顺序消费包括两部分的内容: 一是消息顺序的发送时, 在consumeQueue中是顺序排序的.二是消息消费时, 应该针对topic的queueId来进行消费.

    消息发送顺序需要实现MessageQueueSelector接口, 将顺序消息发送给某一个指定的topic-queueId. 如果不实现此接口, 默认使用MQFaultStrategy来对消息选择发送通道, 此策略类似于轮询的方式:对第一条消息随机选择一个发送通过, 后面的消息发送则循环进行轮询

    消息消费时由于一个消费线程可能对应多个messageQueue,虽然每个messageQueue中取出的消息是顺序的, 但不同messageQueue中顺序则不能保证. 为了能尽量做到一个messageQueue中消息能消费完再消费另一个messageQueue, 建议consumer端的MessageListener实现MessageListenerOrderly接口, 这样会保证消息消费的服务为ConsumeMessageOrderlyService(而不是ConsumeMessageConcurrentlyService),此服务线程进行消费时会锁定messageQueue(注意这两个类中的ConsumeRequest.run()方法实现的差异), 尽可能保证一个messageQueue中的消息消费完再消费另一个messageQueue. 当然, 最好的方式还是保持让consumer消费线程和messageQueue一致.

  • filterClass如何实现消息消费时的过滤

    和tag进行消息消费过滤不同(tag是用来过滤发送的一类消息),filterClass可以实现消息更细粒度的过滤. 其实现流程如下:

    1. 启动FiltersrvStartup,其会往对应的broker注册自身的连接通道(信息被注册到broker的FilterServerManager.filterServerTable中),或也可以在broker配置文件中配置filterServerNums配置项, broker会对此配置项进行判断并以shell的方式启动filtersrv
    2. consumer消息消费时, 会注册com.alibaba.rocketmq.common.filter.MessageFilter接口实现类(注意, 不是com.alibaba.rocketmq.store.MessageFilter接口, 此接口用于对tag进行过滤),在consumer启动时会通过请求code(RequestCode.REGISTER_MESSAGE_FILTER_CLASS)将此源码上传给broker对应的filtersrv, 在filtersrv上进行源码的编译和加载
    3. consumer消息拉取时, 根据从broker中获取的TopicRouteData对象可以随机找到一条对应的filtersrv地址信息. 往此filtersrv发送消息拉取请求(RequestCode.PULL_MESSAGE)
    4. filtersrv中DefaultRequestProcessor.pullMessageForward()方法被触发, 其起到代理的作用, 一方面从broker中拉取消息信息, 另一方面通过自身的PullCallback构造对象对broker返回的消息进行filterClass的过滤, 并将过滤结果集返回给consumer

    简要流程如下: filterClass

  • Pull Consumer是如何实现的

    Pull Consumer指的是consumer主动从broker上去拉取消息, 而不是被动的接收broker上推送过来的消息. 让我们先回顾下Push Consumer的实现原理. 根据上篇说明, 可以明白Push Consumer的关键在于:

    1. RebalanceService启动后会组装PullReqeust,并发送给PullMessageService的pullRequestQueue队列
    2. PullMessageService线程服务因此被唤醒, 根据PullRequest中的MessageQueue信息, 到相应的broker中去拉取对应topic-queueId的消息.
    3. 在PullCallback钩子实现类中再次往PullMessageService的pullRequestQueue队列放入PullRequest对象, 实现messageQueue消息的不断拉取

    和Push Consumer使用的消费类(DefaultMQPushConsumer)不同, Pull Consumer使用的消费类为DefaultMQPullConsumer.其关键点在于和DefaultMQPullConsumer配套的Rebalance实现类为RebalancePullImpl, 和DefaultMQPullConsumer配套的Rebalance实现类RebalancePushImpl不同, 其不会往PullMessageService的pullRequestQueue队列发送消息, 这样PullMessageService服务线程永远不会被唤醒, 必须通过手动拉取的方式到指定的MessageQueue中去获取消息.

    另一个需要注意的地方在于Push Consumer会使用某种分配策略(默认为AllocateMessageQueueAveragely),将messageQueue分配给某个consumer client, 而在Pull Consumer上, 此策略则完全无效, 需要从master broker集群中获取所有关于此topic的所有messageQueue, 并自己指定消息哪个messageQueue.

  • 如何发送定时消息

    RocketMQ 支持定时消息,但是不支持任意时间精度,仅支持特定的 level,例如定时 5s,10s,1m 等. 要发送定时消息, 配置很简单, 只需要对发送消息设置msg.setDelayTimeLevel(3).延迟级别对应的是MessageStoreConfig的messageDelayLevel. 其默认值为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h, 因此上述的级别3对应的是30s.

    定时消息发送原理很简单,原理如下:

    1. broker收到producer的PUT_MESSAGE请求, 委托给CommitLog.putMessage()方法处理,此方法中当发现消息中配置了延迟级别,会将topic修改为ScheduleMessageService.SCHEDULE_TOPIC,queueId修改为delayLevel-1,并将真实的topic和queueId存储在消息的属性中(属性key分别为MessageConst.PROPERTY_REAL_TOPIC和MessageConst.PROPERTY_REAL_QUEUE_ID), 然后进行正常的消息bytebuf写入, 落盘和consumeQueue写入.这一过程完成, 由于consumeQueue中的topic和queueId为ScheduleMessageService.SCHEDULE_TOPICdelayLevel-1, 因此并不能被消费
    2. 服务ScheduleMessageService有个定时任务DeliverDelayedMessageTimerTask, 其会定时从ScheduleMessageService.SCHEDULE_TOPIC/delayLevel-1中取出consumeQueue记录, 判断定时时间是否到,定时到则根据此consumeQueue记录从commitLog中取出真正的消息,并还原此消息的topic和queueId为真实值.再次写入commitLog的bytebuf, 落盘和真实consumeQueue的写入. 这样, 消息就可以被消费了.

    由上述流程可以看出, 消息定时发送会写两次commitLog和consumeQueue, 这个是需要注意的地方

  • 如何定时拉取消息

    定时消息拉取的例子如下:

     final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService(
     			"schedulerPullConsumer");
     scheduleService.getDefaultMQPullConsumer().setNamesrvAddr("localhost:9876");
    
     scheduleService.registerPullTaskCallback("simpleTest", new PullTaskCallback() {
    
     	public void doPullTask(MessageQueue mq, PullTaskContext context) {
     		//(1) 从mq中获取consumeOffset
     		//(2) 从mq中拉取一定数量消息
     		//(3) 消息消费
     		//(4) 消费完成后, 更新consumeOffset
     		//(5) 设置下次拉取延迟时间间隔(context.setPullNextDelayTimeMillis(xxx))
     	}
    
     };
    
     scheduleService.start();
    

    其原理如下:

    1. MQPullConsumerScheduleService也是一种拉消息模型.启动后RebalanceService服务线程会被唤醒. 其根据消息模型(CLUSTERING还是BROADCASTING)进行messageQueue的分配,并在更新RebalanceImpl.processQueueTable时返回true(RebalanceImpl.updateProcessQueueTableInRebalance()方法返回)
    2. 这会触发MQPullConsumerScheduleService.MessageQueueListenerImpl.messageQueueChanged()方法调用, 此方法会组装PullTaskImpl任务, 并通过定时线程池scheduledThreadPoolExecutor立即触发线程任务PullTaskImpl执行
    3. PullTaskImpl进行上面的PullTaskCallback.doPullTask()钩子方法回调, 实现消息的拉取和消费,并设定下次拉取的延迟间隔
    4. 定时线程池scheduledThreadPoolExecutor获取延迟间隔后, 再次定时执行PullTaskImpl任务
    5. 上述3,4步骤不断循环, 即实现了消息的定时拉取
  • 如何发送事务消息

    RocketMQ中消息的发送事务由两阶段提交来实现.

    producer-transaction

    其简单原理为: 先进行消息的存储, 落盘, 但不更新consumeQueue, 再根据此消息对应的本地事务状态(提交或回滚),再次生成一条消息,并依据事务状态确定是否更新comsumeQueue. 使用TransactionProducer,基本代码逻辑如下:

     TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
     TransactionMQProducer producer = new TransactionMQProducer("transProducer");
    
     producer.setTransactionCheckListener(transactionCheckListener);	//防止异常情况下,事务状态不能确定时, 可由broker主动检查消息的事务状态. 但从代码查看, 似乎broker端事务检查未实现完全
    
     Message msg = new Message("transTest","*", "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    
     SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
    

    实现流程如下:

    1. DefaultMQProducerImpl.sendMessageInTransaction()方法中, 设置消息的两个属性(MessageConst.PROPERTY_TRANSACTION_PREPARED, MessageConst.PROPERTY_PRODUCER_GROUP), 并发送至broker
    2. broker对消息进行正常的存储, 落盘操作, 但DefaultMessageStore.ReputMessageService服务线程却不会操作consumeQueue(DefaultMessageStore.doDispatch())
    3. 消息同步发送返回后, 执行LocalTransactionExecuter.executeLocalTransactionBranch()方法, 获得消息本地事务状态(LocalTransactionState,UNKNOW,LocalTransactionState.COMMIT_MESSAGE或LocalTransactionState.ROLLBACK_MESSAGE),进行endTransaction操作.
    4. DefaultMQProducerImpl.endTransaction()方法中, 组装EndTransactionRequestHeader请求头, 并以消息码(RequestCode.END_TRANSACTION)发送给broker
    5. broker中, 请求委托给EndTransactionProcessor.processRequest()方法处理, 此方法会先根据commitLog的offset找到此条记录, 封装成MessageExtBrokerInner对象, 再次进行消息的存储, 落盘. 并且, 如果事务状态为LocalTransactionState.COMMIT_MESSAGE, 会通过DefaultMessageStore.ReputMessageService写入consumeQueue,这样消息就可被consumer消费了.