JAVA IO - tenji/ks GitHub Wiki
JAVA IO
IO有内存IO、网络IO和磁盘IO三种,通常我们说的IO指的是后两者(网络IO把磁盘换做网卡即可)。
Terminologies
- 同步IO(Synchronous I/O):A synchronous I/O operation causes the requesting process to be blocked until that I/O operation completes.
- 异步IO(Asynchronous I/O):An asynchronous I/O operation does not cause the requesting process to be blocked.
- 阻塞IO(Blocking I/O):
- 非阻塞IO(Non-Blocking I/O):
一、IO模型
对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的Process (or Thread),另一个就是系统内核(Kernel)。当一个read操作发生时,它会经历两个阶段(阶段一由操作系统决定,阶段二由用户程序决定???):
1. 等待Kernel准备好数据 (Waiting for the data to be ready)
2. Process将数据从Kernel拷贝到Process中(Copying the data from the kernel to the process)
1.1 阻塞IO(Blocking IO)
去餐馆吃饭,点一个自己最爱吃的盖浇饭,然后在原地等着一直到盖浇饭做好,自己端到餐桌就餐。这就是典型的阻塞IO。当厨师给你做饭的时候,你需要一直在那里等着。
Netty包的Socket
类中定义的recvFrom
方法:
private static native DatagramSocketAddress recvFrom(
int fd, ByteBuffer buf, int pos, int limit) throws IOException;
阻塞IO模式下,阶段一和阶段二都是Blocked的。
1.2 非阻塞IO(Non-Blocking IO)
接着上面的例子,你每次点完饭就在那里等着,突然有一天你发现自己真傻。于是,你点完之后,就回桌子那里坐着,然后估计差不多了,就问老板饭好了没,如果好了就去端,没好的话就等一会再去问,依次循环直到饭做好。这就是非阻塞IO。
这种方式在编程中对socket设置O_NONBLOCK
即可。但此方式仅仅针对网络IO有效,对磁盘IO并没有作用。因为本地文件IO就没有被认为是阻塞的,我们所说的网络IO的阻塞是因为网路IO有无限阻塞的可能,而本地文件除非是被锁住,否则是不可能无限阻塞的,因此只有锁这种情况下,O_NONBLOCK
才会有作用。而且,磁盘IO时要么数据在内核缓冲区中直接可以返回,要么需要调用物理设备去读取,这时候进程的其他工作都需要等待。因此,后续的IO复用和信号驱动IO对文件IO也是没有意义的。
非阻塞IO模式下,阶段一是Non-Blocked,而阶段二是Blocked的。
1.3 IO多路复用(IO Multiplexing,事件驱动IO)
IO 复用的实现方式目前主要有select
、poll
和epoll
。
IO多路复用模式下,阶段一是Non-Blocked,而阶段二是Blocked的。
1.4 信号驱动IO
1.5 异步IO(Asynchronous I/O)
Linux 上没有,Windows 上对应的是 IOCP。
异步IO模式下,阶段一和阶段二都是Non-Blocked的。
按照上面的定义,之前所述的Blocking IO,Non-Blocking IO,IO multiplexing都属于Synchronous IO。
有人可能会说,Non-Blocking IO并没有被Blocked啊。这里有个非常“狡猾”的地方,定义中所指的”IO operation”是指真实的IO操作,就是例子中的recvfrom
这个System Call。Non-Blocking IO在执行recvfrom
这个System Call的时候,如果Kernel的数据没有准备好,这时候不会Block进程。但是,当kernel中数据准备好的时候,recvfrom
会将数据从Kernel拷贝到用户内存中,这个时候进程是被Blocked了,在这段时间内,进程是被Block的。而Asynchronous IO则不一样,当进程发起IO 操作之后,就直接返回再也不理睬了,直到Kernel发送一个信号,告诉进程说IO完成。在这整个过程中,进程完全没有被Block。
二、网络编程模型
上文讲述了 UNIX 环境的五种 IO 模型。基于这五种模型,在 Java 中,随着 NIO 和 NIO2.0(AIO) 的引入,一般具有以下几种网络编程模型:
2.1 BIO
BIO 是一个典型的网络编程模型,是通常我们实现一个服务端程序的过程,步骤如下:
- 主线程accept请求阻塞;
- 请求到达,创建新的线程来处理这个套接字,完成对客户端的响应;
- 主线程继续accept下一个请求
这种模型有一个很大的问题是:当客户端连接增多时,服务端创建的线程也会暴涨,系统性能会急剧下降。因此,在此模型的基础上,类似于 Tomcat 的bio connector
,采用的是线程池来避免对于每一个客户端都创建一个线程。有些地方把这种方式叫做伪异步IO(把请求抛到线程池中异步等待处理)。
2.2 NIO
JDK1.4 开始引入了 NIO 类库,这里的 NIO 指的是 Non-blcok IO,主要是使用Selector
多路复用器来实现。Selector
在 Linux 等主流操作系统上是通过epoll
实现的。
- 适用场景:NIO 方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,JDK1.4 开始支持。
2.3 AIO
- 适用场景:AIO 方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用 OS 参与并发操作,编程比较复杂,JDK7 开始支持。
2.4 模型对比
三、网络IO的设计模式
3.1 Reactor 模式
主动模式,所谓主动,是指应用程序不断去轮询,问操作系统,IO是否就绪。Linux 下的select/poll/epoll
就属于主动模式,需要应用程序中有个循环,一直去 poll。在这种模式下,实际的IO操作还是应用程序做的。
3.2 Preactor模式
被动模式,你把read/write
全部交给操作系统,实际的IO操作由操作系统完成,完成之后,再 Callback 你的应用程序。Windows 下的 IOCP 就属于这种模式,再比如 C++ Boost 中的 Asio 库,就是典型的 Proactor 模式。
四、epoll 编程模型 - 3个阶段
在 Linux 平台上,Java NIO 就是基于epoll
来实现的。所有基于epoll
的框架,都有3个阶段:
- 注册事件(connect, accept, read, write)
- 轮询IO是否就绪
- 执行实际IO操作。
下面的代码展示了在 Linux 下,用 C语言epoll
编程的基本框架:
// 阶段1: 调用epoll_ctl(xx) 注册事件
for( ; ; )
{
nfds = epoll_wait(epfd,events,20,500); //阶段2:轮询所有的socket
for(i=0;i<nfds;++i) // 处理轮询结果
{
if(events[i].data.fd==listenfd) // accept事件就绪
{
connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen); // 阶段3:执行实际的IO操作,accept
ev.data.fd=connfd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); // 回到阶段1:重新注册
}
else if( events[i].events&EPOLLIN ) // 读就绪
{
n = read(sockfd, line, MAXLINE)) < 0 // 阶段3:执行实际的io操作
ev.data.ptr = md;
ev.events=EPOLLOUT|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); // 回到阶段1:重新注册事件
}
else if(events[i].events&EPOLLOUT) // 写就绪
{
struct myepoll_data* md = (myepoll_data*)events[i].data.ptr;
sockfd = md->fd;
send( sockfd, md->ptr, strlen((char*)md->ptr), 0 ); // 阶段3: 执行实际的io操作
ev.data.fd=sockfd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); // 回到阶段1,重新注册事件
}
else
{
// 其他的处理
}
}
}
同样, Java NIO 中的Selector
同样有以下3个阶段,下面把Selector
和epoll
的使用做个对比:
Java NIO | epoll | |
---|---|---|
注册 | channel.register(selector, xxx) selectKey.interOps = xxx | epoll_ctr(...) |
轮训 | selector.poll() | epoll_wait(...) |
实际IO操作 | channel.accept channel.read channel.write | accept read write |
可以看到,两者只是写法不同,同样的, 都有这3个阶段。
下面的表格展示了connect, accept, read, write 这4种事件,分别在这3个阶段对应的函数:
下面看一下Kafka client中Selector的核心实现:
@Override
public void poll(long timeout) throws IOException {
clear(); // 清空各种状态
if (hasStagedReceives())
timeout = 0;
long startSelect = time.nanoseconds();
int readyKeys = select(timeout); // 轮询
long endSelect = time.nanoseconds();
currentTimeNanos = endSelect;
this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
if (readyKeys > 0) {
Set<SelectionKey> keys = this.nioSelector.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
KafkaChannel channel = channel(key);
// register all per-connection metrics at once
sensors.maybeRegisterConnectionMetrics(channel.id());
lruConnections.put(channel.id(), currentTimeNanos);
try {
if (key.isConnectable()) { // 有连接事件
channel.finishConnect();
this.connected.add(channel.id());
this.sensors.connectionCreated.record();
}
if (channel.isConnected() && !channel.ready())
channel.prepare(); // 这个只有需要安全检查的SSL需求,普通的不加密的channel,prepare()为空实现
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { // 读就绪
NetworkReceive networkReceive;
while ((networkReceive = channel.read()) != null)
addToStagedReceives(channel, networkReceive); // 实际的读动作
}
if (channel.ready() && key.isWritable()) { // 写就绪
Send send = channel.write(); // 实际的写动作
if (send != null) {
this.completedSends.add(send);
this.sensors.recordBytesSent(channel.id(), send.size());
}
}
/* cancel any defunct sockets */
if (!key.isValid()) {
close(channel);
this.disconnected.add(channel.id());
}
} catch (Exception e) {
String desc = channel.socketDescription();
if (e instanceof IOException)
log.debug("Connection with {} disconnected", desc, e);
else
log.warn("Unexpected error from {}; closing connection", desc, e);
close(channel); // 出现异常关闭 Channel
this.disconnected.add(channel.id());
}
}
}
addToCompletedReceives();
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
maybeCloseOldestConnection();
}
五、LT & ET 模式
我们知道,epoll
里面有2种模式:
- LT(水平触发)
- ET(边缘触发)。
水平触发,又叫条件触发;边缘触发,又叫状态触发。这2种到底有什么区别呢?
在这里就要引入socket的“读/写缓冲区”的概念了:
水平触发(条件触发):读缓冲区只要不为空,就一直会触发读事件;写缓冲区只要不满,就一直会触发写事件。这个比较符合编程习惯,也是epoll的缺省模式。
边缘触发(状态触发):读缓冲区的状态,从空转为非空的时候,触发1次;写缓冲区的状态,从满转为非满的时候,触发1次。比如你发送一个大文件,把写缓存区塞满了,之后缓存区可以写了,就会发生一次从满到不满的切换。
通过分析,我们可以看出: 对于LT模式,要避免“写的死循环”问题:写缓冲区为满的概率很小,也就是“写的条件“会一直满足,所以如果你注册了写事件,没有数据要写,但它会一直触发,所以在LT模式下,写完数据,一定要取消写事件;
对应ET模式,要避免“short read”问题:比如你收到100个字节,它触发1次,但你只读到了50个字节,剩下的50个字节不读,它也不会再次触发,此时这个socket就废了。因此在ET模式,一定要把“读缓冲区”的数据读完。
另外一个关于LT和ET的区别是:LT适用于阻塞和非阻塞IO, ET只适用于非阻塞IO。
还有一个说法是ET的性能更高,但编程难度更大,容易出错。到底ET的性能,是不是一定比LT高,这个有待商榷,需要实际的测试数据来说话。
上面说了,epoll
缺省使用的LT模式,而 Java NIO 用的就是epoll
的LT模式。下面就来分析一下 Java NIO 中connect/read/write
事件的处理。
connect 事件的注册:
// Selector
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
if (this.channels.containsKey(id))
throw new IllegalStateException("There is already a connection for id " + id);
SocketChannel socketChannel = SocketChannel.open();
try {
socketChannel.connect(address);
} catch (UnresolvedAddressException e) {
socketChannel.close();
throw new IOException("Can't resolve address: " + address, e);
} catch (IOException e) {
socketChannel.close();
throw e;
}
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT); // 构造channel的时候,注册connect事件
KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
key.attach(channel);
this.channels.put(id, channel);
}
connect 事件的取消
// 在上面的poll函数中,connect 事件就绪,也就是指 connect 连接完成,连接建立
if (key.isConnectable()) { // 有连接事件
channel.finishConnect();
...
}
//PlainTransportLayer
public void finishConnect() throws IOException {
socketChannel.finishConnect(); // 调用channel的finishConnect()
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ); // 取消connect事件,新加read事件组册
}
read 事件的注册
从上面也可以看出,read 事件的注册和 connect 事件的取消,是同时进行的。
read 事件的取消
因为 read 是要一直监听远程,是否有新数据到来,所以不会取消,一直监听。并且因为是 LT 模式,只要“读缓冲区”有数据,就会一直触发。
write 事件的注册
// Selector
public void send(Send send) {
KafkaChannel channel = channelOrFail(send.destination());
try {
channel.setSend(send);
} catch (CancelledKeyException e) {
this.failedSends.add(send.destination());
close(channel);
}
}
// KafkaChannel
public void setSend(Send send) {
if (this.send != null)
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
this.send = send;
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE); // 每调用一次 Send,注册一次 Write 事件
}
write 事件的取消
// 上面的 poll 函数里面
if (channel.ready() && key.isWritable()) { // write 事件就绪
Send send = channel.write(); // 在这个 write 里面,取消了 write 事件
if (send != null) {
this.completedSends.add(send);
this.sensors.recordBytesSent(channel.id(), send.size());
}
}
private boolean send(Send send) throws IOException {
send.writeTo(transportLayer);
if (send.completed())
transportLayer.removeInterestOps(SelectionKey.OP_WRITE); // 取消 write 事件
return send.completed();
}
总结一下:
(1)“事件就绪“这个概念,对于不同事件类型,还是有点歧义的
read 事件就绪:这个最好理解,就是远程有新数据到来,需要去 read。这里因为是 LT 模式,只要读缓冲区有数据,会一直触发。
write 事件就绪:这个指什么呢? 其实指本地的 socket 缓冲区有没有满。没有满的话,就会一直触发写事件。所以要避免”写的死循环“问题,写完,要取消写事件。
connect 事件就绪: 指 connect 连接完成
accept 事件就绪:有新的连接进来,调用 accept 处理
(2)不同类型事件,处理方式是不一样的:
connect 事件:注册1次,成功之后,就取消了。有且仅有1次
read 事件:注册之后不取消,一直监听
write 事件: 每调用一次 send,注册1次,send 成功,取消注册