MessagePipeConfiguration - minbox-projects/message-pipe GitHub Wiki

MessagePipeConfiguration用于配置消息管道(MessagePipe)内部逻辑所需要一些的参数,每个消息管道(MessagePipe)都可以进行单独配置,也可以全局配置。

1. 分布式锁时间

消息管道(MessagePipe)内每次获取、写入消息(Message)时,都会调用Redisson提供的RLock分布式重入锁来实现消息处理的安全性、顺序性。

获取锁以及持有锁时间 是需要进行配置的,当然也有对应的默认值,如果我们不使用默认值,配置方式如下所示:

MessagePipeConfiguration.LockTime lockTime = new MessagePipeConfiguration.LockTime()
  // 配置线程获取等待锁的时长
  .setWaitTime(5)
  // 配置线程持有锁的时长
  .setLeaseTime(10)
  // 配置获取、持有锁的时间单位
  .setTimeUnit(TimeUnit.SECONDS);

1.1 等待获取锁时长

通过LockTime#waitTime字段进行配置每次拿出放入消息(Message)时获取分布式锁的等待时间,单位以LockTime#timeUnit为主,如果超时没有获取到锁,则会进入线程等待状态。

默认为:3秒。

1.2 拥有锁的时长

通过LockTime#leaseTime字段进行配置线程获取锁后,持有的时长,单位以LockTime#timeUnit为主,如果线程处理消息完成后并且等待时间超过了持有时长,则会自动解锁。

默认为:5秒。

1.3 锁相关时间单位

配置线程获取分布式锁等待时间、持有时间的单。

默认为:秒(TimeUnit.SECONDS)。

2. 异常处理器

异常处理器(ExceptionHandler)配置分发消息时遇到异常后的处理方式,默认为:ConsoleExceptionHandler(仅在控制台输出异常堆栈信息)。

详见:ExceptionHandler

3. 客户端负载均衡策略

服务端(Server)在分发消息时会查询消息管道(MessagePipe)所绑定的客户端列表,如果存在多个客户端时会根据配置的负载均衡策略来获取目标分发消息的客户端详情。

负载均衡策略是通过实现ClientLoadBalanceStrategy接口来完成的,默认采用的随机方式(RandomWeightedStrategy)的负载均衡策略。

详见:ClientLoadBalanceStrategy

4. 请求ID生成器

服务端(Server)每次发送消息到客户端(Client)时都会产生一个请求唯一的ID,默认采用 minbox-sequence 方式生成。

如果你需要自定义生成方式,详见:RequestIdGenerator

5. 监控执行间隔时间

每个消息管道在创建后都会创建一个MessagePipeMonitor实例,该实例主要是来监控消息管道内是否有剩余未分发的消息(Message),如果存在则一次性进行分发完毕。

默认为:10秒,执行一次消息监控。

注意事项:消息监控在执行分发时会阻塞调度单个分发消息逻辑,防止出现分发错误问题。

6. 编解码器

由于消息管道(MessagePipe)内部是通过RedissonRBlockingQueue分布式阻塞队列进行实现的,所以消息的写入、获取都是由Redisson进行负责的,而每个消息管道写入消息时的编码方式可能会进行自定义配置。

默认使用:JsonJacksonCodec编码器。