MessagePipeScheduler - minbox-projects/message-pipe GitHub Wiki

消息调度器(MessagePipeScheduler)生命周期的开始是在创建消息管道(MessagePipe)后。

启动消息调度器

AbstractMessagePipeManager#createMessagePipe:

// 创建消息调度器
MessagePipeScheduler scheduler = new MessagePipeScheduler(messagePipe, distributor);
// 启动消息调度器
SCHEDULER_SERVICE.submit(() -> scheduler.startup());

创建消息调度器需要消息分发器(MessagePipeDistributor)的实例。

MessagePipeScheduler#startup:

/**
 * Start message distribution
 * <p>
 * There is a separate thread to run this method,
 * as long as there is a message that needs to be distributed in the message pipeline,
 * the thread will be awakened, otherwise it will be suspended
 */
public void startup() {
    while (true) {
        try {
            log.debug("MessagePipe:{},starting execution scheduler.", messagePipe.getName());
            messagePipe.handleFirst(message -> distributor.sendMessage(message));
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
}

结合上面方法以及查看MessagePipe#handleFirst方法的源码我们不难发现只要消息调度器子线程被唤醒(notify)就会执行MessagePipe#handleFirst方法,当获取到消息阻塞队列内的第一条消息后,就会通过MessagePipeDistrobutor#sendMessage方法进行分发到消息管道绑定的目标客户端。