MessagePipe - minbox-projects/message-pipe GitHub Wiki
MessagePipe
是核心类之一,是整个开源项目的灵魂。
该类中封装了向消息管道写入消息、获取消息、批量处理消息等方法,并且对分布式应用场景来说提供了分布式锁(RLock
)的支持,保证了消息的安全性、顺序性、一致性。
1. 管道队列名称
消息管道(MessagePipe
)内部是采用Redisson
的阻塞队列实现的,而阻塞队列在Redis
内是一个List
类型的集合,阻塞队列名称格式:%s.queue
,示例如下所示:
// 消息管道名称
String pipeName = "test";
// 消息管道所绑定的阻塞队列名称
String queueName = LockNames.MESSAGE_QUEUE.format(pipeName);
2. 顺序性
这是阻塞队列的特性,阻塞队列是采用的 先进先出 的方式处理数据的,按照放入队列的顺序来依次处理数据。
3. 安全性
对于单体应用保障 线程安全性 可以使用 线程锁 关键字来完成,而对于分布式来说我们可以借助第三方解决方案实现。
而消息管道(MessagePipe
)内则是采用 Redisson 提供的分布式可重入锁(Reentrant Lock
)来实现,保障在分布式运行环境中只有获取锁(RLock
)的线程才可以执行消息处理方法。
获取分布式锁时,默认最长等待 3秒,如果超过3秒钟还未获取锁会做一些异常处理。
如果获取成功后,因为异常或者其他原因导致了并未调用RLock#unlock
方法声明式解锁,默认会等待 5秒 后自动解锁。
4. 一致性
为了保证消息处理成功,消息管道(MessagePipe
)内的消息在分布式环境下处理一致性,每次在处理消息时会从阻塞队列内获取第一个消息,根据负载均衡策略获取客户端并进行消息转发后,只有执行过程中并未遇到异常并且客户端的反馈执行成功后才会将获取后的第一个消息从阻塞队列内删除。
当然这整个消息消费流程中只有一个线程才拥有处理的权限。
5. 分布式锁名称格式
每个消息管道(MessagePipe
)在创建对象时就会通过LockNames
枚举获取:TakeMessage
、PutMessage
对应锁的名称。
-
TAKE_MESSAGE(
%s.take.lock
):从消息管道内获取消息时的分布式锁名称格式// 消息管道名称 String pipeName = "test"; // 获取消息时的分布式锁名称,test.take.lock String putLockName = LockNames.TAKE_MESSAGE.format(pipeName);
-
PUT_MESSAGE(
%s.put.lock
):写入消息管道内消息的分布式锁名称格式// 消息管道名称 String pipeName = "test"; // 写入消息时的分布式锁名称,test.put.lock String putLockName = LockNames.PUT_MESSAGE.format(pipeName);
6. 线程间协作
消息管道(MessagePipe
)在实例化后会分别在调度器线程池
、监听器线程池
内创建一个子线程,为消息管道完成不同的工作使命。
6.1 消息管道调度器线程
消息管道调度器(MessagePipeScheduler
)主要完成消息的主动调度,而等待新消息产生的方式则是while(true)
的方式,但是为了CPU使用率的优化,所以消息调度器线程一旦创建后就会进入wait
状态。
每当我们调用MessagePipe#putLast
方法时,消息调度器线程才会被唤醒,进行处理调度分发消息到客户端,当处理完消息后会再次进入wait
状态,等待下一次被唤醒。
6.2 消息管道监控器线程
消息管道监控器(MessagePipeMonitor
)主要完成监控消息管道(Message
)内是否还存在未处理的消息,如果存在则会一次性全部进行分发到客户端,在这个分发的过程中消息管道调度器(MessagePipeScheduler
)会进入wait
状态,不会相互干扰。
消息管道监控器默认每间隔:10秒 执行一次,执行完成后就会进入sleep
状态。
7. 异常处理
每当我们调用消息管道(MessagePipe
)内相关处理消息的方法时,如果一旦出现了异常会进行捕获处理,然后交给异常处理器 ExceptionHandler 进行具体异常逻辑处理。