Netty入门(二) 粘包拆包问题以及编解码器的应用

mac2022-06-30  21

一.TCP粘包拆包问题

1.TCP粘包/拆包实例

目的:客户端连续向服务器发送100条时间查询信息,服务器端每接到一条查询信息就客户端返回一条当前系统时间

期望结果:服务器接收到100条独立信息,客户端接收到100条独立应答信息

(1)TimeServerHandler:

public class TimeServerHandler extends ChannelInboundHandlerAdapter{ private int counter; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("Server Handler is Active!"); counter = 0; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf)msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req,"UTF-8").substring(0,req.length - System.getProperty("line.separator").length()); System.out.println("The Time server receive order : " + body + "; the counter is : " + ++counter); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?new Date(System.currentTimeMillis()).toString():"BAD ORDER"; currentTime = currentTime + System.getProperty("line.separator"); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.writeAndFlush(resp); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }

注:

(1)System.currentTimeMillis()获取当前系统时间,单位为毫秒,可由Date转换为日期

(2)System.getProperty()是获取系统信息,相关参数如下:

因为换行分隔符在Windows与Linux系统下是不一样的,为了代码更具有拓展性所以使用System.getProperty("line.separator")来获取换行符。

(2)TimeClientHandler:

public class TimeClientHandler extends ChannelInboundHandlerAdapter{ private int counter; private byte[] req; public TimeClientHandler() { counter = 0; req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for(int i = 0;i<100;i++) { ctx.writeAndFlush(Unpooled.copiedBuffer(req)); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf)msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req,"UTF-8"); System.out.println("NOW is : " + body + " ; the counter is : " + ++counter); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); }

(3)结果现象:

Server端:

The Time server receive order : QUERY TIME ORDER QUERY TIME ORDER QUERY TIME ORDER ........

QUERY TIME ORD; the counter is : 1 The Time server receive order :  QUERY TIME ORDER QUERY TIME ORDER .........

QUERY TIME ORDER; the counter is : 2

client端:

NOW is : BAD ORDER BAD ORDER  ; the counter is : 1

可以看到,服务器端信息都黏在一起了,客户端也是如此,这种现象严重影响通信功能!

2.TCP粘包/拆包问题原因

  TCP是一个“流”协议,所谓流,就是没有界限的一长串二进制数据。TCP作为传输层协议并不不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行数据包的划分,所以在业务上认为是一个完整的包,可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

图解:https://blog.csdn.net/qq_33314107/article/details/80771280

3.TCP粘包/拆包问题解决方法

(1)以换行分割符为信息结束的边界LineBasedFrameDecoder

(1).1 要求:<1>每条传输信息结尾处必须添加系统换行分隔符 : System.getProperty("line.separator")

                   <2>添加LineBasedFrameDecoder 的解码器Handler

(1).2 LineBasedFrameDecoder 解释

构造方法:

public LineBasedFrameDecoder(int maxLength) Creates a new decoder. Parameters: maxLength - the maximum length of the decoded frame. A TooLongFrameException is thrown if the length of the frame exceeds this value. //如果解码器分析超过maxLength字节还没有发现换行分隔符,则抛出异常

 原理:

注:以换行分割符为结束,但是解码字节中不包含换行分隔符! 

(1).3 代码修改以及结果测试

服务器端:

ChannelPipeline pipeline = arg0.pipeline(); pipeline.addLast(new LineBasedFrameDecoder(1024));//换行解码器 pipeline.addLast(new StringDecoder());//字符串解码器 pipeline.addLast(new TimeServerHandler()); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //ByteBuf buf = (ByteBuf)msg; //byte[] req = new byte[buf.readableBytes()]; //buf.readBytes(req); String body = (String)msg;//因为添加了StringDecoder System.out.println("The Time server receive order : " + body + "; the counter is : " + ++counter); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?new Date(System.currentTimeMillis()).toString():"BAD ORDER"; currentTime = currentTime + System.getProperty("line.separator"); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.writeAndFlush(resp); }

客户端类似..........................

结果:

The Time server receive order : QUERY TIME ORDER; the counter is : 1 The Time server receive order : QUERY TIME ORDER; the counter is : 2 The Time server receive order : QUERY TIME ORDER; the counter is : 3 The Time server receive order : QUERY TIME ORDER; the counter is : 4 The Time server receive order : QUERY TIME ORDER; the counter is : 5 The Time server receive order : QUERY TIME ORDER; the counter is : 6 The Time server receive order : QUERY TIME ORDER; the counter is : 7 The Time server receive order : QUERY TIME ORDER; the counter is : 8 The Time server receive order : QUERY TIME ORDER; the counter is : 9 .......................... The Time server receive order : QUERY TIME ORDER; the counter is : 100

客户端:

NOW is : Mon Oct 07 13:42:42 CST 2019 ; the counter is : 1 NOW is : Mon Oct 07 13:42:42 CST 2019 ; the counter is : 2 NOW is : Mon Oct 07 13:42:42 CST 2019 ; the counter is : 3 NOW is : Mon Oct 07 13:42:42 CST 2019 ; the counter is : 4 NOW is : Mon Oct 07 13:42:42 CST 2019 ; the counter is : 5 NOW is : Mon Oct 07 13:42:42 CST 2019 ; the counter is : 6 NOW is : Mon Oct 07 13:42:42 CST 2019 ; the counter is : 7 NOW is : Mon Oct 07 13:42:42 CST 2019 ; the counter is : 8 NOW is : Mon Oct 07 13:42:42 CST 2019 ; the counter is : 9 NOW is : Mon Oct 07 13:42:42 CST 2019 ; the counter is : 10 ...................... NOW is : Mon Oct 07 13:42:42 CST 2019 ; the counter is : 99 NOW is : Mon Oct 07 13:42:42 CST 2019 ; the counter is : 100

(2)以自定义特殊分割符为信息结束的边界DelimiterBaseFrameDecoder

(2).1 要求 :<1>每条传输信息结尾处必须添加自定义的特殊分隔符 ,分割符仍不被解码到信息里

                         <2>添加DelimiterBaseFrameDecoder 的解码器InboundHandler

(2).2 DelimiterBaseFrameDecoder解码器解释

构造方法:

public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf delimiter) Creates a new instance. Parameters: maxFrameLength - the maximum length of the decoded frame. A TooLongFrameException is thrown if the length of the frame exceeds this value. delimiter - the delimiter

(2).3 代码修改

@Override protected void initChannel(SocketChannel arg0) throws Exception { ChannelPipeline pipeline = arg0.pipeline(); ByteBuf buf = Unpooled.copiedBuffer("**".getBytes());//以**分割 pipeline.addLast(new DelimiterBasedFrameDecoder(1024,buf)); pipeline.addLast(new StringDecoder()); pipeline.addLast(new TimeClientHandler()); } 服务器端 : currentTime = currentTime + "**"; 客户端 : req = ("QUERY TIME ORDER" + "**").getBytes();

(3)定长信息解码器FixedLengthFrameDecoder

(3).1 构造器

public FixedLengthFrameDecoder(int frameLength) Creates a new instance. Parameters: frameLength - the length of the frame

一次截取信息的长度:若传送字节小于规定长度则等待下次信息一起,若大于规定长度则截取固定长度。

(3).2 实例(使用Telnet客户端)

Server:

public class MyChannelInitializer extends ChannelInitializer<SocketChannel>{ @Override protected void initChannel(SocketChannel arg0) throws Exception { // TODO Auto-generated method stub ChannelPipeline pipeline = arg0.pipeline(); pipeline.addLast(new FixedLengthFrameDecoder(20));//一次截取20字节 pipeline.addLast(new StringDecoder()); pipeline.addLast(new EchoServerHandler()); } }

Handler:

@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String)msg; System.out.println("This is " + ++counter + "times receive client : [" + body + "]"); //ctx.writeAndFlush(echo); }

结果:

 

(4)自定义长度消息头编码器 LengthFieldPrepender

(4).1 构造器

public LengthFieldPrepender(int lengthFieldLength) Creates a new instance. Parameters: lengthFieldLength - the length of the prepended length field. Only 1, 2, 3, 4, and 8 are allowed. Throws: java.lang.IllegalArgumentException - if lengthFieldLength is not 1, 2, 3, 4, or 8 //只允许消息头字节数为 1,2,3,4,8个字节,相应代表最大信息字节长度分别为2^8,2^16.... public LengthFieldPrepender(int lengthFieldLength, boolean lengthIncludesLengthFieldLength) Creates a new instance. Parameters: lengthFieldLength - the length of the prepended length field. Only 1, 2, 3, 4, and 8 are allowed. lengthIncludesLengthFieldLength - if true, the length of the prepended length field is added to the value of the prepended length field. Throws: java.lang.IllegalArgumentException - if lengthFieldLength is not 1, 2, 3, 4, or 8 //若为true,则消息头长度将包含长度域

(4).2 功能说明

 

(5)自定义长度消息头解码器LengthFieldBasedFrameDecoder(配合LengthFieldPrepender使用)

(5).1 构造器

public LengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) Creates a new instance. Parameters: maxFrameLength - the maximum length of the frame. If the length of the frame is greater than this value, TooLongFrameException will be thrown. lengthFieldOffset - the offset of the length field lengthFieldLength - the length of the length field lengthAdjustment - the compensation value to add to the value of the length field initialBytesToStrip - the number of first bytes to strip out from the decoded frame

(5).2 参数解释

 int maxFrameLength :解码数据包的最大字节长度  int lengthFieldOffset :长度域偏移量,指的是长度域位于整个数据包字节数组中的起始下标;  int lengthFieldLength :长度域的自己的字节数长度。  int lengthAdjustment:长度域的偏移量矫正。 如果长度域的值,除了包含有效数据域的长度外,还包含了其他域(如长度域自身)长度,那么,就需要进行矫正。长度域内的值 + 偏移量 = 信息长度  int initialBytesToStrip:丢弃的起始字节数。丢弃处于有效数据前面的字节数量。

(5).3 实例(TimeServer改)

初始化器:

@Override protected void initChannel(SocketChannel arg0) throws Exception { ChannelPipeline pipeline = arg0.pipeline(); pipeline.addLast(new LengthFieldBasedFrameDecoder(65539, 0, 2,0,2)); pipeline.addLast(new StringDecoder()); pipeline.addLast(new LengthFieldPrepender(2)); pipeline.addLast(new TimeClientHandler()); }

加上这个就完了(非常好使用).......

结果正常,不再显示。

 

二.Netty编码与解码

    基于Java提供的对象输入/输出流ObjectInputStream/ObjectOutputStream,我们可以直接把java对象作为可存储的字节数组写入文件,也可以传输到网络上。在我们进行网络传输时,需要把要传输的对象编码为字节数组或者ByteBuffer对象,当远程服务器读到ByteBuffer对象或者字节数组时,再将其解码为java对象。这些被成为Java的编码解码技术。我们比较熟悉的java对象编解码技术有java对象序列化。但是java对象序列化存在着无法跨语言,性能太低,码流太大等缺点,在实际开发中并不能广泛使用。

   由此,我们来学习一种比较主流的序列化编码解码技术和框架---MessagePack 

  其优点:高效,跨语言,码流小,性能高

1.MessagePack 使用环境配置

添加所需jar包:引入javassist.jar与msgpack-0.6.12.jar两个包,然后就可以使用.

(1)Javassist(Java Programming Assistant) 使得操作Java字节码变得简单。它是一个用于在Java中编辑字节码的类库;它使Java程序能够在运行时定义新类,并在JVM加载时修改类文件。

(2)msgpack是MessagePack所依赖的jar包

下载地址:https://pan.baidu.com/s/1woq6S0VfwHL8CgUCSGBaDg 提取码: 5kdp 

 

2.MessagePack  使用方法

(1)使用前提注意点:

<1>记好netty接收和传递信息都是经过ByteBuf进行的,发送出去的消息都必须先变化为byte,再转化为ByteBuf;接收到的ByteBuf都必须转化。

<2>序列化与反序列化是针对对象而言的

<3>序列化与反序列化的实体POJO类必须遵循JavaBean规则,而且必须加上 org.msgpack.annotation.Message 注解:@Message

(2)操作方法

<1>创建序列化与反序列化器MessagePack

MessagePack msgpack = new MessagePack();

<2>主要方法

1.byte[] raw = messagePack.write(class);//序列化指定的对象为字节数组 2.<T> T read(byte[] bytes, Template<T> tmpl);//根据模板将对象反序列化为模板指定对象,同一个 API 解决 3. read(byte[] bytes, Class<T> c);//将字节数组反序列化为指定类对象,c 指定 POJO 类即可

<3>MessagePack内置模板

Msgpack有很多数据模板,比如TString模板和TValue模板。相关模板均在org.msgpack.template.Template类中定义,诸如:TValue、TByte、TShort、TInteger、TLong、TCharacter、TBigInteger、TBigDecimal、TFloat。可以使用相关模板进行编码解码相关类型,不加的话默认为Object对象处理。

 

3.引入MessagePack相关的Netty编码器和解码器

(1)MsgpackEncoder编码器

package io.netty.handler.codec.msgpack; import org.msgpack.MessagePack; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; public class MsgpackEncoder extends MessageToByteEncoder<Object>{ @Override protected void encode(ChannelHandlerContext arg0, Object arg1, ByteBuf arg2) throws Exception { // TODO Auto-generated method stub MessagePack messagePack = new MessagePack(); byte[] raw = messagePack.write(arg1);//将对象编码为字节数组 arg2.writeBytes(raw);//写入ByteBuf,交给MessageToByteEncoder的write写出 } }

继承MessageToByteEncoder类,MessageToByteEncoder类自身实现了write方法,我们只需通过平encoder方法实现自己的编码方法即可。注意MessageToByteEncoder 也是一个OutBoundHandler

源码:https://www.jianshu.com/p/d7a241cc4203

(2)MsgpackDecoder解码器

package io.netty.handler.codec.msgpack; import java.util.List; import org.msgpack.MessagePack; import org.msgpack.template.Template; import org.msgpack.template.Templates; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf>{//InBoundHandler @Override protected void decode(ChannelHandlerContext arg0, ByteBuf arg1, List<Object> arg2) throws Exception { // TODO Auto-generated method stub final byte[] array; final int length = arg1.readableBytes(); array = new byte[length]; arg1.getBytes(arg1.readerIndex(), array, 0, length);//获取字节数组 MessagePack msgpack = new MessagePack(); arg2.add(msgpack.read(array));//解码对象 } }

4.对象传输Netty框架结构

(1)HandlerInitializer初始化器

package io.netty.workstation; import javax.xml.bind.helpers.ParseConversionEventImpl; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.msgpack.MsgpackDecoder; import io.netty.handler.codec.msgpack.MsgpackEncoder; public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel arg0) throws Exception { // TODO Auto-generated method stub ChannelPipeline pipeline = arg0.pipeline(); pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535,0,2,0,2));//自定义长度解码器,处理TCP粘包拆包问题 pipeline.addLast("msgpack decoder",new MsgpackDecoder());//对象解码器 pipeline.addLast("frameEncoder", new LengthFieldPrepender(2));//TCP编码器加头长度 pipeline.addLast("msgpack encoder",new MsgpackEncoder());//对象编码器 pipeline.addLast(new EchoServerHandler());//自定义InboundHandler } }

(2)ServerHandler

package io.netty.workstation; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("server channelhandler is active!"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Server receive the msgpack message : " + msg); ctx.writeAndFlush(msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }

(3)ClientHandler

package io.netty.workstation; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.pojo.UserInfo; public class EchoClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client is active!"); UserInfo[] infos = UserInfoArray(); for(int i = 0;i<10;i++) { ctx.write(infos[i]); } ctx.flush(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Client receive the msgpack message : " + msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } private UserInfo[] UserInfoArray() { UserInfo[] userInfos = new UserInfo[10]; UserInfo userInfo = null; for(int i = 0;i<10;i++) { userInfo = new UserInfo(); userInfo.setAge(i); userInfo.setName("ABCDEFG ---> " + i); userInfos[i] = userInfo; } return userInfos; } }

(4)结果

最新回复(0)