MessagePipeLoader - minbox-projects/message-pipe GitHub Wiki

如果服务端(Server)因为特殊情况停止了运行,或者写入消息的速度大于分发处理消息的速度,这时消息管道(MessagePipe)阻塞队列内会有消息进行堆积。

当我们需要重启服务端(Server)时MessagePipeLoader就起到了作用,会自动加载Redis内全部的消息管道的阻塞队列,并对应创建消息管道(MessagePipe)。

1. 加载全部的队列

获取项目连接的目标Redis内的全部指定格式化的消息管道(Message)队列列表,队列名称格式为:%s.queue

private static final String ALL_PATTERN = "*";

// 设置Key序列化方式为StringRedisSerializer
redisTemplate.setKeySerializer(new StringRedisSerializer());
// 格式化后的队列名称表达式:*.queue
String allKeyPattern = LockNames.MESSAGE_QUEUE.format(ALL_PATTERN);
// 获取符合表达式的队列列表
Set keySet = redisTemplate.keys(allKeyPattern);

注意事项:通过RedisTemplate#keys方法获取数据时,需要配置KeySerializer为StringRedisSerializer

2. 创建消息管道

Iterator iterator = keySet.iterator();
while (iterator.hasNext()) {
  try {
    String pipeKey = String.valueOf(iterator.next());
    Pattern pipeKeyPattern = Pattern.compile(PIPE_NAME_PATTERN);
    Matcher matcher = pipeKeyPattern.matcher(pipeKey);
    if (matcher.find()) {
      String pipeName = matcher.group(1);
      messagePipeManager.createMessagePipe(pipeName);
    }
  } catch (Exception e) {
    log.error(e.getMessage(), e);
  }
}

根据*.queue表达式匹配的全部队列进行依次创建消息管道(MessagePiep)实例。

我们通过MessagePipeManager#createMessagePipe方法创建消息管道(MessagePipe)后会自动给该管道创建一个消息监听器(MessagePipeMonitor)的子线程,目的就是为了监听消息管道内剩余的消息并做消息分发处理。