Concept WebSocket LoadBalance 2 - Linyuzai/concept GitHub Wiki

概述

一个服务存在多个实例时,WebSocket通过网关会被负载均衡连接到其中任意一个实例上。

而当一个服务实例发送消息时,连接另一个实例的客户端就会收不到消息

为了解决这个问题,该库提供了一种解决方案,开箱即用

只需要添加一个配置注解,就可以像单体应用一样使用WebSocket

也可以通过自定义来支持更复杂的业务

本库同时兼容WebmvcWebflux,使用方式上没有任何区别

2.x.x 新特性

注意事项:2.x.x版本与1.x.x版本不兼容,如有自定义组件的话升级需要注意

  1. 支持Redis(Redisson)/RabbitMQ/Kafka的订阅转发方式

  2. 订阅转发支持主从配置,当主订阅转发不可用时会自动切换到从订阅,当主订阅转发恢复可用时会切回主订阅

  3. 提供发送重试配置(不过还是推荐使用自带的重试如Kafka/RabbitMQ

最新版本

Maven Central

集成

implementation 'com.github.linyuzai:concept-websocket-loadbalance-spring-boot-starter:${version}'

implementation 'org.springframework.boot:spring-boot-starter-websocket'//webmvc需要添加websocket依赖,webflux不需要
<dependency>
  <groupId>com.github.linyuzai</groupId>
  <artifactId>concept-websocket-loadbalance-spring-boot-starter</artifactId>
  <version>${version}</version>
</dependency>

<!--webmvc需要添加websocket依赖,webflux不需要-->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

使用

在启动类上添加注解@EnableWebSocketLoadBalanceConcept启用功能

@EnableWebSocketLoadBalanceConcept
@SpringBootApplication
public class WsServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(WsServiceApplication.class, args);
    }
}

注入WebSocketLoadBalanceConcept即可跨实例发送消息

@RestController
@RequestMapping("/ws")
public class WsController {

    @Autowired
    private WebSocketLoadBalanceConcept concept;

    @RequestMapping("/send")
    public void send(@RequestParam String msg) {
        concept.send(msg);
    }
}

客户端的连接地址为ws(s)://{服务的地址}/concept-websocket/{自定义路径}

其中concept-websocket为默认的前缀,可在配置中自定义

配置属性

concept:
  websocket:
    server: #服务配置
      default-endpoint: #默认端点
        enabled: true #是否启用默认端点,默认true
        prefix: concept-websocket #前缀,默认'/concept-websocket/'
        path-selector: #Path选择器
          enabled: false #是否启用Path选择器,默认false
        user-selector: #User选择器
          enabled: false #是否启用User选择器,默认false
      message:
        retry:
          times: 0 #客户端重试次数,默认不重试
          period: 0 #客户端重试间隔,单位ms,默认0ms
      heartbeat: #心跳配置
        enabled: true #是否启用心跳,默认true
        period: 60000 #心跳间隔,单位ms,默认1分钟
        timeout: 210000 #超时时间,单位ms,默认3.5分钟,3次心跳间隔
    load-balance: #负载均衡(转发)配置
      subscriber-master: websocket #主订阅器,默认 websocket
      subscriber-slave: none #从订阅器,默认无
      message:
        retry:
          times: 0 #转发重试次数,默认不重试
          period: 0 #转发重试间隔,单位ms,默认0ms
      monitor: #监控配置
        enabled: true #是否启用监控,默认true
        period: 30000 #轮训间隔,单位ms,默认30s
        logger: false #是否启用日志,默认false
      heartbeat: #心跳配置
        enabled: true #是否启用心跳,默认true
        period: 60000 #心跳间隔,单位ms,默认1分钟
        timeout: 210000 #超时时间,单位ms,默认3.5分钟,3次心跳间隔
    executor:
      thread-pool-size: 1 #线程池大小,默认1

原理

通过服务间的websocket连接或是RedisMQ等中间件转发消息

连接类型

类型 说明
Client 普通客户端
Subscriber 订阅其他的服务消息的连接,该类型连接接收到的消息需要被转发
Observable 其他服务监听自身消息的连接,发送消息时需要转发消息到该类型的连接

连接域(重要)

由于本库支持多种连接(当前包括WebSocketNettySSE)同时配置,所以引入连接域来进行限制。

在自定义组件(所有组件)时需要指定该组件所适配的连接类型(NettyScoped.NAME/WebSocketScoped.NAME/SseScoped.NAME

可通过重写boolean support(String scope)方法或是调用addScopes(String... scopes)来配置

事件监听器

可以实现WebSocketEventListener来监听事件

生命周期监听器

可以实现WebSocketLifecycleListener来监听生命周期(连接发布/连接关闭)

消息处理器

可以实现WebSocketMessageHandler来处理消息

消息编解码适配器

可以继承WebSocketMessageCodecAdapter来添加消息编解码器

主从订阅

可以配置2种订阅转发方式提高容错

如以Kafka为主,Redis为从

Kafka转发消息失败后,会切换到Redis重新转发,并开始对Kafka进行定时的的心跳检测

等到Kafka心跳检测正常,则重新切回到Kafka

抛出MessageTransportException将会触发切换

配置

concept:
  websocket:
    load-balance:
      subscriber-master: websocket #主订阅者器,默认 websocket
      subscriber-slave: none #从订阅器,默认无

枚举

配置 说明
websocket 每个服务实例ws双向连接,只能配置为主订阅器,且不支持主从切换
websocket_ssl 每个服务实例wss双向连接,只能配置为主订阅器,且不支持主从切换
kafka_topic kafka转发
rabbit_fanout rabbit转发
redis_topic redis发布订阅
redis_topic_reactive redis发布订阅
redisson_topic redisson发布订阅
redisson_topic_reactive redisson发布订阅
redisson_shared_topic redisson发布订阅
redisson_shared_topic_reactive redisson发布订阅
none 不转发

幂等转发

通过Kafka/RabbitMQ转发消息时可能会重复消费

提供MessageIdempotentVerifier抽象messageId生成以及验证是否重复

默认缓存所有messageId在内存中(存在缓存越来越大的问题,建议自定义使用Redis或数据库存储)

可通过MessageIdempotentVerifierFactory自定义并注入Spring生效

消息发送

可通过自定义ConnectionSelector来实现消息的准确发送,如发送给某个路径或带有某个参数的连接

给指定路径的客户端发送消息

假设前端连接的WebSocket地址为ws://localhost:8080/concept-websocket/samplesample为我们自定义路径

在配置中启用路径选择器

concept:
  websocket:
    server: 
      default-endpoint: 
        path-selector: 
          enabled: true #启用Path选择器

使用PathMessage给所有的sample客户端发送消息

@RestController
@RequestMapping("/ws")
public class WsController {

    @Autowired
    private WebSocketLoadBalanceConcept concept;

    @RequestMapping("/send-path")
    public void sendPath(@RequestParam String msg) {
        concept.send(new PathMessage(msg, "sample"));
    }
}

给指定用户发送消息

假设前端连接的WebSocket地址为ws://localhost:8080/concept-websocket/user?userId=1

其中userId为固定参数名

在配置中启用用户选择器

concept:
  websocket:
    server: 
      default-endpoint: 
        user-selector: 
          enabled: true #启用用户选择器

使用UserMessage给指定的用户发送消息

@RestController
@RequestMapping("/ws")
public class WsController {

    @Autowired
    private WebSocketLoadBalanceConcept concept;

    @RequestMapping("/send-user")
    public void sendUser(@RequestParam String msg) {
        concept.send(new UserMessage(msg, "1"));
    }
}

组合条件

如果既有路径条件,又有用户条件

@RestController
@RequestMapping("/ws")
public class WsController {

    @Autowired
    private WebSocketLoadBalanceConcept concept;

    @RequestMapping("/send-path-user")
    public void sendUser(@RequestParam String msg) {
        Message message = new ObjectMessage(msg);
        PathMessage.condition("sample").apply(message);
        UserMessage.condition("1").apply(message);
        concept.send(message);
    }
}

选择过滤器

默认情况下,ConnectionSelector对于每次发送消息都只会有一个生效(只能生效一种过滤条件)

对于ConnectionSelector提供扩展FilterConnectionSelector

将会作为一个过滤器来支持多种条件,即组合条件模式

可复用消息

为解决同一个数据多次编码成相同的数据

提供ReusableMessage来缓存编码后的数据

ObjectMessage message = new ObjectMessage("消息数据");
ReusableMessage reusaeble = message.toReusableMessage();
concept.send(reusaeble);

并发发送消息

默认情况下按顺序循环发送

可以注入CompletableFutureMessageSenderFactory实现并发发送

或是自定义实现(Abstract)MessageSenderFactory

请求拦截

实现WebSocketRequestInterceptor拦截连接

消息接收

实现WebSocketMessageHandler来处理客户端发送的消息

编解码器

默认配置的编解码器都是转JSON

可以通过WebSocketMessageCodecAdapter(Abstract)MessageCodecAdapter自定义

组件说明

所有组件均可自定义扩展(可能需要指定Order来保证自定义的组件生效)

连接仓库

ConnectionRepository用于缓存连接实例

默认使用Map<String, Map<Object, Connection>> connections = new ConcurrentHashMap<>();缓存在内存中

可自定义ConnectionRepositoryFactory注入容器生效

连接服务管理器

ConnectionServerManager用于获取其他服务实例信息(ws双向连接中使用)和自身服务信息

默认使用DiscoveryClientRegistration来获得信息

可自定义ConnectionServerManagerFactory注入容器生效

连接订阅器

ConnectionSubscriber用于订阅其他服务的消息

提供配置文件配置

可自定义(Abstract)ConnectionSubscriberFactory(Abstract)MasterSlaveConnectionSubscriberFactory注入容器生效

连接工厂

ConnectionFactory用于扩展Connection(如WebSocketConnection/NettyConnection/SseConnection

可自定义ConnectionFactory注入容器生效

连接选择器

ConnectionSelector用于在发送消息时选择发送给哪些连接

可自定义ConnectionSelectorFilterConnectionSelector注入容器生效

消息工厂

MessageFactory用于适配创建消息

可自定义MessageFactory注入容器生效

消息编解码适配器

MessageCodecAdapter用于适配各种连接类型的消息编码器和消息解码器

可自定义(Abstract)MessageCodecAdapter注入容器生效

消息重试策略

MessageRetryStrategyAdapter用于适配各种连接类型的消息重试策略

消息重试不对pingpong生效

可自定义(Abstract)MessageRetryStrategyAdapter注入容器生效

消息幂等校验器

MessageIdempotentVerifier用于生成消息ID以及校验消息是否处理

可自定义MessageIdempotentVerifierFactory注入容器生效

执行器

ScheduledExecutor用于执行各种延时/定时任务,如心跳等

可自定义ScheduledExecutorFactory注入容器生效

日志

ConnectionLogger用于打印日志

默认使用Spring的日志库

可自定义ConnectionLoggerFactory注入容器生效

事件发布器

ConnectionEventPublisher用于发布事件

默认支持@EventListener

可自定义ConnectionEventPublisherFactory注入容器生效

事件监听器

WebSocketEventListener用于监听事件

事件

事件 说明
ConnectionLoadBalanceConceptInitializeEvent ConnectionLoadBalanceConcept初始化
ConnectionLoadBalanceConceptDestroyEvent ConnectionLoadBalanceConcept销毁
ConnectionEstablishEvent 连接建立
ConnectionCloseEvent 连接关闭
ConnectionCloseErrorEvent 连接关闭异常
ConnectionErrorEvent 连接异常
ConnectionSubscribeErrorEvent 连接订阅异常
MessagePrepareEvent 消息准备
MessageSendEvent 消息发送
MessageSendSuccessEvent 消息发送成功
MessageSendErrorEvent 消息发送异常
DeadMessageEvent 当一个消息不会发送给任何一个连接
MessageDecodeErrorEvent 消息解码异常
MessageForwardEvent 消息转发
MessageForwardErrorEvent 消息转发异常
MessageReceiveEvent 消息接收
MessageReceivePredicateErrorEvent 消息接收断言异常
MessageDiscardEvent 消息丢弃
MasterSlaveSwitchEvent 主从切换
MasterSlaveSwitchErrorEvent 主从切换异常
HeartbeatTimeoutEvent 心跳超时
EventPublishErrorEvent 事件发布异常
LoadBalanceMonitorEvent 监控触发
UnknownCloseEvent 未知的连接关闭
UnknownErrorEvent 未知的连接异常
UnknownMessageEvent 未知的消息

默认服务端点配置

可以自定义DefaultEndpointCustomizer来配置

Servlet环境下会回调WebSocketHandlerRegistration

Reactive环境下会回调ReactiveWebSocketServerHandlerMapping

WebSocketClientFactory

2.2.0版本针对高版本javax变为jakarta提供Servlet方式的WebSocketClient工厂

版本

2.1.0

  • 关闭原因兼容不同类型

2.2.0

  • 支持二进制数据转发
  • 提供PooledMessage提升编码性能
  • Servlet方式解决javaxjakarta兼容问题

2.3.0

  • SpringBoot3 RabbitEndpoint 兼容问题

2.3.3

  • 发送对象消息转发变成空字符串的问题

2.4.0

  • 修复转发json多次转义的问题
  • 移除编解码器中的过时方法
  • 添加方法:发送消息时可以指定连接选择器

2.6.0

  • 支持转发反序列化
  • 内存幂等校验添加固定时间清理功能
  • 支持自定义实例间订阅的 Endpoint
  • 新增 MessageSender 可自定义消息发送策略
  • javax 的 websocket 实现标记为过时

2.7.0

  • 新增请求拦截
  • 可复用消息(原PooledMessage)
  • 弃用 'concept.websocket.type' 属性

2.7.1

  • 兼容 Spring3 Redis
⚠️ **GitHub.com Fallback** ⚠️