随着微服务架构的兴起,跨设备调用越来越频繁,一个业务流程,可能调用N次第三方接口,获取N种上游数据。因此,如何高效率地异步去调取这些接口,然后同步的去处理这些接口返回的结果?这是高并发要解决的一个基础问题。
在Netty中,大量使用了异步回调模式。
A线程调用线程B的B.join方法,合并B线程。那么,线程A进入阻塞状态,知道B线程执行完成。
join方法的三个重载版本:
void join():A线程等待B线程执行结束之后,A线程重新恢复执行。void join(long millis):最长等待时间,超过后。不论B线程是否结束,A线程重新恢复执行。void join(long mills,int nanos):最长等待时间-mills+nanos, 不论B线程是否结束,A线程重新恢复执行。
Callable接口:
Runnable接口是在java多线程中表示线程的业务代码的抽象接口。但是,它的run()是没有返回值的。 java定义了类似接口--Callable接口。代表业务处理的方法是call()。 区别:Callable接口不能作为Tread的target目标。 搭桥:FutrueTask使Callable可以作为Thread的target目标。是对Callable的二次封装。FutrueTask:
搭桥,FutrueTask类能当做Thread的target目标。获取,FutrueTask可以获取异步执行结果。抽象,在java语言中,将FutrueTask的一系列操作抽象出来---Futrue接口。Futrue接口:
判断:判断并发任务是否执行完成。获取:获取并发的任务完成后的结果。取消:取消并发执行中的任务。 public interface Future<V> { //取消并发任务的执行。 boolean cancel(booleanmayInterruptRunning); booleanisCancelled(); //获取并发任务的执行状态。如果任务执行结束,则返回true。 booleanisCancelled():获取并发任务的取消状态。如果任务完成前被取消,则返回true。 booleanisDone(); //获取并发任务执行的结果。注意,这个方法是阻塞性的。如果并发任务没有执行完成,调用此方法的线程会一直阻塞,直到并发任务执行完成。 V get() throws InterruptedException, ExecutionException; //获取并发任务执行的结果。也是阻塞性的,但是会有阻塞的时间限制,如果阻塞时间超过设定的timeout时间,该方法将抛出异常。 V get(long timeout, TimeUnitunit) throws InterruptedException, ExecutionException, TimeoutException; }
FutrueTask深入:
private Callable callable;private Object outcome;run()本FutrueTask案例缺点:虽然用了Futrue模式,但是只是能获取异步结果,依然需要等待,依然是阻塞的。
public class JavaFutureDemo { public static final int SLEEP_GAP = 500; public static String getCurThreadName() { return Thread.currentThread().getName(); } //第一步,实现了Callable,替代Runnable,实现线程业务-烧水。 static class HotWarterJob implements Callable<Boolean> //① { @Override public Boolean call() throws Exception //② { try { Logger.info("洗好水壶"); Logger.info("灌上凉水"); Logger.info("放在火上"); //线程睡眠一段时间,代表烧水中 Thread.sleep(SLEEP_GAP); Logger.info("水开了"); } catch (InterruptedException e) { Logger.info(" 发生异常被中断."); return false; } Logger.info(" 运行结束."); return true; } } //第二步,实现了Callable,替代Runnable,实现线程业务-清洗。 static class WashJob implements Callable<Boolean> { @Override public Boolean call() throws Exception { try { Logger.info("洗茶壶"); Logger.info("洗茶杯"); Logger.info("拿茶叶"); //线程睡眠一段时间,代表清洗中 Thread.sleep(SLEEP_GAP); Logger.info("洗完了"); } catch (InterruptedException e) { Logger.info(" 清洗工作发生异常被中断."); return false; } Logger.info(" 清洗工作运行结束."); return true; } } public static void drinkTea(booleanwarterOk, booleancupOk) { if (warterOk&&cupOk) { Logger.info("泡茶喝"); } else if (! warterOk) { Logger.info("烧水失败,没有茶喝了"); } else if (! cupOk) { Logger.info("杯子洗不了,没有茶喝了"); } } public static void main(String args[]) { //第三步:将callable封装成FutureTask Callable<Boolean>hJob = new HotWarterJob(); //③ //第四步:将FutureTask装入线程 FutureTask<Boolean>hTask = new FutureTask<>(hJob); //④ Thread hThread = new Thread(hTask, "** 烧水-Thread"); //⑤ Callable<Boolean>wJob = new WashJob(); //③ FutureTask<Boolean>wTask = new FutureTask<>(wJob); //④ Thread wThread = new Thread(wTask, "$$ 清洗-Thread"); //⑤ //第五步:启动线程 hThread.start(); wThread.start(); Thread.currentThread().setName("主线程"); try { //第六步:获取线程结果 boolean warterOk = hTask.get(); boolean cupOk = wTask.get(); //第七步:线程结果处理 drinkTea(warterOk, cupOk); } catch (InterruptedException e) { Logger.info(getCurThreadName() + "发生异常被中断."); } catch (ExecutionException e) { e.printStackTrace(); } Logger.info(getCurThreadName() + " 运行结束."); } }谷歌高并发包,对java的异步回调机制做了增强:
引入新接口ListenableFutrue,继承了java的Futrue接口,使得Java的Futrue异步任务,在Guava中能被监控和获得结果。 public interface ListenableFuture<V> extends Future<V> { //此方法由Guava内部调用 //将FutureCallback回调工作,封装成一个内部的runnable异步回调任务,在Callable异步任务完成后,回调FutureCallback。 //白话说,FutureCallback中的success和fail在这里封装成runnable送入线程池被执行。 void addListener(Runnable r, Executor e); }引入新接口FutrueCallback,独立新接口,目的是在异步任务执行完成之后,根据异步结果,完成不同的回调处理。 public interface FutureCallback<V> { void onSuccess(@Nullable V var1); void onFailure(Throwable var1); }
简单实例
Futures.addCallback(listenableFuture, newFutureCallback<Boolean>() { public void onSuccess(Boolean r) { // listenableFuture内部的Callable成功时回调此方法 } public void onFailure(Throwable t) { // listenableFuture内部的Callable异常时回调此方法 } });
异步回调案例
Guava案例
//…. public class GuavaFutureDemo { public static final int SLEEP_GAP = 500; public static String getCurThreadName() { return Thread.currentThread().getName(); } //业务逻辑:烧水 static class HotWarterJob implements Callable<Boolean> { @Override public Boolean call() throws Exception { //……省略,与使用FutureTask实现异步泡茶喝相同 } } //业务逻辑:清洗 static class WashJob implements Callable<Boolean> { @Override public Boolean call() throws Exception { //……省略,与使用FutureTask实现异步泡茶喝相同 } } //新创建一个异步业务类型,作为泡茶喝主线程类 static class MainJob implements Runnable { booleanwarterOk = false; booleancupOk = false; int gap = SLEEP_GAP / 10; @Override public void run() { while (true) { try { Thread.sleep(gap); Logger.info("读书中......"); } catch (InterruptedException e) { Logger.info(getCurThreadName() + "发生异常被中断."); } if (warterOk&&cupOk) { drinkTea(warterOk, cupOk); } } } public void drinkTea(Boolean wOk, Boolean cOK) { if (wOk&&cOK) { Logger.info("泡茶喝,茶喝完"); this.warterOk = false; this.gap = SLEEP_GAP * 100; } else if (! wOk) { Logger.info("烧水失败,没有茶喝了"); } else if (! cOK) { Logger.info("杯子洗不了,没有茶喝了"); } } } public static void main(String args[]) { //第一步:启动主线程。 //创建一个新的线程实例,作为泡茶主线程 MainJobmainJob = new MainJob(); Thread mainThread = new Thread(mainJob); mainThread.setName("主线程"); mainThread.start(); //烧水的业务逻辑实例 Callable<Boolean>hotJob = new HotWarterJob(); //清洗的业务逻辑实例 Callable<Boolean>washJob = new WashJob(); //第二步:创建子线程池 //创建Java线程池 ExecutorService jPool = Executors.newFixedThreadPool(10); //包装Java线程池,构造Guava线程池 ListeningExecutorService gPool = MoreExecutors.listeningDecorator(jPool); //第三步:线程池中注入任务,并获取结果 //提交烧水的业务逻辑实例,到Guava线程池获取异步任务 ListenableFuture<Boolean> hotFuture = gPool.submit(hotJob); //第四步:设置回调。 //绑定异步回调,烧水完成后,把喝水任务的warterOk标志设置为true Futures.addCallback(hotFuture, new FutureCallback<Boolean>() { public void onSuccess(Boolean r) { if (r) { mainJob.warterOk = true; } } public void onFailure(Throwable t) { Logger.info("烧水失败,没有茶喝了"); } }; //提交清洗的业务逻辑实例,到Guava线程池获取异步任务 ListenableFuture<Boolean> washFuture = gPool.submit(washJob); //绑定任务执行完成后的回调逻辑到异步任务 Futures.addCallback(washFuture, new FutureCallback<Boolean>() { public void onSuccess(Boolean r) { if (r) { mainJob.cupOk = true; } } public void onFailure(Throwable t) { Logger.info("杯子洗不了,没有茶喝了"); } }); } }
netty和guava一样,实现了自己的异步回调机体系。netty集成和扩展了JDK Future系列异步回调的API,定义了自身的Future系列接口和类。实现了异步任务的监控、异步执行结果的获取。
继承java的Future接口(判断+结果+取消),得到了一个新的属于netty的Future异步任务接口。该接口对原有的接口进行了增强,使得netty异步任务能够以非阻塞的方式处理回调(发起回调)。netty没有修改Future的名称,只是调整了包名。引入一个新接口:GenericFutureListener,用于表示异步执行完成的监听器。不同于guava,netty使用了监听器的模式,异步任务对的执行完成后的回调逻辑抽象成Listener监听器接口。可以将netty的GenericFutureListener的监听器接口加入Future中,实现对异步任务执行状态的事件监听。netty与guava的模式类比:
Netty的Future接口,可以对应到Guava的ListenableFuture接口。(判断+结果+取消+发起回调) public interface Future<V> extendsjava.util.concurrent.Future<V> { booleanisSuccess(); // 判断异步执行是否成功 booleanisCancellable(); // 判断异步执行是否取消 Throwable cause(); //获取异步任务异常的原因 //增加异步任务执行完成与否的监听器Listener Future<V>addListener(GenericFutureListener<? extends Future<? super V>>listener); //移除异步任务执行完成与否的监听器Listener Future<V>removeListener(GenericFutureListener<? extends Future<? superV>> listener); //.... }一般netty有一系列的子类,实际使用。
在netty 网络连接中,网络连接通道的输入和输出处理都是异步的。都会返回一个ChannelFuture接口的实例。
//connect是异步的,仅提交异步任务 ChannelFuture future= bootstrap.connect(new InetSocketAddress("www.manning.com",80)); //connect的异步任务真正执行完成后,future回调监听器才会执行 future.addListener(new ChannelFutureListener(){ @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if(channelFuture.isSuccess()){ System.out.println("Connection established"); } else { System.err.println("Connection attempt failed"); channelFuture.cause().printStackTrace(); } } });Netty的GenericFutureListener接口,可以对应到Guava的FutureCallback接口。(回调) //GenericFutureListener的父接口EventListener是一个空接口,没有任何的抽象方法,是一个仅仅具有标识作用的接口。 public interface GenericFutureListener<F extends Future<? >> extends EventListener { //监听器的回调方法 void operationComplete(F var1) throws Exception; }