【框架学习】Netty 网络开发 - hippowc/hippowc.github.io GitHub Wiki
Netty 使用
Channel
以TCP编程为例 ,在java中,有两种方式
- 基于BIO,JDK1.4之前,我们通常使用java.net包中的ServerSocket和Socket来代表服务端和客户端。
- 基于NIO,Jdk1.4引入nio编程之后,我们使用java.nio.channels包中的ServerSocketChannel和SocketChannel来代表服务端与客户端。
在Netty中,对java中的BIO、NIO编程api都进行了封装,分别:
- 使用了OioServerSocketChannel,OioSocketChannel对java.net包中的ServerSocket与Socket进行了封装
- 使用NioServerSocketChannel和NioSocketChannel对java.nio.channels包中的ServerSocketChannel和SocketChannel进行了封装。
虽然Netty使用通道的概念,对java原生BIO、NIO编程api都进行了封装,但是通道的概念其实是在java 1.4之后引入nio编程才出现的,因此只有NioServerSocketChannel和NioSocketChannel才有对应的java通道
ChannelConfig
在Netty中,每种Channel都有对应的配置,用ChannelConfig来表示,ChannelConfig是一个接口,每个特定的Channel实现类都有自己对应的ChannelConfig实现类,如:
- NioSocketChannel的对应的配置类为NioSocketChannelConfig
- NioServerSocketChannel的对应的配置类为NioServerSocketChannelConfig
通常Channel实例,在创建的时候,就会创建其对应的ChannelConfig实例。例如NioServerSocketChannel和NioSocketChannel都是在构造方法中创建了其对应的ChannelConfig实现。
Netty提供了一个ChannelOption类,其定义了ChannelConfig支持的所有参数类型,可以认为ChannelConfig中用了一个Map来保存参数,Map的key是ChannelOption,ChannelConfig 定义了相关方法来获取和修改Map中的值。
ChannelHander
我们经常需要对channel的输入和输出事件进行处理,Netty抽象出一个ChannelHandler概念,专门用于处理此类事件。因为IO事件分为输入和输出,因此ChannelHandler又具体的分为ChannelInboundHandler和ChannelOutboundHandler
对于ChannelHandlerAdapter、ChannelInboundHandlerAdapter 、ChannelOutboundHandlerAdapter,从名字就可以看出来其作用是适配器,适配器是一种设计模式。设想一个接口可能定义很多抽象方法,如果子类直接实现,必定要全部实现这些方法,使得代码很臃肿。由于接口中定义的有些方法是公共的,还有一些方法可能是子类并不关心的,因此通过适配器类,这些方法提供默认的实现。这样的话,在编程的时候,子类只需要覆写自己感兴趣的方法即可。
在使用netty进行编程的时候,对于输入事件的处理,我们应该继承ChannelInboundHandlerAdapter类,而不是直接实现ChannelInboundHandler接口;反之对于输出事件,我们应该继承ChannelOutboundHandlerAdapter类。在处理channel的IO事件时,我们通常会分成几个阶段。以读取数据为例,通常我们的处理顺序是
处理半包或者粘包问题-->数据的解码(或者说是反序列化)-->数据的业务处理
可以看到不同的阶段要执行不同的功能,因此通常我们会编写多个ChannelHandler,来实现不同的功能。而且多个ChannelHandler之间的顺序不能颠倒,例如我们必须先处理粘包解包问题,之后才能进行数据的业务处理。
ChannelPipeline
Netty中通过ChannelPipeline来保证ChannelHandler之间的处理顺序。每一个Channel对象创建的时候,都会自动创建一个关联的ChannelPipeline对象,我们可以通过io.netty.channel.Channel对象的pipeline()方法获取这个对象实例。ChannelPipeline 除了负责配置handler的顺序,还负责在收到读/写事件之后按照顺序调用这些handler。
默认情况下,一个ChannelPipeline实例中,同一个类型ChannelHandler只能被添加一次,如果添加多次,则会抛出异常
ByteBuf使用
网络数据的基本单位总是字节,ByteBuf是Netty提供的一个字节容器,方便对字节数组进行操作(譬如,读、写、扩容等)。Java NIO提供了ByteBuffer作为字节容器实现,但是提供的API太难用了。
ByteBuf的优点
- 它可以被用户自定义的缓冲区类型扩展
- 它可以被用户自定义的缓冲区类型扩展;
- 通过内置的复合缓冲区类型实现了透明的零拷贝;
- 容量可以按需增长(类似于 JDK 的 StringBuilder);
- 在读和写这两种模式之间切换不需要调用 ByteBuffer 的 flip()方法;
- 读和写使用了不同的索引;
- 支持方法的链式调用;
- 支持引用计数;
- 支持池化
ByteBuf的创建
通过接口ByteBufAllocator创建ByteBuf。该接口有一个抽象子类和两个实现类,分别对应了用来分配池化的ByteBuf和非池化的ByteBuf。有了Allocator之后,Netty又为我们提供了两个工具类:Pooled、Unpooled,分别用来分配池化的和未池化的ByteBuf,进一步简化了创建ByteBuf的步骤,只需要调用这两个工具类的静态方法即可。
ByteBuf分类
- Pooled和Unpooled:pooled类型的bytebuf是在已经申请好的内存块取一块内存,而Unpooled是直接通过JDK底层代码申请。
- Unsafe和非Unsafe:这里的Unsafe是JDK底层的对象,通过它能够直接操作到内存。
- Heap和Direct:一个是在堆上分配,一个是直接内存。Direct不受GC的控制。
一些API
- buffer:在堆上分配一个ByteBuf
- directBuffer:在堆外分配一个ByteBuf
- wrappedBuffer:将一个byte[]包装成一个ByteBuf后返回
- compositeBuffer:返回一个组合ByteBuf,并指定组合的个数
- copy*开头:调用了buffer(int initialCapacity, int maxCapacity)或directBuffer(int initialCapacity, int maxCapacity)方法,然后将具体的内容write进生成的ByteBuf中返回
ByteBuf一些使用总结
- ByteBuf有读和写两个指针,用来标记“可读”、“可写”、“可丢弃”的字节
- 调用write*方法写入数据后,写指针将会向后移动
- 调用read*方法读取数据后,读指针将会向后移动
- 写入数据或读取数据时会检查是否有足够多的空间可以写入和是否有数据可以读取
- 写入数据之前,会进行容量检查,当剩余可写的容量小于需要写入的容量时,需要执行扩容操作
- 扩容时有一个4MB的阈值,需要扩容的容量小于阈值或大于阈值所对应的扩容逻辑不同
- clear等修改读写指针的方法,只会更改读写指针的值,并不会影响ByteBuf中已有的内容
- setZero等修改字节值的方法,只会修改对应字节的值,不会影响读写指针的值以及字节的可读写状态
TCP粘包、拆包与通信协议
在TCP编程中,通常Sever端与Client通信时的消息都有着固定的消息格式,称之为协议(protocol),例如FTP协议、Telnet协议等,有的公司也会自己开发协议。说白了,协议了就是定义了数据通信的格式。主要是为了解决TCP编程中的粘包和半包问题。
由于TCP(transport control protocol,传输控制协议)是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务器端)都要有一一成对的socket,因此,发送端为了将多个发往接收端的包,更有效的发到对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样,接收端,就难于分辨出来了,必须提供科学的拆包机制。即面向流的通信是无消息保护边界的
UDP(user datagram protocol,用户数据报协议)是无连接的,面向消息的,提供高效率服务。不会使用块的合并优化算法,, 由于UDP支持的是一对多的模式,所以接收端的skbuff(套接字缓冲区)采用了链式结构来记录每一个到达的UDP包,在每个UDP包中就有了消息头(消息来源地址,端口等信息),这样,对于接收端来说,就容易进行区分处理了。 即面向消息的通信是有消息保护边界的
由于TCP无消息保护边界, 需要在消息接收端处理消息边界问题,也就是我们所说的粘包、拆包问题;而UDP通信则不需要考虑此问题。
假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到字节数是不确定的,故可能存在以下四种情况:
- 服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包
- 服务端一次接受到了两个数据包,D1和D2粘合在一起,称之为TCP粘包
- 服务端分两次读取到了数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这称之为TCP拆包
- 服务端分两次读取到了数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余部分内容D1_2和完整的D2包。
粘包、拆包发生原因
- socket缓冲区与滑动窗口
- MSS/MTU限制
- Nagle算法
先明确一个概念:每个TCP socket在内核中都有一个发送缓冲区(SO_SNDBUF )和一个接收缓冲区(SO_RCVBUF),TCP的全双工的工作模式以及TCP的滑动窗口便是依赖于这两个独立的buffer以及此buffer的填充状态。SO_SNDBUF和SO_RCVBUF 在windows操作系统中默认情况下都是8K。
- SO_SNDBUF 进程发送的数据的时候(假设调用了一个send方法),最简单情况(也是一般情况),将数据拷贝进入socket的内核发送缓冲区之中,然后send便会在上层返回。换句话说,send返回之时,数据不一定会发送到对端去(和write写文件有点类似),send仅仅是把应用层buffer的数据拷贝进socket的内核发送buffer中。
- SO_RCVBUF 把接受到的数据缓存入内核,应用进程一直没有调用read进行读取的话,此数据会一直缓存在相应socket的接收缓冲区内。再啰嗦一点,不管进程是否读取socket,对端发来的数据都会经由内核接收并且缓存到socket的内核接收缓冲区之中。read所做的工作,就是把内核缓冲区中的数据拷贝到应用层用户的buffer里面,仅此而已。
滑动窗口
TCP链接在三次握手的时候,会将自己的窗口大小(window size)发送给对方,其实就是SO_RCVBUF指定的值。之后在发送数据的时,发送方必须要先确认接收方的窗口没有被填充满,如果没有填满,则可以发送。
假设发送方的每256 bytes表示一个完整的报文,接收方由于数据处理不及时,这256个字节的数据都会被缓存到SO_RCVBUF中。如果接收方的SO_RCVBUF中缓存了多个报文,那么对于接收方而言,这就是粘包。
考虑另外一种情况,假设接收方的window size只剩了128,意味着发送方最多还可以发送128字节,而由于发送方的数据大小是256字节,因此只能发送前128字节,等到接收方ack后,才能发送剩余字节。这就造成了拆包。
粘包、拆包问题的解决方案
这个问题可以通过定义应用的协议(protocol)来解决。协议的作用就定义传输数据的格式。这样在接受到的数据的时候,如果粘包了,就可以根据这个格式来区分不同的包,如果拆包了,就等待数据可以构成一个完整的消息来处理。
目前业界主流的协议(protocol)方案可以归纳如下:
- 定长协议:假设我们规定每3个字节,表示一个有效报文,如果我们分4次总共发送以下9个字节,那么根据协议,我们可以判断出来,这里包含了3个有效的请求报文
- 特殊字符分隔符协议:在包尾部增加回车或者空格符等特殊字符进行分割
- 长度编码:将消息分为消息头和消息体,消息头中用一个int型数据(4字节),表示消息体长度的字段。在解析时,先读取内容长度Length,其值为实际消息体内容(Content)占用的字节数,之后必须读取到这么多字节的内容,才认为是一个完整的数据报文。
总的来说,通信协议就是通信双方约定好的数据格式,发送方按照这个数据格式来发送,接受方按照这个格式来解析。因此发送方和接收方要完成的工作不同,发送方要将发送的数据转换成协议规定的格式,称之为编码(encode);接收方需要根据协议的格式,对二进制数据进行解析,称之为解码(decode)。
Netty中提供大量的工具类,来简化我们的编解码操作
Netty编解码框架
- 编码:发送方要将发送的二进制数据转换成协议规定的格式的二进制数据流,称之为编码(encode),编码功能由编码器(encoder)完成。
- 解码:接收方需要根据协议的格式,对二进制数据进行解析,称之为解码(decode),解码功能由解码器(decoder)完成。
对于开发人员而言,我们要做的工作主要就是2点:确定协议、编写协议对应的编码/解码器。
协议分为公有协议和私有协议。所谓公有协议,指的是业界普遍遵循的通信协议,Netty提供了大量公有协议数据格式的编码解码器,从而简化开发者的使用。
- 邮件服务器,Netty针对POP3、IMAP、SMTP协议的数据格式都提供了相应的编码解码器
- web服务器,Netty提供好了HTTP协议、Websocket协议相应的编解码器。
- redis、memcached这两个缓存服务器,netty都提供了相应的解码
Netty提供了一套完善的编解码框架,入的数据是在ChannelInboundHandler中处理的,数据输出是在ChannelOutboundHandler中处理的。因此编码器/解码器实际上是这两个接口的特殊实现类
ByteToMessageDecoder
用于将接收到的二进制数据(Byte)解码,得到完整的请求报文(Message)。ByteToMessageDecoder提供的一些常见的实现类
- FixedLengthFrameDecoder:定长协议解码器,我们可以指定固定的字节数算一个完整的报文
- LineBasedFrameDecoder:行分隔符解码器,遇到\n或者\r\n,则认为是一个完整的报文
- DelimiterBasedFrameDecoder:分隔符解码器,与LineBasedFrameDecoder类似,只不过分隔符可以自己指定
- LengthFieldBasedFrameDecoder:长度编码解码器,将报文划分为报文头/报文体,根据报文头中的Length字段确定报文体的长度,因此报文提的长度是可变的
- JsonObjectDecoder:json格式解码器,当检测到匹配数量的"{" 、”}”或”[””]”时,则认为是一个完整的json对象或者json数组。
MessageToMessageDecoder
ByteToMessageDecoder是将二进制流进行解码后,得到有效报文。而MessageToMessageDecoder则是将一个本身就包含完整报文信息的对象转换成另一个Java对象。
举例来说,前面介绍了ByteToMessageDecoder的部分子类解码后,会直接将包含了报文完整信息的ByteBuf实例交由之后的ChannelInboundHandler处理,此时,你可以在ChannelPipeline中,再添加一个MessageToMessageDecoder,将ByteBuf中的信息解析后封装到Java对象中,简化之后的ChannelInboundHandler的操作。
MessageToByteEncoder
MessageToByteEncoder也是一个泛型类,泛型参数I表示将需要编码的对象的类型,编码的结果是将信息转换成二进制流放入ByteBuf中
MessageToMessageEncoder
MessageToMessageEncoder同样是一个泛型类,泛型参数I表示将需要编码的对象的类型,编码的结果是将信息放到一个List中。子类通过覆写其抽象方法encode,来实现编码
协议如何解决拆包粘包问题
LineBasedFrameDecoder要解决粘包问题,根据"\n"或"\r\n"对二进制数据进行解码,可能会解析出多个完整的请求报文,其会将每个有效报文封装在不同的ByteBuf实例中,然后针对每个ByteBuf实例都会调用一次其他的ChannelInboundHandler的channelRead方法。
此LineBasedFrameDecoder接受到一次数据,其之后的ChannelInboundHandler的channelRead方法可能会被调用多次,且之后的ChannelInboundHandler的channelRead方法接受到的ByteBuf实例参数,包含的都是都是一个完整报文的二进制数据。因此无需再处理粘包问题,只需要将ByteBuf中包含的请求信息解析出来即可,然后进行相应的处理。
注意
有人可能认为调用一个writeAndFlush方法就是发送了一个请求,这是对协议的理解不够深刻。一个完整的请求是由协议规定的,例如我们在这里使用了LineBasedFrameDecoder,潜在的含义就是:一行数据才算一个完整的报文。因此当你调用writeAndFlush方法,如果发送的数据有多个换行符,意味着相当于发送了多次有效请求;而如果发送的数据不包含换行符,意味着你的数据还不足以构成一个有效请求。
UDP 没有拆包粘包的问题
TCP协议是面向流的协议,UDP是面向消息的协议,UDP段都是一条消息,应用程序必须以消息为单位提取数据,不能一次提取任意字节的数据
UDP具有保护消息边界,在每个UDP包中就有了消息头(消息来源地址,端口等信息),这样对于接收端来说就容易进行区分处理了。传输协议把数据当作一条独立的消息在网上传输,接收端只能接收独立的消息。接收端一次只能接收发送端发出的一个数据包,如果一次接受数据的大小小于发送端一次发送的数据大小,就会丢失一部分数据,即使丢失,接受端也不会分两次去接收
Netty 线程框架
netty是被设计用于支持reactor线程模型的 TODO
Netty 使用套路
案例:TimeClient发送“QUERY TIME ORDER”请求,TimeServer接受到这个请求后,返回当前时间。
// 服务端
// 创建了两个EventLoopGroup实例:bossGroup和workerGroup,目前可以将bossGroup和workerGroup理解为两个线程池。其中bossGroup用于接受客户端连接,bossGroup在接受到客户端连接之后,将连接交给workerGroup来进行处理。
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap(); // 创建了一个ServerBootstrap实例,从名字上就可以看出来这是一个服务端启动类
b.group(bossGroup, workerGroup) // 需要给设置一些参数,包括创建的bossGroup和workerGroup。
.channel(NioServerSocketChannel.class) // 通过channel方法指定了NioServerSocketChannel,这是netty中表示服务端的类,用于接受客户端连接,对应于java.nio包中的ServerSocketChannel。
.childHandler(new ChannelInitializer<SocketChannel>() { // 通过childHandler方法,设置了一个匿名内部类ChannelInitializer实例,用于初始化客户端连接SocketChannel实例。在接收到客户端连接之后,netty会回调ChannelInitializer的initChannel方法需要对这个连接进行一些初始化工作,主要是告诉netty之后如何处理和响应这个客户端的请求。在这里,主要是添加了3个ChannelHandler
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); // 在解析客户端请求时,遇到字符”\n”或”\r\n”时则认为是一个完整的请求报文,然后将这个请求报文的二进制字节流交给StringDecoder处理。
ch.pipeline().addLast(new StringDecoder()); // StringDecoder将字节流转换成一个字符串,交给TimeServerHandler来进行处理。
ch.pipeline().addLast(new TimeServerHandler()); // TimeServerHandler是我们自己要编写的类,在这个类中,我们要根据用户请求返回当前时间。
}
});
ChannelFuture f = b.bind(port).sync(); // 调用了ServerBootstrap的bind(port)方法,开启真正的监听在8080端口,接受客户端请求。
// TimeServerHandler
// TimeServerHandler用户处理客户端的请求,每当接收到"QUERY TIME ORDER”请求时,就返回当前时间,否则返回"BAD REQUEST”。
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 继承了ChannelInboundHandlerAdapter,并覆盖了channelRead方法,当客户端发送了请求之后,channelRead方法会被回调。参数ChannelHandlerContext包含了当前发送请求的客户端的一些上下文信息,msg表示客户端发送的请求信息。
String request = (String) msg; // 直接msg强制转换成了String类型。这是因为我们在前面已经添加过了StringDecoder,其已经将二进制流转换成了一个字符串
String response = null;
if ("QUERY TIME ORDER".equals(request)) { // 构建响应。会判断请求是否合法,如果请求信息是"QUERY TIME ORDER”,则返回当前时间,否则返回"BAD REQUEST”
response = new Date(System.currentTimeMillis()).toString();
} else {
response = "BAD REQUEST";
}
response = response + System.getProperty("line.separator”); // 在响应内容中加上了System.getProperty("line.separator”),也就是所谓的换行符。在linux操作系统中,就是”\n”,在windows操作系统是”\r\n”。加上换行符,主要是因为客户端也要对服务端的响应进行解码,当遇到一个换行符时,就认为是一个完整的响应。
ByteBuf resp = Unpooled.copiedBuffer(response.getBytes()); // 调用了Unpooled.copiedBuffer方法创建了一个缓冲区对象ByteBuf。在java nio包中,使用ByteBuffer类来表示一个缓冲区对象。在netty中,使用ByteBuf表示一个缓冲区对象。
ctx.writeAndFlush(resp); // writeAndFlush方法,将响应刷新到客户端
}
}
// 客户端
EventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap b = new Bootstrap(); // 创建了一个Bootstrap实例,与ServerBootstrap相对应,这表示一个客户端的启动类
b.group(workerGroup); // 调用group方法给Bootstrap实例设置了一个EventLoopGroup实例。前面提到,EventLoopGroup的作用是线程池。前面在创建ServerBootstrap时,设置了一个bossGroup,一个wrokerGroup,这样做主要是为将接受连接和处理连接请求任务划分开,以提升效率。对于客户端而言,则没有这种需求,只需要设置一个EventLoopGroup实例即可。
b.channel(NioSocketChannel.class); // 通过channel方法指定了NioSocketChannel,这是netty在nio编程中用于表示客户端的对象实例。
b.handler(new ChannelInitializer<SocketChannel>() {// 类似server端,在连接创建完成,初始化的时候,我们也给SocketChannel添加了几个处理器类。其中TimeClientHandler是我们自己编写的给服务端发送请求,并接受服务端响应的处理器类。
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync(); // 调用Bootstrap的connect(host, port)方法,与服务端建立连接。
// Wait until the connection is closed.
f.channel().closeFuture().sync();
// 客户端处理类
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private byte[] req=("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
@Override
public void channelActive(ChannelHandlerContext ctx) {// 当客户端与服务端连接建立成功后,channelActive方法会被回调,我们在这个方法中给服务端发送"QUERY TIME ORDER”请求
ByteBuf message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 当接受到服务端响应后,channelRead方法会被会回调,我们在这个方法中打印出响应的时间信息。
String body = (String) msg;
System.out.println("Now is:" + body);
}
}