MessagePipeManager - minbox-projects/message-pipe GitHub Wiki

MessagePipeManager是消息管道(MessagePipe)管理器,提供了创建、获取消息管道实例的方法。

1. AbstractMessagePipeManager

它是MessagePipeManager接口的抽象实现类,提供了封装处理消息管道的全部方法。

2. DefaultMessagePipeManager

抽象消息管道管理器(AbstractMessagePipeManager)的默认实现类,该类在项目启动时会通过@EnableMessagePipeServer注册到Spring Ioc容器内。

3. 新建消息管道

/**
 * 注入消息管道管理对象
 */
@Autowired
private MessagePipeManager manager;

public void test() {
    // 创建消息管道
    MessagePipe messagePipe = manager.createMessagePipe("test");
    Message message = new Message("this is test content".getBytes());
    messagePipe.putLast(message);
}

注意事项:从v1.0.3版本开始,MessagePipeManager#createMessagePipe方法才返回消息管道(MessagePipe)实例。

4. 获取消息管道

public void test() {
    // 获取消息管道
    MessagePipe messagePipe = manager.getMessagePipe("test");
    Message message = new Message("this is test content".getBytes());
    messagePipe.putLast(message);
}

5. 消息管道调度器线程池

AbstractMessagePipeManager抽象类内定义了消息管道调度器线程池(SCHEDULER_SERVICE),线程池的最大线程数量根据 ServerConfiguration#maxMessagePipeCount 配置参数定义。

AbstractMessagePipeManager#afterPropertiesSet:

@Override
public void afterPropertiesSet() throws Exception {
    SCHEDULER_SERVICE = Executors.newFixedThreadPool(serverConfiguration.getMaxMessagePipeCount());   
  // ...省略其他代码
}

AbstractMessagePipeManager#createMessagePipe:

public MessagePipe createMessagePipe(String name) {
    // Create MessagePipe Scheduler
    MessagePipeScheduler scheduler = new MessagePipeScheduler(messagePipe, distributor);
    SCHEDULER_SERVICE.submit(() -> scheduler.startup());
    log.info("MessagePipe:{},scheduler create successfully.", name);
    // ...省略其他代码
}

每当我们通过#createMessagePipe方法创建一个消息管道时,就会通过SCHEDULER_SERVICE线程池提交创建一个子线程来监听调度新消息。

6. 消息管道监控器线程池

AbstractMessagePipeManager抽象类内定义了消息管道监控器线程池(MONITOR_SERVICE),线程池的最大线程数量根据 ServerConfiguration#maxMessagePipeCount 配置参数定义。

AbstractMessagePipeManager#afterPropertiesSet:

@Override
public void afterPropertiesSet() throws Exception {
    MONITOR_SERVICE = Executors.newFixedThreadPool(serverConfiguration.getMaxMessagePipeCount());   
  // ...省略其他代码
}

AbstractMessagePipeManager#createMessagePipe:

public MessagePipe createMessagePipe(String name) {
  // Create MessagePipe Monitor
  MessagePipeMonitor monitor = new MessagePipeMonitor(messagePipe, distributor);
  MONITOR_SERVICE.submit(() -> monitor.startup());
  log.info("MessagePipe:{},monitor create successfully.", name);    
  // ...省略其他代码
}

每当我们通过#createMessagePipe方法创建一个消息管道时,就会通过MONITOR_SERVICE线程池提交创建一个子线程来监听监控消息管道内剩余的消息。