Netty ChannelFuture 异步监听
1. 前言
本节主要讲解 ChannelFuture ,它的作用是用来保存 Channel 异步操作的结果,可以看作是一个异步操作结果的占位符。
2. 概念
在 Netty 中所有的 IO 操作都是异步的,不能立刻得到 IO 操作的执行结果,但是可以通过注册一个监听器来监听其执行结果。在 Java 的并发编程当中可以通过 Future 来进行异步结果的监听,但是在 Netty 当中是通过 ChannelFuture 来实现异步结果的监听。通过注册一个监听的方式进行监听,当操作执行成功或者失败时监听会自动触发注册的监听事件。
3. 应用场景
ChannelFture 在开发当中经常需要用到,可以用来监听客户端连接服务端的结果反馈,Netty 是异步操作,无法知道什么时候执行完成,因此可以通过 ChannelFuture 来进行执行结果的监听。在 Netty 当中 Bind 、Write 、Connect 等操作会简单的返回一个 ChannelFuture。
4. 核心方法
序号 | 方法 | 描述 |
---|---|---|
1 | addListener | 注册监听器,当操作已完成 (isDone 方法返回完成),将会通知指定的监听器;如果 Future 对象已完成,则通知指定的监听器 |
2 | removeListener | 移除监听器 |
3 | sync | 等待异步操作执行完毕 |
4 | await | 等待异步操作执行完毕 |
5 | isDone | 判断当前操作是否完成 |
6 | isSuccess | 判断已完成的当前操作是否成功 |
7 | isCancellable | 判断已完成的当前操作是否被取消 |
8 | cause | 获取已完成的当前操作失败的原因 |
sync () 和 await () 都是等待异步操作执行完成,那么它们有什么区别呢?
- sync () 会抛出异常,建议使用 sync ();
- await () 不会抛出异常,主线程无法捕捉子线程执行抛出的异常。
5. 深入了解 ChannelFuture
5.1 生命周期说明
Future 可以通过四个核心方法来判断任务的执行情况。
状态 | 说明 |
---|---|
isDone() | 任务是否执行完成,无论成功还是失败 |
isSuccess() | 任务是否执行采购 |
isCancelled() | 任务是否被取消 |
cause() | 获取执行异常信息 |
执行过程状态的改变说明
当一个异步任务操作开始的时候,一个新的 future 对象就会被创建。在开始的时候该 future 是处于未完成的状态,也就是说,isDone ()=false、isSuccess ()=false、isCancelled ()=false;只要该任务中任何一种状态结束了,无论是说成功、失败、或者被取消,那么整个 Future 就会被标记为已完成。注意的是,如果执行失败那么 cause () 方法会返回异常信息的内容。
实例:
ChannelFuture channelFuture=bootstrap.connect("127.0.0.1",80);
channelFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if(future.isDone()){
if(future.isSuccess()){
System.out.println("执行成功...");
}else if(future.isCancelled()){
System.out.println("任务被取消...");
}else if(future.cause()!=null){
System.out.println("执行出错:"+future.cause().getMessage());
}
}
}
});
5.2 ChannelFuture 父接口说明
ChannelFuture 的类继承结构,具体如下所示:
public interface ChannelFuture extends Future<Void> {
}
public interface Future<V> extends java.util.concurrent.Future<V> {
}
通过上面的继承关系,我们可以清晰的知道 ChannelFuture 其实最顶层的接口是来自 java 并发包的 Future,java 并发包下的 Future 需要手工检查执行结果是否已经完成,非常的繁琐,因此 Netty 把它进行了封装和完善,变成了自动的监听,用起来变的非常的简单。
java 并发包下的 Future 主要存在以下几个缺陷:
- 只允许手动通过 get () 来检查对应的操作是否已经完成,它是堵塞直到子线程完成执行并且返回结果;
- 只有 isDone () 方法判断一个异步操作是否完成,但是对于完成的定义过于模糊,JDK 文档指出正常终止、抛出异常、用户取消都会使 isDone () 方法返回真。并不能很好的区分到底是哪种状态。
get () 方法是堵塞的,必须等待子线程执行完成才能往下执行。
实例:
//1.定义一个子线程,实现 Callable 接口
public class ThreadTest implements Callable<Integer>{
@Override
public Integer call(){
//打印
System.out.println(">>>>>>>>子线程休眠之前");
//休眠5秒
Thread.sleep(5000);
//打印
System.out.println(">>>>>>>>子线程休眠之后");
return 1;
}
}
//2.调用子线程处理
public static void main(String[] args){
ThreadTest t=new ThreadTest();
FutureTask<Integer> future=new FutureTask<Integer>(t);
//2.1.开始执行子线程
new Thread(future).start();
//2.2.手工返回结果
int result=future.get();
System.out.println(">>>>>>>>执行结果:"+result);
//2.3.操作数据库
userDao.updateStatus("1");
}
执行结果:
>>>>>>>>子线程休眠之前
>>>>>>>>子线程休眠之后
>>>>>>>>执行结果:1
结论总结:
- 说明了 Java 并发包的 Future 要想获取异步执行结果,必须手工调用 get () 方法,此时虽然能获取执行结果,但是无法知道执行结果是成功还是失败;
- 使用 get () 获取执行结果,但是 get () 后面的业务则被堵塞,直到后面执行完毕才会往下执行,失去了异步操作提高执行效率的意义了。
6. ChannelFuture 原理
6.1 线程堵塞
思考:sync () 和 await () 方法如何同步等待执行完成并获取执行结果的呢?
源码分析如下所示:
private short waiters;//计数器
@Override
public Promise<V> await() throws InterruptedException {
//1.判断是否执行完成,如果执行完成则返回
if (isDone()) {
return this;
}
//2.线程是否已经中断,如果中断则抛异常
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
//3.检查死锁
checkDeadLock();
//4.同步代码块->while循环不断的监听执行结果
synchronized (this) {
while (!isDone()) {
incWaiters();//waiters递增
try {
wait();//JDK 的 Object 方法,线程等待【核心】
} finally {
decWaiters();//waiters 递减
}
}
}
return this;
}
//递增函数
private void incWaiters() {
if (waiters == Short.MAX_VALUE) {
throw new IllegalStateException("too many waiters: " + this);
}
++waiters;
}
//递减函数
private void decWaiters() {
--waiters;
}
通过以上代码,我们发现 await () 的核心其实就是调用 Object 的 wait () 方法进行线程休眠,普通的 Java 多线程知识点。
6.2 线程唤醒
思考:当前线程休眠了,那么什么时候进行唤醒呢?
源码分析如下所示:
@Override
public Promise<V> setSuccess(V result) {
//1.setSuccess0 赋值操作
if (setSuccess0(result)) {
//2.通知执行监听器
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
//继续进入方法
return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
//继续进入方法
checkNotifyWaiters();
return true;
}
return false;
}
private synchronized void checkNotifyWaiters() {
if (waiters > 0) {
//核心:唤醒之前休眠的线程
notifyAll();
}
}
源码分析总结:
- 堵塞的核心是通过 Object.wait () 方法进行休眠当前线程,普通的 Java 多线程知识;
- 执行完成之后给不同状态(setSuccess、setFailure)赋值的时候唤醒休眠的线程;
- 唤醒线程之后调用监听器的方法
l.operationComplete(future);
7. 小结
通过本节的学习,我们需要掌握以下几个核心知识点:
- 掌握异步的概念,传统 I/O 是同步堵塞的,执行 I/O 操作后线程会被阻塞住,直到操作完成;异步处理的好处是不会造成线程阻塞,可以通过 Future 来监听异步执行的结果;
- ChannelFuture 的几种状态,以及它的值变化时机;
- ChannelFuture 的堵塞和唤醒源码分析。
最新评论
Spring Cloud Alibaba 微服务架构实战 https://pan.baidu.com/s/1jF5voFRoeF0lYAzAPBWSbw?pwd=chqk
命令: nload
真是个良心站点哇,大公无私,爱了爱了
还可以直接搞一张映射表,存 uid | time | source_index, 第一次直接查对应的 time 选出前100, 第二次直接用 CompleteFuture 去分别用 source_in
干得漂亮,多个朋友堵条路
2021.2.2版本的不适用吧
现在还可以用么
激活码有用,感谢分享