Netty 通讯协议功能实现
1. 前言
上节内容,我们主要讲解了 Netty 通讯协议设计,其实思路很简单就是核心的四个字段,分别是协议标识符、数据长度、指令、数据。还有其中涉及的技术主要是序列化和反序列化技术以及字节容器。那么本节主要是基于这个思想去实现我们的自定义协议,并且测试客户端循环 1000 遍发送数据是否还会出现粘包和拆包问题。
技术栈说明
- 主要是使用对象流进行序列化和反序列化(ObjectInputStream 和 ObjectOutputStream);
- 字节容器主要是以 Netty 的 ByteBuf 来管理字节。
2. 实现流程
3. 功能实现
3.1 编码实现
实例:
public class MyEncoder extends MessageToByteEncoder<User> {
protected void encode(ChannelHandlerContext channelHandlerContext,
User user,
ByteBuf byteBuf) throws Exception {
//1.创建一个内存输出流
ByteArrayOutputStream os = new ByteArrayOutputStream();
//2.创建一个对象输出流
ObjectOutputStream oos = new ObjectOutputStream(os);
//3.把user对象写到内存流里面
oos.writeObject(user);
//4.通过内存流获取user对象转换后的字节数字
byte[] bytes=os.toByteArray();
//5.关闭流
oos.close();
os.close();
//6.根据协议组装数据
byteBuf.writeInt(1);//标识
byteBuf.writeByte(1);//指令
byteBuf.writeInt(bytes.length);//长度
byteBuf.writeBytes(bytes);//数据内容
}
}
代码说明:
- 自定义一个编码器,把客户端向服务端发送的数据进行加工,主要是转换字节流,然后根据自定义协议来组装数据;
- 标识占用四个字节,使用 writeInt (),一个 int 表示四个字节;
- 指令占用一个字节,因此使用 writeByte () 即可;
- 数据长度占用四个字节,因此使用 writeByte (),int 表示的最大值一般来说足够表示数据的内容了,除非特别特别大的数据(比如:超级大文件的传输)则可以使用 writeLong () 来表示数据长度。
3.2 解码实现
实例:
public class MyDecoder extends ByteToMessageDecoder {
protected void decode(
ChannelHandlerContext channelHandlerContext,
ByteBuf byteBuf, List<Object> list) throws Exception {
//1.根据协议分别取出对应的数据
int tag=byteBuf.readInt();//标识符
byte code=byteBuf.readByte();//指令
int len=byteBuf.readInt();//长度
byte[] bytes=new byte[len];//定义一个字节数据,长度是数据的长度
byteBuf.readBytes(bytes);//往字节数组读取数据
//2.通过对象流来转换字节流,转换成User对象
ByteArrayInputStream is=new ByteArrayInputStream(bytes);
ObjectInputStream iss=new ObjectInputStream(is);
User user=(User)iss.readObject();
is.close();
iss.close();
list.add(user);
}
}
代码说明:
这里主要是实现了解码器,主要目的是通过自定义协议来分别读取对应的数据,并且通过对象流来反序列化字节流。
3.3 发送方 Handler
public class ClientTestHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for(int i=0;i<1000;i++){
User user=new User();
user.setName(i+"->zwy");
user.setAge(18);
//注意,这里直接写user对象,无需再手工转换字节流了,编码器会自动帮忙处理。
ctx.channel().writeAndFlush(user);
}
}
}
代码说明:
客户端在链接就绪时,使用 for 循环给服务端发送数据,主要目的是检测是否会产生数据粘包和拆包问题。
3.4 接受方 Handler
实例:
public class ServerTestHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
User user=(User)msg;
System.out.println(user.toString());
}
}
3.5 加入 Pipeline
客户端
//1.拆包器
ch.pipeline().addLast(
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,5,4)
);
//2.自定义编码器
ch.pipeline().addLast(new MyDecoder());
ch.pipeline().addLast(new MyEncoder());
//3.业务处理Handler
ch.pipeline().addLast(new ClientTestHandler());
服务端
//1.Netty内置拆包器
ch.pipeline().addLast(
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,5,4)
);
//2.自定义解码器
ch.pipeline().addLast(new MyDecoder());
ch.pipeline().addLast(new MyEncoder());
//3.业务Handler
ch.pipeline().addLast(new ServerTestHandler());
代码说明:
- 需要往双向链表里面加入三个特殊的 Handler,分别是
LengthFieldBasedFrameDecoder
和自定义的编码器、解码器; LengthFieldBasedFrameDecoder
拆包器的构造函数字段说明,分别如下所示:
2.1 第一个参数,maxFrameLength:解码时,处理每个帧数据的最大长度,一般来说直接赋予Integer.MAX_VALUE
即可;
2.2 第二个参数,lengthFieldOffset :存放帧数据的长度数据的起始位(偏移位),通俗点说,就是表示数据长度的字段在整个协议里面所处的位置,由于协议的结果是:协议标识(4 个字节)、指令(1 个字节)、数据长度(4 个字节),因此数据长度处于第 5 个位置;
2.3 第三个参数,lengthFieldLength:长度属性的长度,即存放整个大数据包长度的字节所占的长度,这里是 4 个字节。
疑问:为什么需要加
LengthFieldBasedFrameDecoder
呢?回答:自定义协议它是无法知道数据包是什么时候应该结束,需要依赖 Netty 提供的拆包器。
3.6 运行效果
先启动服务端,然后启动客户端,打印结果没有出现粘包和拆包问题,证明我们自定义的协议有效,最终运行效果如下所示:
4. LengthFieldBaseFrameDecoder
这里,主要简单的介绍该拆包器,因为它是我们平时开发当中最常用的拆包器, 几乎所有和长度相关的二进制协议都可以通过它来实现,因此在这里简单的介绍一下它的原理。
思考:如果让我们简单实现一个自己的拆包器,那么我们应该如何去实现呢?
其实原理很简单,就是不断从 TCP 缓冲区中读取数据,每次读取完都需要判断是否是一个完整的数据包。
- 如果当前读取的数据不足以拼接成一个完整的业务数据包,那就保留该数据,继续从 tcp 缓冲区中读取,直到得到一个完整的数据包;
- 如果当前读到的数据加上已经读取的数据足够拼接成一个数据包,那就将已经读取的数据拼接上本次读取的数据,够成一个完整的业务数据包传递到下一个节点进行处理。如果拼接完一个数据包时还有多余的数据仍然保留,以便和下次读到的数据进行拼接;
- Netty 中的拆包也是如上这个原理,内部会有一个累加器,每次读取到数据都会不断累加,然后尝试对累加到的数据进行拆包,拆成一个完整的业务数据包,这个基类叫做
ByteToMessageDecoder
。
5. 小结
本节,主要是根据上节设计的通讯协议来具体的实现效果,主要掌握的核心步骤是:
- 需要依赖
LengthFieldBaseFrameDecoder
拆包器,并且需要了解该拆包器的参数定义和大概原理; - 掌握编码器和解码器的实现,主要是在编码器和解码器里面实现协议的数据粘包和数据拆包。
最新评论
Spring Cloud Alibaba 微服务架构实战 https://pan.baidu.com/s/1jF5voFRoeF0lYAzAPBWSbw?pwd=chqk
命令: nload
真是个良心站点哇,大公无私,爱了爱了
还可以直接搞一张映射表,存 uid | time | source_index, 第一次直接查对应的 time 选出前100, 第二次直接用 CompleteFuture 去分别用 source_in
干得漂亮,多个朋友堵条路
2021.2.2版本的不适用吧
现在还可以用么
激活码有用,感谢分享