Netty ChannelPipeline 数据管道
1. 前言
前面,我们也提到了 ChannelPipeline,它是管道或者说管理 Handler 的集合,很多同学很容易搞混 Channel、ChannelPipeline 和 ChannelHandler 之间关系。
本节内容我们需要理清并且掌握以下知识点:
- Channel、ChannelPipeline、ChannelHandler 之间的关系;
- 了解 ChannelPipeline 如何管理 ChannelHandler。
2. 三者之间的关系
Channel 是一个连接通道,客户端和服务端连接成功之后,会维持一个 Channel,可以通过 Channel 来发送数据。Channel 有且仅有一个 ChannelPipeline 与之相对应,ChannelPipeline 又维护着一个由多个 ChannelHandlerContext 组成的双向链表,ChannelHandlerContext 又关联着一个 ChannelHandler。
它们之间的关系,大概如下图所示:
3. ChannelPipeline 核心方法
ChannelPipeline 的最常用方法:
方法 | 描述 |
---|---|
addFirst(…) | 添加 ChannelHandler 在 ChannelPipeline 的第一个位置 |
addBefore(…) | 在 ChannelPipeline 中指定的 ChannelHandler 名称之前添加 ChannelHandler |
addAfter(…) | 在 ChannelPipeline 中指定的 ChannelHandler 名称之后添加 ChannelHandler |
addLast(…) | 在 ChannelPipeline 的末尾添加 ChannelHandler |
remove(…) | 删除 ChannelPipeline 中指定的 ChannelHandler |
replace(…) | 替换 ChannelPipeline 中指定的 ChannelHandler |
ChannelHandler first() | 获取链表当中的第一个节点 |
ChannelHandler last() | 获取链表当中的最后一个节点 |
实例:
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new Handler1());
ch.pipeline().addLast(new Handler2());
ch.pipeline().addLast(new Handler3());
ch.pipeline().addLast(new Handler4());
}
});
总结,ChannelPipeline 的用法比较固定,虽然方法很多,但是一般常用的就是 addLast。
4. ChannelPipeline 管理链表
实例:
//1.创建ChannelPipeline
ChannelPipeline pipeline = ch.pipeline();
//2.创建Handler
FirstHandler firstHandler = new FirstHandler();
SecondHandler secondHandler=new SecondHandler();
ThirdHandler thirdHandler=new ThirdHandler();
FourthHandler fourthHandler=new FourthHandler();
//3.操作
pipeline.addLast("handler1", firstHandler);
pipeline.addFirst("handler2", secondHandler);//在最开始添加
pipeline.addLast("handler3", thirdHandler);//在最后添加
pipeline.remove("handler3"); //根据名称删除
pipeline.remove(firstHandler);//根据对象删除
pipeline.replace("handler2", "handler4", fourthHandler);//替换
输出结果:
FourthHandler
5. 入站和出站执行顺序
在真实的项目开发当中,inboundHandler 和 outboundHandler 都是多个的,一般是一个业务处理对应一个 Handler。那么多个的情况下,Pipeline 的执行顺序又是怎么样的呢?
5.1 Inbound 不往下传递
实例:
ch.pipeline().addLast(new InboundHandler1());
ch.pipeline().addLast(new InboundHandler2());
public class InboundHandler1 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("inbound1>>>>>>>>>");
}
}
public class InboundHandler2 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("inbound2>>>>>>>>>");
}
}
执行结果:
inbound1>>>>>>>>>
思考:为什么不执行 InboundHandler2 呢?
原因:InboundHandler1 没有手工往下传递执行。
5.2 Inbound 流转顺序
实例:
ch.pipeline().addLast(new InboundHandler1());
ch.pipeline().addLast(new InboundHandler2());
public class InboundHandler1 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("inbound1>>>>>>>>>");
//往下传递
super.channelRead(ctx, msg);
}
}
public class InboundHandler2 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("inbound2>>>>>>>>>");
}
}
执行结果:
inbound1>>>>>>>>>
inbound2>>>>>>>>>
InboundHandler 之间可以通过 super.channelRead(ctx, msg);
往下传递。
5.3 Inbound 执行顺序
实例:
ch.pipeline().addLast(new InboundHandler1());
ch.pipeline().addLast(new InboundHandler2());
public class InboundHandler1 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//1.往下传递
super.channelRead(ctx, msg);
//2.打印信息
System.out.println("inbound1>>>>>>>>>");
}
}
public class InboundHandler2 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("inbound2>>>>>>>>>");
}
}
执行结果:
inbound2>>>>>>>>>
inbound1>>>>>>>>>
InboundHandler1 先往下传递,在执行自己的业务,那么 InboundHandler2 就会比 InboundHandler1 先执行。
总结:Inbound 是按顺序进行传递,但是逻辑的执行并非是按顺序执行,而是由
super.channelRead(ctx, msg);
去决定。
5.4 流转到 Outbound
InboundHandler 往 OutboundHandler 流转,需要手工调用 ctx.channel().writeAndFlush()
,否则无法执行 OutboundHandler 的业务逻辑。
实例:
public class InboundHandler2 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("inbound2>>>>>>>>>");
//传递到OutboundHandler
ctx.channel().writeAndFlush("hello world");
}
}
5.5 Outbound 内部流转
跟 InboundHandler 一样,需要手工往下传递,否则无法流转到下一个 OutboundHandler。
实例:
public class OutboundHandler2 extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("outbound2>>>>>>>>>");
//往下流转
super.write(ctx, msg, promise);
}
}
总结:OutboundHandler 是按逆向来流转,但是业务逻辑的执行顺序则是由
super.write(ctx, msg, promise);
决定。
5.6 ctx.writeAndFlush 和 ctx.channel ().writeAndFlush 的区别
很多同学很容易遇到以下问题,并且会想不通。
实例:
ch.pipeline().addLast(new InboundHandler1());
ch.pipeline().addLast(new InboundHandler2());
ch.pipeline().addLast(new OutboundHandler1());
ch.pipeline().addLast(new OutboundHandler2());
InboundHandler2 流转代码
public class InboundHandler2 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("inbound2>>>>>>>>>");
//流转到OutboundHandler2
ctx.channel().writeAndFlush("hello world");
}
}
执行结果:
inbound1>>>>>>>>>
inbound2>>>>>>>>>
outbound2>>>>>>>>>
outbound1>>>>>>>>>
修改 InboundHandler2 流转代码
public class InboundHandler2 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("inbound2>>>>>>>>>");
//【注意,调整这里了】
ctx.writeAndFlush("hello world");
}
}
执行结果
inbound1>>>>>>>>>
inbound2>>>>>>>>>
思考:为什么这里使用 ctx.writeAndFlush 就流程不下去了呢?
ctx.writeAndFlush();
最终源码
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while(!ctx.outbound);
return ctx;
}
通过源码,我们发现它是从当前 InboundHandler 开始往前执行。
ctx.channel().writeAndFlush();
最终源码
public final ChannelFuture writeAndFlush(Object msg) {
return this.tail.writeAndFlush(msg);
}
通过源码,我们发现它是从链表的最后一个节点开始往前面执行。
总结,如果是 OutboundHandler 放在 InboundHandler 之后,使用不同的 writeAndFlush 则得到的结果不一样。
5.7 规律总结
Inbound 的顺序
- 流转顺序: 多个 Inbound 不会自动往下流转,需要手工调用
ctx.fireChannelRead(msg);
才能流转到下一个; - 执行顺序: 业务逻辑的执行顺序,则根据
ctx.fireChannelRead(msg);
和逻辑的先后顺序所决定; - Inbound 往 Outbound 流转,则需要手工
ctx.channel().writeAndFlush()
。
Outbound 的顺序
- 流转顺序: 多个 Outbound 不会自动往下流转,需要手工调用
ctx.write(msg, promise);
才能流转到下一个; - 执行顺序: 业务逻辑的执行顺序,则根据
ctx.write(msg, promise);
和逻辑的先后顺序所决定。
6. 小结
本文主要讲解的知识点
- Channel、ChannelPipeline、ChannelHandlerContext、ChannelHandler 之间的关系;
- ChannelPipeline 的核心方法,以及它是如何管理 Handler 的,主要通过 addLast () 去组装 Handler;
- 入站和出站的流转顺序和业务逻辑的执行顺序。
最新评论
命令: nload
真是个良心站点哇,大公无私,爱了爱了
还可以直接搞一张映射表,存 uid | time | source_index, 第一次直接查对应的 time 选出前100, 第二次直接用 CompleteFuture 去分别用 source_in
干得漂亮,多个朋友堵条路
2021.2.2版本的不适用吧
现在还可以用么
激活码有用,感谢分享
激活码的地址打不开了