MessagePipeMonitor - minbox-projects/message-pipe GitHub Wiki

消息监控器(MessagePipeMonitor)是为了一次性消息管道(MessagePipe)内剩余消息而添加的概念,默认每间隔 10秒 中执行一次。

启动消息监控器

AbstractMessagePipeManager#createMessagePipe:

MessagePipeMonitor monitor = new MessagePipeMonitor(messagePipe, distributor);
MONITOR_SERVICE.submit(() -> monitor.startup());

创建消息调度器需要消息分发器(MessagePipeDistributor)的实例。

MessagePipeMonitor#startup:

/**
 * Starting message pipe executor monitor
 * <p>
 * Perform message monitoring in the message pipeline at intervals.
 * If there is a message and the time from the last single execution exceeds the threshold,
 * perform all message distribution
 */
public void startup() {
    while (true) {
        try {
            log.debug("MessagePipe:{},starting execution monitor.", messagePipe.getName());
            messagePipe.handleToLast(message -> distributor.sendMessage(message));
            // 线程沉睡一段时间
            Thread.sleep(configuration.getMessagePipeMonitorMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error(e.getMessage(), e);
        }
    }
}

当创建消息管道(MessagePipe)后通过子线程来启动#startup方法,每间隔10秒执行一次MessagePipe#handleToLast方法。

执行MessagePipe#handleToLast方法时,会导致执行MessagePipe#handleFirst方法的子线程会进入wait状态,等待着被其他线程唤醒(notify)。