分布式网络通信框架Netty

mac2024-12-15  25

                                     分布式网络通信框架Netty

                                     Netty编(解)码器与网络通信序列机制

                                                                 学习总结

                                                                                                                                                                                                田超凡

                                                                                                                                                                                2019年10月31日

转载请注明原作者

1 序列模式

对象持久化序列模式

把Java对象信息保存到磁盘或者数据库来实现数据持久化。

序列化:把对象转化成字符串

反序列化:把字符串转化为对象

常见的文件存储持久化信息、JSON序列化和反序列化机制一般主要用于对象持久化操作。

网络传输对象数据序列模式

在网络通信机制中,使用网络传输对象数据序列模式来实现客户端和服务器之间进行跨语言、跨平台数据传输,并以统一的形式来约定和规范网络通信数据传输机制。

序列化:客户端把对象转化成字节并发送给服务器

反序列化:服务器把字节转化为对象并进行处理,当需要返回响应的时候,再把对象序列化为字节返回给客户端。

网络传输对象数据序列模式一般采用NIO或者Netty缓冲区ByteBuffer/ByteBuf、MessagePack二进制JSON序列化等形式进行跨平台、跨语言的数据传输。

注意序列化的类必须要实现Serializable接口,transient修饰属性不被序列化。如果使用MessagePack进行二进制JSON序列化时,被序列化的类必须使用@Message注解标注。

 

2 序列模式实现机制

.JSON-轻量级数据交换格式.XML-密文/报文数据交换格式.Google ProtoBuf-缓冲区数据交换机制.MessagePack二进制JSON数据交换格式.内部自定义序列化交换格式

(6).Facebook 的 Thrift - 不常用,不稳定

(7).Jboos 的 Marshaling - 不常用,不稳定

 

Netty编码器和解码器

3.1编码器:把对象转化成字节ButeBuf,实现网络传输对象数据序列化。

3.2解码器:把字节ByteBuf转化为对象,实现网络传输对象数据反序列化。

3.3 MessagePack 二进制JSON高效编(解)码器

它像 JSON,但是更快更小,类似JSON格式的高效二进制数据传输协议,直接以JSON Value(ArrayValue)的形式传递数据,不需要KEY,减少网络带宽,提高传输效率。

MessagePack 是一种高效的二进制序列化格式。它允许您在 JSON 等多种语言之间交换数据,但它更快速更小巧。小整数被编码为单个字节,典型的短字符串除了字符串本身之外只需要一个额外的字节。支持Python、Ruby、Java、C/C++等众多语言。宣称比Google Protocol Buffers还要快4倍。目前最被外界看好是下一代JSON-更加轻量级的数据交换传输格式。

 

Java实现序列化和反序列化

4.1 对象持久化模式

import com.mayikt.entity.MsgEntity;import java.io.*;import java.util.Date;import java.util.UUID;public class Test002 {     public static void main(String[] args) {        String fileName="D:\\codes\\msg";         serialize(fileName);         deserialize(fileName);    }     /**     * 序列化对象到文件     *     * @param fileName     */    public static void serialize(String fileName) {         try {            ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(fileName));             //序列化一个字符串到文件            out.writeObject("序列化的日期是:");             //序列化一个当前日期对象到文件            out.writeObject(new Date());            MsgEntity msgEntity = new MsgEntity(UUID.randomUUID().toString(), "真的!");             //序列化一个MsgEntity对象            out.writeObject(msgEntity);            out.close();        } catch (FileNotFoundException e) {            e.printStackTrace();        } catch (IOException e) {            e.printStackTrace();        }    }     /**     * 反序列化     *     * @param fileName     */    public static void deserialize(String fileName) {         try {             //从本地文件中读取字节 变为对象            ObjectInputStream in = new ObjectInputStream(new FileInputStream(fileName));            String str = (String) in.readObject();             //日期对象            Date date = (Date) in.readObject();             //MsgEntity对象            MsgEntity userInfo = (MsgEntity) in.readObject();            System.out.println(str);            System.out.println(date);            System.out.println(userInfo);        } catch (FileNotFoundException e) {            e.printStackTrace();        } catch (IOException e) {            e.printStackTrace();        } catch (ClassNotFoundException e) {            e.printStackTrace();        }    }}

 

 

public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> {     /**     * 服务器解码数据     *     * @param channelHandlerContext     * @param byteBuf     * @param list     * @throws Exception     */    @Override     protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {         //首先获取需要解码的byte数组        final byte[] array;         final int length = byteBuf.readableBytes();        array = new byte[length];        byteBuf.getBytes(byteBuf.readerIndex(), array, 0, length);         //mp的read方法将其反序列化为object对象        MessagePack mp = new MessagePack();        list.add(mp.read(array));    }}

 

 

4.2 JSON序列化和反序列化

public class ServerHandler extends SimpleChannelInboundHandler<String> {     @Override     protected void channelRead0(ChannelHandlerContext channelHandlerContext, String o) throws Exception {        System.out.println("o:" + o);        MsgEntity msgEntity = JSONObject.parseObject(o, MsgEntity.class);        System.out.println("msgEntity:" + msgEntity);    }}

 

ch.pipeline().addLast(new StringEncoder());ch.pipeline().addLast(new ClientHandler());

 

public class ClientHandler extends SimpleChannelInboundHandler<String> {     /**     * 活跃通道可以发送消息     *     * @param ctx     * @throws Exception     */    @Override     public void channelActive(ChannelHandlerContext ctx) throws Exception {        MsgEntity msgEntity = new MsgEntity(UUID.randomUUID().toString(), "Message");        String json = JSONObject.toJSONString(msgEntity);         // 发送数据        ctx.writeAndFlush(json);    }     /**     * 读取消息     *     * @param channelHandlerContext     * @param s     * @throws Exception     */    @Override     protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {        System.out.println("resp:" + s);    }}

socketChannel.pipeline().addLast(new StringDecoder());socketChannel.pipeline().addLast(new ServerHandler());

 

 

4.3 自定义Netty编(解)码器-MessagePack二进制数据交换格式

Maven的依赖

<!-- MessagePack dependency --><dependency>     <groupId>org.msgpack</groupId>     <artifactId>msgpack</artifactId>     <version>0.6.12</version></dependency><dependency>     <groupId>io.netty</groupId>     <artifactId>netty-all</artifactId>     <version>4.1.32.Final</version></dependency>

 

 

MessagePack基本的API

        // 创建MessagePack        MessagePack messagePack = new MessagePack();        MsgEntity meite = new MsgEntity(UUID.randomUUID().toString(), "Message");         // 序列化        byte[] bs = messagePack.write(meite);        Value read1 = messagePack.read(bs);        System.out.println(read1);//        // 反序列化         MsgEntity read = messagePack.read(bs, MsgEntity.class);        System.out.println(read);    

 

 

MessagePack编码器,实现序列化

public class MsgpackEncoder extends MessageToByteEncoder {     /**     * 对我们数据实现编码     *     * @param channelHandlerContext     * @param msg     * @param byteBuf     * @throws Exception     */    @Override     protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, ByteBuf byteBuf) throws Exception {        MessagePack msgpack = new MessagePack();        byteBuf.writeBytes(msgpack.write(msg));    }}

 

 

MessagePack解码器,实现反序列化

public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> {     /**     * 服务器解码数据     *     * @param channelHandlerContext     * @param byteBuf     * @param list     * @throws Exception     */    @Override     protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {         final int length = byteBuf.readableBytes();         byte[] b = new byte[length];        byteBuf.getBytes(byteBuf.readerIndex(), b, 0, length);        MessagePack msgpack = new MessagePack();        list.add(msgpack.read(b));    }}

 

客户端与服务器端

服务器端:

socketChannel.pipeline().addLast(new MsgpackDecoder());socketChannel.pipeline().addLast(new ServerHandler());

 

客户端:

ch.pipeline().addLast(new MsgpackEncoder());ch.pipeline().addLast(new ClientHandler());

补充说明:

在网络传输对象数据序列模式中,一般主要采用MessagePack二进制数据交换格式进行数据传输,因为MessagePack可以灵活地在对象和字节中进行转换,由于TCP连接协议默认网络通信数据传输格式都是字节(ByteBuf),如果使用MessagePack可以自定义编码解码器,这样一来当我们需要传输数据的时候直接就可以传输对象数据而不用每次都必须手动转成字节,相当于把转换的工作从数据传输业务代码中剥离出来,单独形成对应的编码器(对象转换为字节,序列化操作)和解码器(字节转化为对象,反序列化操作),当把自定义的编码和解码器加入到对应Netty客户端和服务器端的事件绑定中即可实现直接传输对象数据(但是实际上对于编译器和网络传输过程而言还是会把对象数据经过我们自定义的编码器和解码器转换成字节进行传输,从而实现跨平台和跨语言,因为MessagePack是一个比JSON更加轻便和高效的基于二进制的数据交换传输格式,所以比较常用,并且在将来微服务架构中大量数据传输时可以在必要时替代JSON格式传输)。需要注意的是,MessagePack在进行反序列化操作时,实际上是把字节转换成了一个MessagePack内定义的一种数组类型(ArrayValue),可以简单理解为类似ArrayList的列表结构,当需要获取原对象信息时,不能直接把MessagePack反序列化的结果强制转换,否则会抛出类型转换异常。这种情况下可以基于Java反射机制读取目标类型中定义的可以被序列化的属性,并通过索引和ArrayValue数组中的数值进行逐一赋值,从而反序列化得到原对象。

转载请注明原作者

最新回复(0)