【框架学习】Netty Java NIO 基础 - hippowc/hippowc.github.io GitHub Wiki
Netty是由JBOSS提供的一个开源的java网络编程框架,主要是对java的nio包进行了再次封装。Netty比java原生的nio包提供了更加强大、稳定的功能和易于使用的api。 netty的作者是Trustin Lee,这是一个韩国人,他还开发了另外一个著名的网络编程框架,mina。比较简单易用,类似与web开发领域的springboot
apache storm、hadoop、rocketmq dubbo都在用netty。3.x 目前企业使用最多的版本,最为稳定。例如dubbo使用的就是3.x版本,4.x 引入了内存池等重大特性,可以有效的降低GC负载,rocketmq使用的就是4.x,5.x 已经被废弃了
依赖:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.22.Final</version>
</dependency>
这里所说的IO指的都是网络IO
-
阻塞式I/O:blocking IO
-
非阻塞式I/O: nonblocking IO
-
I/O复用(select,poll,epoll...):IO multiplexing
-
信号驱动式I/O(SIGIO):signal driven IO
-
异步I/O(POSIX的aio_系列函数):asynchronous IO
-
在JDK1.4之前,Java的IO模型只支持阻塞式IO(Blocking IO),简称为BIO
-
在JDK1.4时,支持了I/O多路复用模型,相对于之前的IO模型,这是一个新的模型,所以称之为NIO(New IO)
-
在JDK1.7时,对NIO包进行了升级,支持了异步I/O(Asynchronous IO),简称为AIO
对于一个network IO (以read举例),它会涉及到两个系统对象:一个是调用这个IO的进程,另一个就是系统内核(kernel)。当一个read操作发生时,它会经历两个阶段:
阶段1:等待数据准备 (Waiting for the data to be ready)
阶段2: 将数据从内核拷贝到进程中
用户空间是常规进程所在区域。 JVM 就是常规进程,驻守于用户空间。用户空间是非特权区域:比如,在该区域执行的代码就不能直接访问硬件设备。
内核空间是操作系统所在区域。内核代码有特别的权力:它能与设备控制器通讯,控制着用户区域进程的运行状态,等等。最重要的是,所有 I/O 都直接(如这里所述)或间接通过内核空间。
当进程要进行IO操作时,会执行系统调用(open,read,write等系统调用),将控制权交给内核,内核会将数据读取到用户控件指定的缓冲区,内核试图对数据进行高速缓存或预读取,因此进程所需数据可能已经在内核空间里了。如果是这样,该数据只需简单地拷贝出来即可。如果数据不在内核空间,则进程被挂起,内核着手把数据读进内存。
在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:
- 用户进程调用recvfrom,kernel开始准备数据
- kernel等待数据从网络中到达,用户进程会被阻塞。当所有等待数据到达时,它被复制到内核中的某个缓冲区。
- kernel把数据从内核缓冲区复制到应用程序缓冲区。kernel返回结果,解除用户进程的block状态
linux下,可以通过设置socket使其变为non-blocking。
- 用户进程不断调用recvfrom,kernel如果没有准备好数据,不阻塞进程,返回error
- kernel准备好数据,将数据从内核缓冲区复制到应用程序缓冲区
这种IO方式也被称为为event driven IO,IO复用同非阻塞IO本质一样,不过利用了新的select系统调用,由内核来负责本来是请求进程该做的轮询操作。看似比非阻塞IO还多了一个系统调用开销,不过因为可以支持多路IO,才算提高了效率。
- 当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个 socket中的数据准备好了,select就会返回。
- 用户进程再调用read操作,将数据从kernel拷贝到用户进程。
和blocking IO流程没有什么不同,事实上,还更差一些。因为这里需要使用两个system call (select 和 recvfrom),用select的优势在于它可以同时处理多个connection。用户的process其实是一直被 block的。只不过process是被select这个函数block,而不是被socket IO给block。
用的很少,不深究
这类函数的工作机制是告知内核启动某个操作,并让内核在整个操作(包括将数据从内核拷贝到用户空间)完成后通知我们。
- 用户进程发起read操作之后,立刻就可以开始去做其它的事,而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。
- kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都 完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。
其实前四种I/O模型都是同步I/O操作,他们的区别在于第一阶段,而他们的第二阶段是一样的:在数据从内核复制到应用缓冲区期间(用户空间),进程阻塞于recvfrom调用。
针对每个client,都创建一个对应的线程来处理,如果client非常多,那么server端就要创建无数个线程来与之对应。而线程数量越多,线程上下文切换(context switch)造成的资源损耗就越大,因此我们需要使用尽可能少的线程。
我们就可以用一个专门的线程去负责第一阶段:这个线程去检查有哪些client准备好了数据,然后将这些client过滤出来,交给worker线程去处理,而worker线程只负责第二阶段:因为第一个阶段已经保证了当前处理的client肯定是有数据的,这样worker线程在读取的时候,阻塞时间是很短的,而不必经历第一阶段那样长时间的等待。
HTTP协议中有keep-alive的概念,TCP协议中也有keep-alive的概念。二者的作用是不同的。
为什么HTTP是短连接?
client向server发送一个request,得到response后,连接就关闭。之所以这样设计使用,主要是考虑到实际情况。例如,用户通过浏览器访问一个web站点上的某个网页,当网页内容加载完毕之后,用户可能需要花费几分钟甚至更多的时间来浏览网页内容,此时完全没有必要继续维持连接。这样做的好处是,可以极大的减轻服务端的压力。一般而言,一个站点能支撑的最大并发连接数也是有限的,面对这么多客户端浏览器,不可能长期维持所有连接。每个客户端取得自己所需的内容后,即关闭连接,更加合理。
为什么要引入keep-alive?
通常一个网页可能会有很多组成部分,除了文本内容,还会有诸如:js、css、图片等静态资源,有时还会异步发起AJAX请求。只有所有的资源都加载完毕后,我们看到网页完整的内容。然而,一个网页中,可能引入了几十个js、css文件,上百张图片,如果每请求一个资源,就创建一个连接,然后关闭,代价实在太大了。
基于此背景,我们希望连接能够在短时间内得到复用,在加载同一个网页中的内容时,尽量的复用连接,这就是HTTP协议中keep-alive属性的作用。
- HTTP 1.0中默认是关闭的,需要在http头加入"Connection: Keep-Alive",才能启用Keep-Alive;
- http 1.1中默认启用Keep-Alive,如果加入"Connection: close ",才关闭。
server端如何处理keep-alive?
客户端只是负责在请求头中设置Keep-Alive。而具体的连接复用时间的长短,通常是由web服务器控制的。这里有个典型的误解,经常听到一些同学会说,通过设置http的keep-alive来保证长连接。通常我们所说的长连接,指的是一个连接创建后,除非出现异常情况,否则从应用启动到关闭期间,连接一直是建立的。例如在RPC框架,如dubbo,服务的消费者在启动后,就会一直维护服务提供者的底层TCP连接。在HTTP协议中,Keep-Alive属性保持连接的时间长短是由服务端决定的,通常配置都是在几十秒左右。
当通过URLConnection.getInputStream()读取响应数据之后(在这里是HttpUrlConnection),应该调用InputStream的close方法关闭输入流,JDK http协议处理器会将这个连接放到一个连接缓存中,以便之后的HTTP请求进行复用。翻译成代码,当发送一次请求,得到响应之后,不是调用HttpURLConnection.disconnect方法关闭,这回导致底层的socket连接被关闭。我们应该通过in.close()关闭,才能进行复用
HTTP协议(四层)的Keep-Alive意图在于连接复用,希望可以短时间内在同一个连接上进行多次请求/响应。举个例子,你搞了一个好项目,想让马云爸爸投资,马爸爸说,"我很忙,最多给你3分钟”,你需要在这三分钟内把所有的事情都说完。核心在于:时间要短,速度要快。
TCP协议(七层)的KeepAlive机制意图在于保活、心跳,检测连接错误。当一个TCP连接两端长时间没有数据传输时(通常默认配置是2小时),发送keepalive探针,探测链接是否存活。例如,我和厮大聊天,开了语音,之后我们各自做自己的事,一边聊天,有一段时间双方都没有讲话,然后一方开口说话,首先问一句,"老哥,你还在吗?”,巴拉巴拉..。又过了一会,再问,"老哥,你还在吗?”。核心在于:虽然频率低,但是持久。
TCP中的SO_KEEPALIVE是一个开关选项,默认关闭,需要在应用程序需要代码中显式的开启。当开启之后,在通信双方没有数据传输时,操作系统底层会定时发送keepalive探测包,以保证连接的存活。一些编程语言支持在代码层面覆盖默认的配置。在使用Java 中,我们可以通过Socket设置keepAlive为true
tcp的keep-alive机制,有一些鸡肋,keepalive只能检测连接是否存活,不能检测连接是否可用。TCP keepalive 机制依赖于操作系统的实现,灵活性不够,默认关闭,且默认的 keepalive 心跳时间是 两个小时, 时间较长。基于此,我们需要加上应用层的心跳。
一个Buffer对象是固定数量的数据的容器。缓冲区的工作与通道紧密联系。对于离开缓冲区的传输,您想传递出去的数据被置于一个缓冲区,被传送到通道。对于传回缓冲区的传输,一个通道将数据放置在您所提供的缓冲区中。
Buffer根据数据类型的不同有很多实现类(IntBuffer、CharBuffer等),先着重关注ByteBuffer,因为在NIO网络编程中,通道直接从ByteBuffer中读取数据。
缓冲区是包在一个对象内的基本数据元素数组。Buffer对象有几个字段:
- 容量( Capacity):缓冲区能够容纳的数据元素的最大数量,可以理解为数组的长度。 这一容量在缓冲区创建时被设定,并且永远不能被改变。
- 上界( Limit):缓冲区的第一个不能被读或写的元素。或者说,缓冲区中现存元素的计数。
- 位置( Position):下一个要被读或写的元素的索引。Buffer类提供了get( )和 put( )函数 来读取或存入数据,position位置会自动进行相应的更新。
- 标记( Mark):一个备忘位置。
一般来说:0 <= mark <= position <= limit <= capacity
ByteBuffer buffer = ByteBuffer.allocate(10);
buffer.put("Hello".getBytes());
直接字节缓冲区通常是 I/O 操作最好的选择。在设计方面,它们支持 JVM 可用的最高效I/O 机制。非直接字节缓冲区可以被传递给通道,但也是需要先创建临时的非直接缓冲区
直接缓冲区时 I/O 的最佳选择,但可能比创建非直接缓冲区要花费更高的成本。直接缓冲区使用的内存是通过调用本地操作系统方面的代码分配的,绕过了标准 JVM 堆栈。建立和销毁直接缓冲区会明显比具有堆栈的缓冲区更加破费,这取决于主操作系统以及 JVM 实现。直接缓冲区的内存区域不受无用存储单元收集支配,因为它们位于标准 JVM 堆栈之外。
直接 ByteBuffer 是通过调用具有所需容量的 ByteBuffer.allocateDirect()函数产生的,就像我们之前所涉及的 allocate()函数一样。注意用一个 wrap()函数所创建的被包装的缓冲区总是非直接的。所有的缓冲区都提供了一个叫做 isDirect()的 boolean 函数,来测试特定缓冲区是否为直接缓冲区。虽然 ByteBuffer 是唯一可以被直接分配的类型,但如果基础缓冲区是一个直接 ByteBuffer,对于非字节视图缓冲区, isDirect()可以是 true。
回顾我们之前讲解UNIX 五种IO模型中的读取数据的过程,读取数据总是需要通过内核空间传递到用户空间,而往外写数据总是要通过用户空间到内核空间。JVM堆栈属于用户空间。 而我们这里提到的直接缓冲区,就是内核空间的内存。内核空间的内存在java中是通过Unsafe这个类来调用的。
而Netty中所提到的零拷贝(通常是指计算机在网络上发送文件时,不需要将文件内容拷贝到用户空间而直接在内核空间中传输到网络的方式),无非就是使用了这里的直接缓冲区。
通道(Channel)可以理解为数据传输的管道。通道与流不同的是,流只是在一个方向上移动(一个流必须是inputStream或者outputStream的子类),而通道可以用于读、写或者同时用于读写。
I/O 可以分为广义的两大类别: File I/O 和 Stream I/O。那么相应地有两种类型的通道,它们是文件( file)通道和套接字( socket)通道。
通道类中,DatagramChannel 和 SocketChannel 实现定义读和写功能的接口而 ServerSocketChannel不实现。 ServerSocketChannel 负责监听传入的连接和创建新的 SocketChannel 对象,它本身从不传输数据。
全部 NIO中的socket 通道类( DatagramChannel、 SocketChannel 和 ServerSocketChannel)在被实例化时都会创建一个对等的BIO中的 socket 对象( Socket、 ServerSocket和 DatagramSocket)。DatagramChannel、 SocketChannel 和 ServerSocketChannel通道类都定义了socket()方法,我们可以通过这个方法获取其关联的socket对象。
通道可以以多种方式创建。 Socket 通道有可以直接创建新 socket 通道的工厂方法。但是一个FileChannel 对象却只能通过在一个打开的 RandomAccessFile、 FileInputStream 或 FileOutputStream对象上调用 getChannel( )方法来获取。您不能直接创建一个 FileChannel 对象。
SocketChannel sc = SocketChannel.open( );
sc.connect (new InetSocketAddress ("somehost", someport));
ServerSocketChannel ssc = ServerSocketChannel.open( );
ssc.socket( ).bind (new InetSocketAddress (somelocalport));
DatagramChannel dc = DatagramChannel.open( );
RandomAccessFile raf = new RandomAccessFile ("somefile", "r");
FileChannel fc = raf.getChannel( );
通道可以以阻塞( blocking)或非阻塞( nonblocking)模式运行。默认情况下,一个通道创建,总是阻塞的,我们可以通过调用configureBlocking(boolean)方法即可
SocketChannel sc = SocketChannel.open( );
sc.configureBlocking (false); // nonblocking
ServerSocketChannel 是一个基于通道的 socket 监听器。
同它的对等体 java.net.ServerSocket 一样, ServerSocketChannel 也有 accept( )方法。一旦您创建了一个 ServerSocketChannel 并用对等 socket 绑定了它,然后您就可以在其中一个上调用 accept( )。如果您选择在 ServerSocket 上调用 accept( )方法,那么它会同任何其他的 ServerSocket 表现一样的行为:总是阻塞并返回一个 java.net.Socket 对象。如果您选择在 ServerSocketChannel 上调用 accept( )方法则会返回 SocketChannel 类型的对象,返回的对象能够在非阻塞模式下运行。
如果以非阻塞模式被调用,当没有传入连接在等待时, ServerSocketChannel.accept( )会立即返回 null。
int port = 1234; // default
if (argv.length > 0) {
port = Integer.parseInt(argv[0]);
}
ByteBuffer buffer = ByteBuffer.wrap(GREETING.getBytes());
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(port));
ssc.configureBlocking(false);
while (true) {
System.out.println("Waiting for connections");
SocketChannel sc = ssc.accept();
if (sc == null) {
// no connections, snooze a while
Thread.sleep(2000);
} else {
sc.configureBlocking(false);
ByteBuffer allocate = ByteBuffer.allocateDirect (16 * 1024);
while(sc.read(allocate)>0){
allocate.flip();
while (buffer.hasRemaining( )) {
byte b = buffer.get();
System.out.println(b);
}
allocate.clear();
}
System.out.println("Incoming connection from: "
+ sc.socket().getRemoteSocketAddress());
buffer.rewind();
sc.write(buffer);
sc.close();
}
}
Socket 和 SocketChannel 类封装点对点、有序的网络连接,就是我们所熟知并喜爱的 TCP/IP网络连接。 SocketChannel 扮演客户端发起同一个监听服务器的连接。直到连接成功,它才能收到数据并且只会从连接到的地址接收。每个 SocketChannel 对象创建时都是同一个对等的 java.net.Socket 对象串联的。静态的 open( )方法可以创建一个新的 SocketChannel 对象,而在新创建的 SocketChannel 上调用 socket( )方法能返回它对等的 Socket 对象;在该 Socket 上调用 getChannel( )方法则能返回最初的那个 SocketChannel。
通过open方法
SocketChannel socketChannel =
SocketChannel.open (new InetSocketAddress ("somehost", somePort));
通过connect方法
SocketChannel socketChannel = SocketChannel.open( );
socketChannel.connect (new InetSocketAddress ("somehost", somePort));
我们已经知道,在网络编程中,为了避免频繁的在用户空间与内核空间拷贝数据,通常会直接从内核空间中申请内存,存放数据,在Java中,把内核空间的内存称之为直接内存,nio包中的ByteBuffer的allocateDirect方法
对于直接内存的释放,必须手工调用freeMemory方法,因为JVM只能帮我们管理堆内存,直接内存不在其管理范围之内。
选择器提供选择执行已经就绪的任务的能力,这使得多元 I/O 成为可能,就绪选择和多元执行使得单线程能够有效率地同时管理多个 I/O 通道(channels)。 C/C++代码的工具箱中,许多年前就已经有 select()和 poll()这两个POSIX(可移植性操作系统接口)系统调用可供使用了。许多操作系统也提供相似的功能,但对Java 程序员来说,就绪选择功能直到 JDK 1.4 才成为可行的方案。
获取到SocketChannel之后,直接包装成一个任务,提交给线程池去处理了。而引入了Selector的概念之后, 我们需要将之前创建的一个或多个可选择的Channel注册到Selector对象中,一个键(SelectionKey)将会被返回。SelectionKey 会记住您关心的通道。它们也会追踪对应的通道是否已经就绪。从最基础的层面来看,选择器提供了询问通道是否已经准备好执行每个 I/0 操作的能力。
每个Channel在注册到Selector上的时候,都有一个感兴趣的操作。
- 对于ServerSocketChannel,只会在选择器上注册一个,其感兴趣的操作是ACCEPT,表示其只关心客户端的连接请求
- 对于SocketChannel,通常会注册多个,因为一个server通常会接受到多个client的请求,就有对应数量的SocketChannel。SocketChannel感兴趣的操作是CONNECT、READ、WRITE,因为其要于server建立连接,也需要进行读、写数据。
程序需要主动的去调用Selector.select()方法。 select() 方法会返回一个准备就绪的SelectionKey的集合。通过遍历这些键,您可以选择出每个从上次您调用 select( )开始直到现在,已经就绪的通道。
乍一看,好像只要非阻塞模式就可以模拟就绪检查功能,但实际上还不够。主要的问题是,这种检查不是原子性的。列表中的一个通道都有可能在它被检查之后就绪,但直到下一次轮询为止,您并不会觉察到这种情况。最糟糕的是,您除了不断地遍历列表之外将别无选择。您无法在某个您感兴趣的通道就绪时得到通知。真正的就绪选择必须由操作系统来做。操作系统的一项最重要的功能就是处理 I/O 请求并通知各个线程它们的数据已经准备好了。选择器类提供了这种抽象
Selector 对象是通过调用静态工厂方法 open( )来实例化的。
Selector selector = Selector.open( );
注册通道到选择器上,是通过register方法进行的。通道在被注册到一个选择器上之前,必须先设置为非阻塞模式
ServerSocketChannel ssc=ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress("localhost",80));
ssc.configureBlocking(false);
SelectionKey sscSelectionKey = ssc.register(selector, SelectionKey.OP_ACCEPT);//注册ServerSocketChannel
Selector 类的核心是选择过程。这个名词您已经在之前看过多次了——现在应该解释一下了。基本上来说,选择器是对 select( )、 poll( )等本地调用(native call)或者类似的操作系统特定的系统调用的一个包装。但是 Selector 所作的不仅仅是简单地向本地代码传送参数。它对每个选择操作应用了特定的过程。对这个过程的理解是合理地管理键和它们所表示的状态信息的基础。
while (true) {// This may block for a long time. Upon returning, the
// selected set contains keys of the ready channels.
int n = selector.select();
if (n == 0) {//什么情况下会返回0?
continue; // nothing to do
}
// Get an iterator over the set of selected keys
Iterator it = selector.selectedKeys().iterator();
// Look at each key in the selected set
while (it.hasNext()) {
SelectionKey key = (SelectionKey) it.next();
// Is a new connection coming in?
if (key.isAcceptable()) {//对应SelectionKey.OP_ACCEPT操作
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel = server.accept();
registerChannel(selector, channel,SelectionKey.OP_READ);
sayHello(channel);
}
// Is there data to read on this channel?
//对应SelectionKey.OP_READ操作,注意这个key是ServerSocketChannel的SelectionKey
if (key.isReadable()) {
readDataFromSocket(key);
}
// Remove key from selected set; it's been handled
it.remove();
}
}