Concept Netty LoadBalance - Linyuzai/concept GitHub Wiki
一个服务存在多个实例时,Netty
连接通过网关会被负载均衡连接到其中任意一个实例上。
而当一个服务实例发送消息时,连接另一个实例的客户端就会收不到消息
为了解决这个问题,该库提供了一种解决方案,开箱即用
只需要添加一个配置注解,就可以像单体应用一样使用Netty
也可以通过自定义来支持更复杂的业务
implementation 'com.github.linyuzai:concept-netty-loadbalance-spring-boot-starter:${version}'
<dependency>
<groupId>com.github.linyuzai</groupId>
<artifactId>concept-netty-loadbalance-spring-boot-starter</artifactId>
<version>${version}</version>
</dependency>
在启动类上添加注解@EnableNettyLoadBalanceConcept
启用功能
@EnableNettyLoadBalanceConcept
@SpringBootApplication
public class NettyServiceApplication {
public static void main(String[] args) {
SpringApplication.run(NettyServiceApplication.class, args);
}
}
在Netty
中配置NettyLoadBalanceHandler
来接管连接
@Component
public class NettySampleServer {
@Autowired
private NettyLoadBalanceConcept concept;
public void start(int port) {
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new LineBasedFrameDecoder(1024));
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
//将连接交由 NettyLoadBalanceHandler 管理
pipeline.addLast(new NettyLoadBalanceHandler(concept));
}
});
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} catch (Throwable e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
注入NettyLoadBalanceConcept
即可跨实例发送消息
@RestController
@RequestMapping("/netty")
public class NettyController {
@Autowired
private NettyLoadBalanceConcept concept;
@RequestMapping("/send")
public void send(@RequestParam String msg) {
concept.send(msg);
}
}
concept:
netty:
server: #服务配置
message:
retry:
times: 0 #客户端重试次数,默认不重试
period: 0 #客户端重试间隔,单位ms,默认0ms
load-balance: #负载均衡(转发)配置
subscriber-master: none #主订阅器,默认无
subscriber-slave: none #从订阅器,默认无
message:
retry:
times: 0 #转发重试次数,默认不重试
period: 0 #转发重试间隔,单位ms,默认0ms
heartbeat: #心跳配置
enabled: true #是否启用心跳,默认true
period: 60000 #心跳间隔,单位ms,默认1分钟
timeout: 210000 #超时时间,单位ms,默认3.5分钟,3次心跳间隔
executor:
thread-pool-size: 1 #线程池大小,默认1
通过Redis
和MQ
等中间件转发消息
类型 | 说明 |
---|---|
Client | 普通客户端 |
Subscriber | 订阅其他的服务消息的连接,该类型连接接收到的消息需要被转发 |
Observable | 其他服务监听自身消息的连接,发送消息时需要转发消息到该类型的连接 |
由于本库支持多种连接(当前包括WebSocket
和Netty
和SSE
)同时配置,所以引入连接域来进行限制。
在自定义组件(所有组件)时需要指定该组件所适配的连接类型(NettyScoped.NAME/WebSocketScoped.NAME/SseScoped.NAME
)
可通过重写boolean support(String scope)
方法或是调用addScopes(String... scopes)
来配置
可以实现NettyEventListener
来监听事件
可以实现NettyLifecycleListener
来监听生命周期(连接发布/连接关闭)
可以实现NettyMessageHandler
来处理消息
可以继承NettyMessageCodecAdapter
来添加消息编解码器
可以配置2种订阅转发方式提高容错
如以Kafka
为主,Redis
为从
当Kafka
转发消息失败后,会切换到Redis
重新转发,并开始对Kafka
进行定时的的心跳检测
等到Kafka
心跳检测正常,则重新切回到Kafka
抛出MessageTransportException
将会触发切换
concept:
netty:
load-balance:
subscriber-master: none #主订阅者器,默认无
subscriber-slave: none #从订阅器,默认无
配置 | 说明 |
---|---|
kafka_topic | kafka转发 |
rabbit_fanout | rabbit转发 |
redis_topic | redis发布订阅 |
redis_topic_reactive | redis发布订阅 |
redisson_topic | redisson发布订阅 |
redisson_topic_reactive | redisson发布订阅 |
redisson_shared_topic | redisson发布订阅 |
redisson_shared_topic_reactive | redisson发布订阅 |
none | 不转发 |
通过Kafka/RabbitMQ
转发消息时可能会重复消费
提供MessageIdempotentVerifier
抽象messageId
生成以及验证是否重复
默认缓存所有messageId
在内存中(存在缓存越来越大的问题,建议自定义使用Redis
或数据库存储)
可通过MessageIdempotentVerifierFactory
自定义并注入Spring
生效
可通过自定义ConnectionSelector
来实现消息的准确发送
如果在复用EventLoopGroup
的情况下,可以配置分组方便管理
在添加NettyLoadBalanceHandler
时传入分组
pipeline.addLast(new NettyLoadBalanceHandler(concept, "group1"));
使用GroupMessage
给某个分组发送消息
@RestController
@RequestMapping("/netty")
public class NettyController {
@Autowired
private NettyLoadBalanceConcept concept;
@RequestMapping("/send-group")
public void sendGroup(@RequestParam String msg) {
concept.send(new GroupMessage(msg, "group1"));
}
}
默认情况下,ConnectionSelector
对于每次发送消息都只会有一个生效(只能生效一种过滤条件)
对于ConnectionSelector
提供扩展FilterConnectionSelector
将会作为一个过滤器来支持多种条件,即组合条件模式
为解决同一个数据多次编码成相同的数据
提供ReusableMessage
来缓存编码后的数据
ObjectMessage message = new ObjectMessage("消息数据");
ReusableMessage reusaeble = message.toReusableMessage();
concept.send(reusaeble);
默认情况下按顺序循环发送
可以注入CompletableFutureMessageSenderFactory
实现并发发送
或是自定义实现(Abstract)MessageSenderFactory
实现NettyMessageHandler
来处理客户端发送的消息
默认配置的编解码器都是转JSON
可以通过NettyMessageCodecAdapter
或(Abstract)MessageCodecAdapter
自定义
所有组件均可自定义扩展(可能需要指定Order
来保证自定义的组件生效)
ConnectionRepository
用于缓存连接实例
默认使用Map<String, Map<Object, Connection>> connections = new ConcurrentHashMap<>();
缓存在内存中
可自定义ConnectionRepositoryFactory
注入容器生效
ConnectionServerManager
用于获取其他服务实例信息(ws
双向连接中使用)和自身服务信息
默认使用DiscoveryClient
和Registration
来获得信息
可自定义ConnectionServerManagerFactory
注入容器生效
ConnectionSubscriber
用于订阅其他服务的消息
提供配置文件配置
可自定义(Abstract)ConnectionSubscriberFactory
或(Abstract)MasterSlaveConnectionSubscriberFactory
注入容器生效
ConnectionFactory
用于扩展Connection
(如WebSocketConnection/NettyConnection/SseConnection
)
可自定义ConnectionFactory
注入容器生效
ConnectionSelector
用于在发送消息时选择发送给哪些连接
可自定义ConnectionSelector
或FilterConnectionSelector
注入容器生效
MessageFactory
用于适配创建消息
可自定义MessageFactory
注入容器生效
MessageCodecAdapter
用于适配各种连接类型的消息编码器和消息解码器
可自定义(Abstract)MessageCodecAdapter
注入容器生效
MessageRetryStrategyAdapter
用于适配各种连接类型的消息重试策略
消息重试不对ping
和pong
生效
可自定义(Abstract)MessageRetryStrategyAdapter
注入容器生效
MessageIdempotentVerifier
用于生成消息ID以及校验消息是否处理
可自定义MessageIdempotentVerifierFactory
注入容器生效
ScheduledExecutor
用于执行各种延时/定时任务,如心跳等
可自定义ScheduledExecutorFactory
注入容器生效
ConnectionLogger
用于打印日志
默认使用Spring
的日志库
可自定义ConnectionLoggerFactory
注入容器生效
ConnectionEventPublisher
用于发布事件
默认支持@EventListener
可自定义ConnectionEventPublisherFactory
注入容器生效
NettyEventListener
用于监听事件
事件 | 说明 |
---|---|
ConnectionLoadBalanceConceptInitializeEvent |
ConnectionLoadBalanceConcept 初始化 |
ConnectionLoadBalanceConceptDestroyEvent |
ConnectionLoadBalanceConcept 销毁 |
ConnectionEstablishEvent |
连接建立 |
ConnectionCloseEvent |
连接关闭 |
ConnectionCloseErrorEvent |
连接关闭异常 |
ConnectionErrorEvent |
连接异常 |
ConnectionSubscribeErrorEvent |
连接订阅异常 |
MessagePrepareEvent |
消息准备 |
MessageSendEvent |
消息发送 |
MessageSendSuccessEvent |
消息发送成功 |
MessageSendErrorEvent |
消息发送异常 |
DeadMessageEvent |
当一个消息不会发送给任何一个连接 |
MessageDecodeErrorEvent |
消息解码异常 |
MessageForwardEvent |
消息转发 |
MessageForwardErrorEvent |
消息转发异常 |
MessageReceiveEvent |
消息接收 |
MessageReceivePredicateErrorEvent |
消息接收断言异常 |
MessageDiscardEvent |
消息丢弃 |
MasterSlaveSwitchEvent |
主从切换 |
MasterSlaveSwitchErrorEvent |
主从切换异常 |
HeartbeatTimeoutEvent |
心跳超时 |
EventPublishErrorEvent |
事件发布异常 |
LoadBalanceMonitorEvent |
监控触发 |
UnknownCloseEvent |
未知的连接关闭 |
UnknownErrorEvent |
未知的连接异常 |
UnknownMessageEvent |
未知的消息 |
使用WebSocketNettyLoadBalanceHandler
代替NettyLoadBalanceHandler
- SpringBoot3 RabbitEndpoint 兼容问题
- 发送对象消息转发变成空字符串的问题
- 支持转发反序列化
- 内存幂等校验添加固定时间清理功能
- 新增 MessageSender 可自定义消息发送策略
- 可复用消息(原PooledMessage)
- 兼容 Spring3 Redis