Netty TCP 解包的方法 - housekeeper-software/tech GitHub Wiki
解包和组包方法类举例
设,TCP协议包的包格式如下:
//注意,网络包通常是大端存储的,而netty中的ByteBuf默认都是大端的,所以,我们直接读写即可。
publci class StreamPacket{
public short magic;
public byte type;
public byte reseve;
public short payloadLength;
//表示此包是个心跳包,没有负载,只有包头
public static final int NET_TYPE_KEEP_ALIVE = 1;
//表明是个消息包,一般有负载
public static final int NET_TYPE_MESSAGE = 2;
//如上所示,包头的最小长度为 6个字节
public static final int NET_PACKET_HEADER_LEN = 6;
//这是自定义的幻数
public static final int NET_MAGIC = 0x7363;
private int decodeHeader(ByteBuf in) {
int magic = in.readUnsignedShort();
type = in.readUnsignedByte();
reserve = in.readUnsignedByte();
int payloadLength = in.readUnsignedShort();
if (magic != NET_MAGIC)
return -1;
return payloadLength;
}
...
实现 decode方法:
public static StreamPacket decode(ByteBuf in){
if(in.readableBytes() < NET_PACKET_HEADER_LEN)
return null; //小于最小包长度,需要继续等待更多数据
//我们保存当前的读指标,如果下面发现包并不完整,我们还需要恢复这个指标
int readerIndex = in.readerIndex();
StreamPacket packet = new StreamPacket();
int payloadLength = packet.decodeHeader(in);
if (payloadLength < 0) {
//是因为发现MAGIC没有匹配,说明对端发送错误的包格式,为避免死循环,我们直接将
//接收缓冲区清空
in.skipBytes(in.readableBytes());
return null;
}
if (payloadLength < 1) {
//是个没有payload的消息包,比如心跳包
return packet;
}
if (in.readableBytes() >= payloadLength) {
//缓冲区中剩余的字节满足负载要求的长度,说明,我们至少收到了一个完整的包
//此刻可以将这个包取出
packet.payload = new byte[payloadLength];
in.readBytes(packet.payload);
return packet;
}
//缓冲区虽然收到了包头,但负载还没收完,此刻,我们恢复缓冲区读指标
//以便等到下次收到完整的包之后,可以重头继续解析
in.readerIndex(readerIndex);
return null;
}
//编码发送,这个比较简单
public void encode(ByteBuf buf) {
buf.writeShort(NET_MAGIC);
buf.writeByte(this.type);
buf.writeByte(this.reserve);
if (this.payload == null) {
buf.writeShort(0);
} else {
buf.writeShort(this.payload.length);
buf.writeBytes(payload);
}
}
}
实现解码器
package com.starsoftcomm.transport.tcp;
import com.starsoftcomm.protocol.StreamPacket;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class StreamMessageDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
//这里需要循环处理,可能一次收到多个完整的包,这是有可能发生的,所谓的粘包
while (true) {
StreamPacket packet = StreamPacket.decode(in);
if (packet == null)
break; //没有完整包,我们结束本次解码
out.add(packet);
}
}
}
package com.starsoftcomm.transport.tcp;
import com.starsoftcomm.server.ServerContext;
import com.starsoftcomm.transport.common.ChannelUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
public class StreamChannelInitializer extends ChannelInitializer<SocketChannel> {
private final ServerContext serverContext;
public StreamChannelInitializer(ServerContext serverContext) {
this.serverContext = serverContext;
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(HAProxyMessageDetector.NAME,new HAProxyMessageDetector());
ch.pipeline().addLast("frameDecoder", new StreamMessageDecoder());
ch.pipeline().addLast("frameEncoder", new StreamMessageEncoder());
ch.pipeline().addLast("idleState", serverContext.newIdleStateHandler());
ch.pipeline().addLast("handler", new StreamServerHandler(serverContext));
}
}
如果在nginx或者Haproxy之后,需要处理haproxy协议
如果我们的应用需要知道对端真实IP的话,需要处理上述协议,并在负载均衡中配置ip forward
注意,这里实现的trydecode,也就是说,如果没有发现haproxy协议头,不影响后续的解码
package com.starsoftcomm.transport.tcp;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.ProtocolDetectionResult;
import io.netty.handler.codec.ProtocolDetectionState;
import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
public class HAProxyMessageDetector extends ChannelInboundHandlerAdapter {
public static final String NAME = HAProxyMessageDetector.class.getSimpleName();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
ProtocolDetectionResult<HAProxyProtocolVersion> result =
HAProxyMessageDecoder.detectProtocol((ByteBuf) msg);
// should accumulate data if need more data to detect the protocol
if (result.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
return;
}
if (result.state() == ProtocolDetectionState.DETECTED) {
ctx.pipeline().addAfter(NAME, null, new HAProxyMessageDecoder());
ctx.pipeline().remove(this);
}
}
super.channelRead(ctx, msg);
}
}
Handler中的处理
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HAProxyMessage) {
HAProxyMessage message = (HAProxyMessage) msg;
ctx.channel().attr(ChannelUtil.REMOTE_ADDRESS_FROM_PROXY_PROTOCOL).set(InetSocketAddress.createUnresolved(message.sourceAddress(),
message.sourcePort()));
message.release();
return;
}
if (msg instanceof StreamPacket) {
StreamPacket packet = (StreamPacket) msg;
if (packet.getType() == StreamPacket.NET_TYPE_MESSAGE) {
try {
final CooCareMessage.Message m = CooCareMessage.Message.parseFrom(packet.getPayload());
this.serverContext.channelRead(ctx, m);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
ctx.close();
}
} else if (packet.getType() == StreamPacket.NET_TYPE_KEEP_ALIVE) {
ctx.channel().writeAndFlush(packet);
}
}
}