Thingsboard Netty Mqtt 实现分析 - codeHui/IoT-Thingsboard-architecture-and-source-code-analysis GitHub Wiki
1,基础概念理解
- mqtt是应用层协议,基于下层的tcp协议
- 使用Netty是使用NIO,用少量线程管理大量连接,这个连接在这里就是指tcp连接,或者叫socket连接,或者叫Netty中的Channel
- Thingsboard的源码,则部分实现了应用层的mqtt协议(注意是基于自己的业务部分实现了mqtt协议,比如topic都是代码定死的(比如温度计上报温度只能publish到'v1/devices/me/telemetry'这个topic,从而接入Thingsboard的框架),不支持QOS2,再比如每个设备订阅的topic都一样但收到的消息不一致(标准的发布订阅模式,订阅相同的topic应该都到相同的消息才对(thingsboard是用deviceId查到设备的Channel的)),而另一款开源框架EMQX则是声称100%实现了MQTT V3.1.1和V5.0)
2,源码
源码中有两个文件名有mqtt,第一处是netty-mqtt,该处是netty实现的mqtt客户端,用于rule-engine组件以及单元测试中,并不是mqtt服务端的代码
mqtt 服务器(也就是tb-mqtt-transport(微服务模式))启动与设备上报状态消息的代码分析
用@PostConstruct注解初始化,这个类就是标准的Netty服务器启动的代码了。
bossGroupThreadCount核心线程数,workerGroupThreadCount工作线程数,bind(host, port)绑定mqtt服务器端口1883,MqttTransportServerInitializer处理handler继续往下
这个类继承ChannelInitializer,就代表一个设备的网络连接了(mqtt长连接,tcp连接,或者叫socket连接,或者叫Netty中的Channel)
处理收到的设备消息
是否是connect消息
我们选择publish继续
我们选择设备上报的Telemetry状态消息继续(也就是'v1/devices/me/telemetry'这个topic,比如温度计每分钟上报一次温度)
sendToRuleEngine,开始通过消息队列kafka把消息发给rule-engine的微服务
再往下就kafka源码了
3,如何设置代理
这是haproxy的配置,通过用docker自带的域名解析器(resolver)工具,去将tb-mqtt-transport1解析成这个node的ip
这是docker-compose的配置,通过links配置,给haproxy能够查到tb-mqtt-transport1 ip的权限