RocketMQ源码分析(一) - 18965050/RocketMQ GitHub Wiki

通信解析

RocketMQ中的通信使用的是Netty网络框架,版本为4.0.36.Final.

server端涉及到的几个关键类为:

  • NettyRemotingServer: Netty Server主类. 负责Netty Server建立,监听. 同步, 异步, 单向调用, 业务处理器注册等
  • NettyEncoder,NettyDecoder: 编解码器. 对RemotingCommand进行编解码转换
  • NettyRemotingServer.NettyConnetManageHandler: 配合IdleStateHandler进行心跳及链路通断检测.并将检测结果包装成NettyEvent事件, 发送给NettyRemotingServer.ChannelEventListener实现类进行处理
  • NettyRemotingServer.NettyServerHandler: 对RemotingCommand进行业务处理. 其会委托给注册在processorTable中RemotingCommand中code对应的NettyRequestProcessor在对应的线程池(ExecutorService)中进行处理

client端涉及到的几个关键类为:

  • NettyRemotingClient: Netty Client主类.负责Netty Client建立, 连接通道的建立和缓存, 同步, 异步, 单向调用,业务处理器注册等
  • NettyEncoder,NettyDecoder: 同上
  • NettyRemotingClient.NettyConnetManageHandler: 同上. 只不过将NettyEvent事件发送给NettyRemotingClient.ChannelEventListener处理
  • NettyRemotingClient.NettyClientHandler: 同上.

消息格式

RocketMQ通信消息格式如下:

4Byte Header Length Body Length
消息总长度 消息头 消息体

其中, 4字节的消息总长度在NettyDecoder中会被去掉

消息头格式如下:

2Byte 1Byte 2Byte 4Byte 4Byte 4Byte remark length 4Byte extFields length
code language version opaque flag remark length remark content extFields length extFields content

其中:

  • code: 请求命令码(ReqeustCode)或响应命令码(ResponseCode)
  • language: 调用语言实现. 对应LanguageCode中的语言
  • version: 请求或响应的版本号, 对应RemotingCommand.version
  • opaque: 请求发起方在同一连接上不同的请求标识代码,多线程连接复用使用.响应直接返回
  • flag: 通信层标志, 比如标识是否为请求(偶数)还是响应(奇数),单向(2的倍数+2)还是双向
  • remark: 请求的自定义文本信息或响应的错误描述信息
  • extFields: 请求或响应自定义字段

Namesrv

Namesrv在RocketMQ中的作用相较于zk在Kafka中的地位大幅降低, 其并不做master broker挂掉后的选举操作, 以及消息元数据信息的存储, 而仅仅用于对broker的管理(包括主从broker)以及mqadmin工具的命令操作

主要类说明

  • NamesrvStartup: 启动类
  • NamesrvConfig: 配置参数类
  • NettyServerConfig: Netty Server配置类
  • NamesrvController: Namesrv生命周期管理类
  • RouteInfoManager: 路由信息关联类. 这是很重要的一个类,里面包括topicQueueTable,brokerAddrTable,clusterAddrTable,brokerLiveTable,filterServerTable等重要信息
  • DefaultRequestProcessor: 请求处理类

启动命令行参数说明:

-h: 显示使用信息

-p: 打印配置信息到控制台

-c : 使用配置文件中的配置替换NamesrvConfig和NettyServerConfig中的配置

启动流程

  1. 如果命令参数中有'-c',则将配置文件中的配置加载到NamesrvConfig和NettyServerConfig中
  2. 创建并初始化NamesrvController.加载${user.home}/namesrv/kvConfig.json配置文件到KvConfigManager中
  3. 初始化NettyRemotingServer, 注册请求处理器(非集群测试为DefaultRequestProcessor)
  4. 启动定时器,扫描并删除不活动的broker,打印kvConfig
  5. 启动NettyRemotingServer
⚠️ **GitHub.com Fallback** ⚠️