目的:客户端连续向服务器发送100条时间查询信息,服务器端每接到一条查询信息就客户端返回一条当前系统时间
期望结果:服务器接收到100条独立信息,客户端接收到100条独立应答信息
(1)TimeServerHandler:
public class TimeServerHandler extends ChannelInboundHandlerAdapter{ private int counter; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("Server Handler is Active!"); counter = 0; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf)msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req,"UTF-8").substring(0,req.length - System.getProperty("line.separator").length()); System.out.println("The Time server receive order : " + body + "; the counter is : " + ++counter); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?new Date(System.currentTimeMillis()).toString():"BAD ORDER"; currentTime = currentTime + System.getProperty("line.separator"); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.writeAndFlush(resp); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }注:
(1)System.currentTimeMillis()获取当前系统时间,单位为毫秒,可由Date转换为日期
(2)System.getProperty()是获取系统信息,相关参数如下:
因为换行分隔符在Windows与Linux系统下是不一样的,为了代码更具有拓展性所以使用System.getProperty("line.separator")来获取换行符。
(2)TimeClientHandler:
public class TimeClientHandler extends ChannelInboundHandlerAdapter{ private int counter; private byte[] req; public TimeClientHandler() { counter = 0; req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for(int i = 0;i<100;i++) { ctx.writeAndFlush(Unpooled.copiedBuffer(req)); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf)msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req,"UTF-8"); System.out.println("NOW is : " + body + " ; the counter is : " + ++counter); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); }(3)结果现象:
Server端:
The Time server receive order : QUERY TIME ORDER QUERY TIME ORDER QUERY TIME ORDER ........
QUERY TIME ORD; the counter is : 1 The Time server receive order : QUERY TIME ORDER QUERY TIME ORDER .........
QUERY TIME ORDER; the counter is : 2
client端:
NOW is : BAD ORDER BAD ORDER ; the counter is : 1
可以看到,服务器端信息都黏在一起了,客户端也是如此,这种现象严重影响通信功能!
TCP是一个“流”协议,所谓流,就是没有界限的一长串二进制数据。TCP作为传输层协议并不不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行数据包的划分,所以在业务上认为是一个完整的包,可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。
图解:https://blog.csdn.net/qq_33314107/article/details/80771280
(1).1 要求:<1>每条传输信息结尾处必须添加系统换行分隔符 : System.getProperty("line.separator")
<2>添加LineBasedFrameDecoder 的解码器Handler
(1).2 LineBasedFrameDecoder 解释
构造方法:
public LineBasedFrameDecoder(int maxLength) Creates a new decoder. Parameters: maxLength - the maximum length of the decoded frame. A TooLongFrameException is thrown if the length of the frame exceeds this value. //如果解码器分析超过maxLength字节还没有发现换行分隔符,则抛出异常原理:
注:以换行分割符为结束,但是解码字节中不包含换行分隔符!
(1).3 代码修改以及结果测试
服务器端:
ChannelPipeline pipeline = arg0.pipeline(); pipeline.addLast(new LineBasedFrameDecoder(1024));//换行解码器 pipeline.addLast(new StringDecoder());//字符串解码器 pipeline.addLast(new TimeServerHandler()); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //ByteBuf buf = (ByteBuf)msg; //byte[] req = new byte[buf.readableBytes()]; //buf.readBytes(req); String body = (String)msg;//因为添加了StringDecoder System.out.println("The Time server receive order : " + body + "; the counter is : " + ++counter); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?new Date(System.currentTimeMillis()).toString():"BAD ORDER"; currentTime = currentTime + System.getProperty("line.separator"); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.writeAndFlush(resp); }客户端类似..........................
结果:
The Time server receive order : QUERY TIME ORDER; the counter is : 1 The Time server receive order : QUERY TIME ORDER; the counter is : 2 The Time server receive order : QUERY TIME ORDER; the counter is : 3 The Time server receive order : QUERY TIME ORDER; the counter is : 4 The Time server receive order : QUERY TIME ORDER; the counter is : 5 The Time server receive order : QUERY TIME ORDER; the counter is : 6 The Time server receive order : QUERY TIME ORDER; the counter is : 7 The Time server receive order : QUERY TIME ORDER; the counter is : 8 The Time server receive order : QUERY TIME ORDER; the counter is : 9 .......................... The Time server receive order : QUERY TIME ORDER; the counter is : 100
客户端:
NOW is : Mon Oct 07 13:42:42 CST 2019 ; the counter is : 1 NOW is : Mon Oct 07 13:42:42 CST 2019 ; the counter is : 2 NOW is : Mon Oct 07 13:42:42 CST 2019 ; the counter is : 3 NOW is : Mon Oct 07 13:42:42 CST 2019 ; the counter is : 4 NOW is : Mon Oct 07 13:42:42 CST 2019 ; the counter is : 5 NOW is : Mon Oct 07 13:42:42 CST 2019 ; the counter is : 6 NOW is : Mon Oct 07 13:42:42 CST 2019 ; the counter is : 7 NOW is : Mon Oct 07 13:42:42 CST 2019 ; the counter is : 8 NOW is : Mon Oct 07 13:42:42 CST 2019 ; the counter is : 9 NOW is : Mon Oct 07 13:42:42 CST 2019 ; the counter is : 10 ...................... NOW is : Mon Oct 07 13:42:42 CST 2019 ; the counter is : 99 NOW is : Mon Oct 07 13:42:42 CST 2019 ; the counter is : 100
(2).1 要求 :<1>每条传输信息结尾处必须添加自定义的特殊分隔符 ,分割符仍不被解码到信息里
<2>添加DelimiterBaseFrameDecoder 的解码器InboundHandler
(2).2 DelimiterBaseFrameDecoder解码器解释
构造方法:
public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf delimiter) Creates a new instance. Parameters: maxFrameLength - the maximum length of the decoded frame. A TooLongFrameException is thrown if the length of the frame exceeds this value. delimiter - the delimiter(2).3 代码修改
@Override protected void initChannel(SocketChannel arg0) throws Exception { ChannelPipeline pipeline = arg0.pipeline(); ByteBuf buf = Unpooled.copiedBuffer("**".getBytes());//以**分割 pipeline.addLast(new DelimiterBasedFrameDecoder(1024,buf)); pipeline.addLast(new StringDecoder()); pipeline.addLast(new TimeClientHandler()); } 服务器端 : currentTime = currentTime + "**"; 客户端 : req = ("QUERY TIME ORDER" + "**").getBytes();(3).1 构造器
public FixedLengthFrameDecoder(int frameLength) Creates a new instance. Parameters: frameLength - the length of the frame一次截取信息的长度:若传送字节小于规定长度则等待下次信息一起,若大于规定长度则截取固定长度。
(3).2 实例(使用Telnet客户端)
Server:
public class MyChannelInitializer extends ChannelInitializer<SocketChannel>{ @Override protected void initChannel(SocketChannel arg0) throws Exception { // TODO Auto-generated method stub ChannelPipeline pipeline = arg0.pipeline(); pipeline.addLast(new FixedLengthFrameDecoder(20));//一次截取20字节 pipeline.addLast(new StringDecoder()); pipeline.addLast(new EchoServerHandler()); } }Handler:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String)msg; System.out.println("This is " + ++counter + "times receive client : [" + body + "]"); //ctx.writeAndFlush(echo); }结果:
(4).1 构造器
public LengthFieldPrepender(int lengthFieldLength) Creates a new instance. Parameters: lengthFieldLength - the length of the prepended length field. Only 1, 2, 3, 4, and 8 are allowed. Throws: java.lang.IllegalArgumentException - if lengthFieldLength is not 1, 2, 3, 4, or 8 //只允许消息头字节数为 1,2,3,4,8个字节,相应代表最大信息字节长度分别为2^8,2^16.... public LengthFieldPrepender(int lengthFieldLength, boolean lengthIncludesLengthFieldLength) Creates a new instance. Parameters: lengthFieldLength - the length of the prepended length field. Only 1, 2, 3, 4, and 8 are allowed. lengthIncludesLengthFieldLength - if true, the length of the prepended length field is added to the value of the prepended length field. Throws: java.lang.IllegalArgumentException - if lengthFieldLength is not 1, 2, 3, 4, or 8 //若为true,则消息头长度将包含长度域(4).2 功能说明
(5).1 构造器
public LengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) Creates a new instance. Parameters: maxFrameLength - the maximum length of the frame. If the length of the frame is greater than this value, TooLongFrameException will be thrown. lengthFieldOffset - the offset of the length field lengthFieldLength - the length of the length field lengthAdjustment - the compensation value to add to the value of the length field initialBytesToStrip - the number of first bytes to strip out from the decoded frame(5).2 参数解释
int maxFrameLength :解码数据包的最大字节长度 int lengthFieldOffset :长度域偏移量,指的是长度域位于整个数据包字节数组中的起始下标; int lengthFieldLength :长度域的自己的字节数长度。 int lengthAdjustment:长度域的偏移量矫正。 如果长度域的值,除了包含有效数据域的长度外,还包含了其他域(如长度域自身)长度,那么,就需要进行矫正。长度域内的值 + 偏移量 = 信息长度 int initialBytesToStrip:丢弃的起始字节数。丢弃处于有效数据前面的字节数量。
(5).3 实例(TimeServer改)
初始化器:
@Override protected void initChannel(SocketChannel arg0) throws Exception { ChannelPipeline pipeline = arg0.pipeline(); pipeline.addLast(new LengthFieldBasedFrameDecoder(65539, 0, 2,0,2)); pipeline.addLast(new StringDecoder()); pipeline.addLast(new LengthFieldPrepender(2)); pipeline.addLast(new TimeClientHandler()); }结果正常,不再显示。
基于Java提供的对象输入/输出流ObjectInputStream/ObjectOutputStream,我们可以直接把java对象作为可存储的字节数组写入文件,也可以传输到网络上。在我们进行网络传输时,需要把要传输的对象编码为字节数组或者ByteBuffer对象,当远程服务器读到ByteBuffer对象或者字节数组时,再将其解码为java对象。这些被成为Java的编码解码技术。我们比较熟悉的java对象编解码技术有java对象序列化。但是java对象序列化存在着无法跨语言,性能太低,码流太大等缺点,在实际开发中并不能广泛使用。
由此,我们来学习一种比较主流的序列化编码解码技术和框架---MessagePack
其优点:高效,跨语言,码流小,性能高
1.MessagePack 使用环境配置
添加所需jar包:引入javassist.jar与msgpack-0.6.12.jar两个包,然后就可以使用.
(1)Javassist(Java Programming Assistant) 使得操作Java字节码变得简单。它是一个用于在Java中编辑字节码的类库;它使Java程序能够在运行时定义新类,并在JVM加载时修改类文件。
(2)msgpack是MessagePack所依赖的jar包
下载地址:https://pan.baidu.com/s/1woq6S0VfwHL8CgUCSGBaDg 提取码: 5kdp
2.MessagePack 使用方法
(1)使用前提注意点:
<1>记好netty接收和传递信息都是经过ByteBuf进行的,发送出去的消息都必须先变化为byte,再转化为ByteBuf;接收到的ByteBuf都必须转化。
<2>序列化与反序列化是针对对象而言的
<3>序列化与反序列化的实体POJO类必须遵循JavaBean规则,而且必须加上 org.msgpack.annotation.Message 注解:@Message
(2)操作方法
<1>创建序列化与反序列化器MessagePack
MessagePack msgpack = new MessagePack();<2>主要方法
1.byte[] raw = messagePack.write(class);//序列化指定的对象为字节数组 2.<T> T read(byte[] bytes, Template<T> tmpl);//根据模板将对象反序列化为模板指定对象,同一个 API 解决 3. read(byte[] bytes, Class<T> c);//将字节数组反序列化为指定类对象,c 指定 POJO 类即可<3>MessagePack内置模板
Msgpack有很多数据模板,比如TString模板和TValue模板。相关模板均在org.msgpack.template.Template类中定义,诸如:TValue、TByte、TShort、TInteger、TLong、TCharacter、TBigInteger、TBigDecimal、TFloat。可以使用相关模板进行编码解码相关类型,不加的话默认为Object对象处理。
3.引入MessagePack相关的Netty编码器和解码器
(1)MsgpackEncoder编码器
package io.netty.handler.codec.msgpack; import org.msgpack.MessagePack; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; public class MsgpackEncoder extends MessageToByteEncoder<Object>{ @Override protected void encode(ChannelHandlerContext arg0, Object arg1, ByteBuf arg2) throws Exception { // TODO Auto-generated method stub MessagePack messagePack = new MessagePack(); byte[] raw = messagePack.write(arg1);//将对象编码为字节数组 arg2.writeBytes(raw);//写入ByteBuf,交给MessageToByteEncoder的write写出 } }继承MessageToByteEncoder类,MessageToByteEncoder类自身实现了write方法,我们只需通过平encoder方法实现自己的编码方法即可。注意MessageToByteEncoder 也是一个OutBoundHandler
源码:https://www.jianshu.com/p/d7a241cc4203
(2)MsgpackDecoder解码器
package io.netty.handler.codec.msgpack; import java.util.List; import org.msgpack.MessagePack; import org.msgpack.template.Template; import org.msgpack.template.Templates; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf>{//InBoundHandler @Override protected void decode(ChannelHandlerContext arg0, ByteBuf arg1, List<Object> arg2) throws Exception { // TODO Auto-generated method stub final byte[] array; final int length = arg1.readableBytes(); array = new byte[length]; arg1.getBytes(arg1.readerIndex(), array, 0, length);//获取字节数组 MessagePack msgpack = new MessagePack(); arg2.add(msgpack.read(array));//解码对象 } }4.对象传输Netty框架结构
(1)HandlerInitializer初始化器
package io.netty.workstation; import javax.xml.bind.helpers.ParseConversionEventImpl; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.msgpack.MsgpackDecoder; import io.netty.handler.codec.msgpack.MsgpackEncoder; public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel arg0) throws Exception { // TODO Auto-generated method stub ChannelPipeline pipeline = arg0.pipeline(); pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535,0,2,0,2));//自定义长度解码器,处理TCP粘包拆包问题 pipeline.addLast("msgpack decoder",new MsgpackDecoder());//对象解码器 pipeline.addLast("frameEncoder", new LengthFieldPrepender(2));//TCP编码器加头长度 pipeline.addLast("msgpack encoder",new MsgpackEncoder());//对象编码器 pipeline.addLast(new EchoServerHandler());//自定义InboundHandler } }(2)ServerHandler
package io.netty.workstation; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("server channelhandler is active!"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Server receive the msgpack message : " + msg); ctx.writeAndFlush(msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }(3)ClientHandler
package io.netty.workstation; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.pojo.UserInfo; public class EchoClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client is active!"); UserInfo[] infos = UserInfoArray(); for(int i = 0;i<10;i++) { ctx.write(infos[i]); } ctx.flush(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Client receive the msgpack message : " + msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } private UserInfo[] UserInfoArray() { UserInfo[] userInfos = new UserInfo[10]; UserInfo userInfo = null; for(int i = 0;i<10;i++) { userInfo = new UserInfo(); userInfo.setAge(i); userInfo.setName("ABCDEFG ---> " + i); userInfos[i] = userInfo; } return userInfos; } }(4)结果