Reactor Pattern - tenji/ks GitHub Wiki
反应器模式
反应堆软件设计模式是一种事件处理(Event Handling)策略,可以同时响应许多潜在的服务请求。该模式的关键组件是一个事件循环(Event Loop),在单个线程或进程中运行,它对传入请求进行多路分解(demultiplexes)并将它们分派到正确的请求处理程序。
通过依赖基于事件(Event-Based)的机制而不是阻塞 I/O 或多线程(Multi-Threading),反应器可以以最小的延迟处理许多并发 I/O 绑定请求。在服务器端应用程序中需要低延迟和高吞吐量的场景中采用 Reactor 模式,使其成为现代网络框架和 Web 服务器的重要策略。
Dispatcher, Notifier 和 Reactor 的区别是什么?
Reactor 也称为:
- Dispatcher
- Notifier
有哪些开源软件使用了 Reactor 模式?
- Netty:一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端;
- Akka:用于在 JVM 上构建并发、分布式和容错应用程序的工具包和运行时;
- Java NIO (New I/O):提供非阻塞 I/O 操作,允许单个线程管理多个通道(Channels)。
- ...
一、结构
1.1 类图(静态结构)
-
Handle
Handle
(句柄)标识 OS 管理的资源。这些资源一般包括网络连接,打开的文件,定时器,同步对象等。日志服务器中的句柄标识 socket endpoints, 同步事件多路复用器(Synchronous Event Demultiplexer)在句柄上阻塞,等待事件的发生。日志服务关注的两种类型的事件是连接事件以及读事件,分别代表到来的客户端连接以及日志数据。日志服务器为每一个客户端维护一个独立的连接。每个连接由 socket 句柄表示。 -
Synchronous Event Demultiplexer
Synchronous Event Demultiplexer
(同步事件多路复用器)在句柄集合上阻塞等待事件的发生。当其中一个句柄可以无阻塞地发起操作时,同步事件多路复用器从阻塞方法返回。一般的 IO 多路复用器是 select,一个由 UNIX 和 WIN32 提供的事件多路复用的系统调用。select 调用指示哪些句柄可以在不阻塞应用程序进程的情况下对其同步调用操作。 -
Initiation Dispatcher
Initiation Dispatcher
(初始分发器)定义用于注册,取消注册,分发Event Handler
(事件处理器)的接口。根本上,同步事件多路复用器(Synchronous Event Demultiplexer)负责等待新事件的发生。当多路复用器检测到新事件时,通知初始分发器回调事件处理器。通常事件包括连接接受事件(connection acceptance events),数据输入输出事件(data input and output events),以及超时事件(timeout events)。 -
Event Handler 获取句柄(Handle)信息,并完成事件处理。
Event Handler
(事件处理程序)指定一个由钩子方法(hook method)组成的接口,该钩子方法抽象地表示服务特定事件的调度操作。钩子方法必须由应用特定的服务实现。 -
Concrete Event Handler
Concrete Event Handler
(具体事件处理器)实现钩子方法,同样实现应用中处理这些事件的方法。应用将具体事件处理器注册到初始分发器,用以处理特定类型的事件。每当事件到达,初始分发器回调合适的具体事件处理器的钩子方法。
1.2 时序图(动态结构)
Reactor 模式中模块的协作关系如下:
- 应用注册 Concrete Event Handler 到 Initiation Dispatcher, 来向 Initiation Dispatcher 声明 Concrete Event Handler 感兴趣的事件类型, 当在事件处理器关联的 Handle 上有事件发生时, Initiation Dispatcher 通知到 Concrete Event Handler;
- Initiation Dispatcher 请求每个 Event Handler 将其内部 Handle 传回。此 Handle 向操作系统标识 Event Handler;
- 注册所有 Event Handlers 后,应用程序调用 handle_events 来启动 Initiation Dispatcher 的事件循环。此时,Initiation Dispatcher 将来自每个注册的 Event Handler 的 Handle 合并,并使用 Synchronous Event Demultiplexer 等待事件发生在这些 Handles 上。例如,TCP 协议层使用 select 同步事件解复用操作来等待客户端日志记录事件到达连接的套接字 Handle;
- 当对应于事件源的 Handle 变得“就绪”时,Synchronous Event Demultiplexer 通知 Initiation Dispatcher,例如,TCP 套接字“就绪可读取”;
- Initiation Dispatcher 会触发 Event Handler 钩子方法,以响应就绪 Handles 上的事件。当事件发生时,Initiation Dispatcher 使用由事件源激活的 Handles 作为“键”来定位和分派适当的 Event Handler 的钩子方法;
- Initiation Dispatcher 回调 Event Handler 的
handle_event
钩子方法,以执行应用程序特定的功能以响应事件。发生的事件类型可以作为参数传递给方法,并由该方法在内部使用,以执行额外的特定于服务的解复用和调度。
二、样例代码(Reactor Pattern in Java NIO)
Selector (demultiplexer)
Selector 是 Java 的构建块,类似于 Reactor 模式中的多路分配器。你可以通过选择器注册感兴趣的各种 I/O 事件,选择器会告诉你这些事件何时发生(Selector 会不断地轮询注册在其上的 Channel)。
Reactor/initiation dispatcher
Reactor 包含 java.nio.channels.Selector
和注册 handlers 的映射。根据 Dispatcher/Reactor 的定义,Reactor
将在等待 IO 事件发生时调用 Selector.select()
。
/* This is the Initiation Dispatcher or the Reactor */
public class Reactor {
private Map<Integer, EventHandler> registeredHandlers = new ConcurrentHashMap<Integer, EventHandler>();
private Selector demultiplexer;
public Reactor() throws Exception {
demultiplexer = Selector.open();
}
public Selector getDemultiplexer() {
return demultiplexer;
}
public void registerEventHandler(
int eventType, EventHandler eventHandler) {
registeredHandlers.put(eventType, eventHandler);
}
public void registerChannel(
int eventType, SelectableChannel channel) throws Exception {
channel.register(demultiplexer, eventType);
}
public void run() {
try {
// Loop indefinitely
while (true) {
demultiplexer.select();
Set<SelectionKey> readyHandles = demultiplexer.selectedKeys();
Iterator<SelectionKey> handleIterator = readyHandles.iterator();
while (handleIterator.hasNext()) {
SelectionKey handle = handleIterator.next();
if (handle.isAcceptable()) {
EventHandler handler =
registeredHandlers.get(SelectionKey.OP_ACCEPT);
handler.handleEvent(handle);
}
if (handle.isReadable()) {
EventHandler handler =
registeredHandlers.get(SelectionKey.OP_READ);
handler.handleEvent(handle);
handleIterator.remove();
}
if (handle.isWritable()) {
EventHandler handler =
registeredHandlers.get(SelectionKey.OP_WRITE);
handler.handleEvent(handle);
handleIterator.remove();
}
}
}
} catch (Exception e) {
...
}
}
}
Handle
在 Java NIO 范围中,Reactor 模式中的 Handle
是以 SelectionKey
的形式实现的。更多关于 SelectionKey
,传送门
Event
由各种 IO 事件触发的事件,包括:
- SelectionKey.OP_ACCEPT
- SelectionKey.OP_READ
- SelectionKey.OP_WRITE
Handler
A handler is often implemented as runnable or callable in Java.
public interface EventHandler {
public void handleEvent(SelectionKey handle) throws Exception;
}
public class AcceptEventHandler implements EventHandler {
private Selector demultiplexer;
public AcceptEventHandler(Selector demultiplexer) {
this.demultiplexer = demultiplexer;
}
public void handleEvent(SelectionKey handle) throws Exception {
System.out.println("===== Accept Event Handler =====");
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) handle.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
socketChannel.configureBlocking(false);
socketChannel.register(demultiplexer, SelectionKey.OP_READ);
}
}
}
Client
public class Client {
public static void main(String[] args) throws Exception{
String sentence;
String modifiedSentence;
BufferedReader inFromUser = new BufferedReader( new InputStreamReader(System.in));
Socket clientSocket = new Socket("localhost", 7070);
DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
// BufferedReader inFromServer = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
// sentence = inFromUser.readLine();
outToServer.writeBytes("Hello NIO Server!" + '\n');
BufferedReader inFromServer = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
sentence = inFromServer.readLine();
System.out.println("Response from Server : " + sentence);
clientSocket.close();
}
}
完整的样例,请参考:https://github.com/kasun04/nio-reactor/tree/master
三、优点
- 性能提升:通过有效处理多个同时连接来提高应用程序性能;
- 减少资源消耗:通过使用少量线程处理大量 I/O 操作来减少资源消耗;
- 增强可伸缩性:通过允许应用程序以最少的线程为许多客户端提供服务来增强可伸缩性。
Reactor 模式为什么可以减少资源消耗?
主要是节省了线程的消耗,因为可以用少量线程处理大量的 I/O 操作,线程是很“贵”的资源,主要体现在:
- 线程的创建和销毁成本很高。在 Linux 这样的操作系统中,线程本质上就是一个进程,创建和销毁都是重量级的系统函数;
- 线程本身占用较大内存。像 Java 的线程栈,一般至少分配 512K ~ 1M 的空间,如果系统中的线程数过千,恐怕整个 JVM 的内存都会被吃掉一半;
- 线程的切换成本是很高的。操作系统发生线程切换的时候,需要保留线程的上下文,然后执行系统调用。如果线程数过高,可能执行线程切换的时间甚至会大于线程执行的时间,这时候带来的表现往往是系统 load 偏高、CPU sy 使用率特别高(超过 20% 以上),导致系统几乎陷入不可用的状态;
- 容易造成锯齿状的系统负载。因为系统负载是用活动线程数或 CPU 核心数,一旦线程数量高但外部网络环境不是很稳定,就很容易造成大量请求的结果同时返回,激活大量阻塞线程从而使系统负载压力过大。
四、缺点
- 管理状态和事件处理的复杂性增加;
- 调试和维护异步代码可能具有挑战性;
- 确保线程安全和避免竞争条件方面存在潜在困难。
五、Thread-Based vs Event-Driven
为了处理网络请求,有两种相互竞争的网络架构:基于线程(thread-based)的架构和事件驱动(event-driven)的架构。
5.1 Thread-Based Architecture
实现多线程服务器最直观的方法是遵循每个连接对应一个线程的方法。它适用于连接较少且带宽非常高的站点。
5.2 Event-Driven Architecture
事件驱动方法可以将线程与连接分开,仅将线程用于特定回调或处理程序上的事件。
事件驱动架构由事件创建者(对应的以上的 Handle)和事件消费者(对应的以上的 Event Handler)组成。创建者,即事件的源头,只知道事件已经发生。消费者是需要知道事件已经发生的实体。他们可能参与处理事件,也可能只是受到事件的影响。
单 Reactor 单线程版本
只有一个 Selector 循环接受请求,客户端注册进来由 Reactor 接收注册事件,然后再由 Reactor 分发出去,由对应的 Handler 进行业务逻辑处理。
单线程的问题实际上是很明显的。只要其中一个 Handler 方法阻塞了,那就会导致所有的 client 的 Handler 都被阻塞,也会导致注册事件也无法处理,无法接收新的请求。所以这种模式用的比较少,因为不能充分利用到多核的资源。因此,这种模式仅仅只能处理 Handler 比较快速完成的场景。
单 Reactor 多线程版本
在多线程 Reactor 中,注册接收事件都是由 Reactor 来做,其它的计算,编解码由一个线程池来做。从图中可以看出工作线程是多线程的,监听注册事件的 Reactor 还是单线程。
对比单 Reactor 单线程模型,多线程 Reactor 模式在 Handler 读写处理时,交给工作线程池处理,可以充分利用多核cpu的处理能力,因为 Reactor 分发和 Handler 处理是分开的,不会导致 Reactor 无法执行。从而提升应用的性能。缺点是 Reactor 只在主线程中运行,承担所有事件的监听和响应,如果短时间的高并发场景下,依然会造成性能瓶颈。
多 Reactor 多线程版本
也称为主从 Reactor 模式,在这种模式下,一般会有两个 Reactor:mainReactor
和 subReactor
。mainReactor
负责监听客户端请求,专门处理新连接的建立,再将建立好的连接注册到 subReactor
。subReactor
将分配的连接加入到队列进行监听,当有新的事件发生时,会调用连接相对应的 Handler 进行业务处理。
这样的模型使得每个模块更加专一,耦合度更低,能支持更高的并发量。许多框架也使用这种模式。
关于这两种架构的详细对比、解析和演变,可以查看论文:Scalable IO in Java
Event-Driven 架构和 Reactor 模式是什么关系?
Reactor 模式是事件驱动架构的一种实现技术,事件驱动架构有很多种实现技术,比如 Proactor
模式也是一种实现技术。简而言之,它使用单线程事件循环阻塞监听句柄(Handle)事件并将它们分派给相应的处理程序(Handler)和回调(Callback)。
六、Reactor vs Proactor
Reactor 和 Proactor 模式最大的区别在于 Reactor 是主动模式,基于同步 I/O 的;而 Proactor 是被动模式,基于异步 I/O 的。也就是说,Reactor 模式是在事件发生时就通知事先注册的事件(读写在应用程序线程中处理完成),而 Proactor 是在事件发生时基于异步 I/O 完成读写操作(由内核完成),待 I/O 操作完成后才回调应用程序的处理器来进行业务处理。
关于同步 I/O 和异步 I/O 的差异,查看:JAVA-IO
由于 Proactor 模式编程复杂性高,且操作系统支持不好(Windows 下通过 IOCP 实现了真正的异步 I/O,而在 Linux 系统下,Linux 2.6 才引入,目前异步 I/O 还不完善),因此在 Linux 下实现高并发网络编程都是以 Reactor 模型为主。
Proactor 的结构图:
- 用户线程将
AsynchronousOperation
(读/写等)、Proactor 以及操作完成时的CompletionHandler
注册到AsynchronousOperationProcessor
; AsynchronousOperationProcessor
使用 Facade 模式提供了一组异步操作 API(读/写等)供用户使用,当用户线程调用异步 API 后,便继续执行自己的任务;AsynchronousOperationProcessor
会开启独立的内核线程执行异步操作,实现真正的异步。当异步 IO 操作完成时,AsynchronousOperationProcessor
将用户线程与AsynchronousOperation
一起注册的 Proactor 和CompletionHandler
取出,然后将CompletionHandler
与 IO 操作的结果数据一起转发给 Proactor;- Proactor 负责回调每一个异步操作的事件完成处理函数
handle_event
。
关于两者的详细对比,可以查看课件:Reactor and Proactor
七、反应器模式 vs 观察者模式
观察者模式与 Reactor 模式非常相似,个人认为,Reactor 模式算是观察者模式的扩展。
Reactor 模式下,Handle 可以认为是生产者,Event Handler 可以认为是观察者,观察者模式和 Reactor 模式重要的区别在于,Reactor 模式使用中央请求处理程序(Central Request Handler),也就是 Reactor 来将生产者的消息通知给观察者,而观察者模式则让消费者直接与生产者对话。
从以上样例代码以及结构图可以看出来,如果将 Reactor 作为生产者,Event Handler 作为观察者,那就是一个标准的观察者模式。
更多关于观察者模式,传送门