Future以及FutureTask

mac2024-04-19  37

Future介绍

在学习Future之前,需要先了解传统模式与future模式的区别,我觉得这个很重要。但是确实又不好说明,另起一篇文章。

https://blog.csdn.net/java_lifeng/article/details/102844190

future模式是一种并发设计模式,Future接口是JDK对该模式提供的一种实现,两者并无必然关系。


1. Future 表示异步的结果计算,Future 接口主要定义了5个方法: 

1) boolean cancel(boolean mayInterruptIfRunning):试图取消对此任务的执行。如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。当调用 cancel 时,如果调用成功,而此任务尚未启动,则此任务将永不运行。如果任务已经启动,则 mayInterruptIfRunning 参数确定是否应该以试图停止任务的方式来中断执行此任务的线程。此方法返回后,对 isDone() 的后续调用将始终返回 true。如果此方法返回 true,则对 isCancelled() 的后续调用将始终返回 true。 

2)boolean isCancelled():如果在任务正常完成前将其取消,则返回 true。 

3)boolean isDone():如果任务已完成,则返回 true。 可能由于正常终止、异常或取消而完成,在所有这些情况中,此方法都将返回 true。 

4)V get() throws InterruptedException,ExecutionException:如有必要,等待计算完成,然后获取其结果。 

5)V get(long timeout,TimeUnit unit) throws InterruptedException,ExecutionException,TimeoutException:尝试获取其结果,如有必要,最多等待为使计算完成所给定的时间timeout,如果超时仍未完成计算,抛出超时异常

InterruptedException 线程中断异常

ExecutionException 任务执行异常,比如中途被取消

TimeoutException 超时异常


2. Future的类图结构

RunnableFuture:这个接口同时继承Future接口和Runnable接口,在成功执行run()方法后,可以通过Future访问执行结果。这个接口的实现类是FutureTask,一个可取消的异步任务,这个类提供了Future的基本实现,后面我们的demo也是用这个类实现,它实现了启动和取消一个任务,查询这个任务是否已完成,获取任务结果。任务的结果只能在任务已经完成的情况下获取。如果任务没有完成,get方法会阻塞,一旦任务完成,这个任务将不能被重启和取消,除非调用runAndReset方法。

FutureTask:能用来包装一个CallableRunnable对象,因为它实现了RunnableFuture接口,而且它能被传递到Executor进行执行。

SchedualFuture:这个接口表示一个可以延时任务的结果。

其他如 CompleteFuture,ForkJoinTask 了解的不多,用的也比较少,暂时就不做记录。


3. FutureTask 源码分析

// // Source code recreated from a .class file by IntelliJ IDEA // (powered by Fernflower decompiler) // package java.util.concurrent; import java.util.concurrent.locks.LockSupport; import sun.misc.Unsafe; public class FutureTask<V> implements RunnableFuture<V> { // 任务状态值,volatile修饰 private volatile int state; //初始创建时的状态 private static final int NEW = 0; //当任务执行完毕,FutureTask会将执行结果设置给outcome属性,在设置之前会将FutureTask的状态修改为COMPLETING。 private static final int COMPLETING = 1; //当任务执行完毕,FutureTask会将执行结果设置给outcome属性,在设置之后会将FutureTask的状态修改为NORMAL。 private static final int NORMAL = 2; //当任务在执行的过程中抛了异常,FutureTask会将异常信息设置给outcome属性,在设置之前会将FutureTask的状态修改为COMPLETING,在设置之后将状态修改为EXCEPTIONAL。 private static final int EXCEPTIONAL = 3; //当外部想要取消任务,而又不允许当任务正在执行的时候被取消时会将FutureTask的状态修改为CANCELLED。 private static final int CANCELLED = 4; //当外部想要取消任务,同时允许当任务正在执行的时候被取消时,会先将FutureTask的状态设置为INTERRUPTING,然后设置执行任务的线程的中断标记位。 private static final int INTERRUPTING = 5; //当外部想要取消任务,同时允许当任务正在执行的时候被取消时,会先将FutureTask的状态设置为INTERRUPTING,然后设置执行任务的线程的中断标记位,最后将Future的状态设置为INTERRUPTED。 private static final int INTERRUPTED = 6; /** * 综上,FutureTask的状态转换流转可能为: * NEW—>COMPLETING—>NORMAL(任务执行正常) * NEW—>COMPLETING—>EXCEPTIONAL(任务执行异常) * NEW—>CANCELLED(任务未得到执行直接取消,或者任务正在执行过程中被取消,但是不允许中断cancel(false)) * NEW—>INTERRUPTING—>INTERRUPTED(允许执行中的取消) * FutureTask中使用CAS操作更新state来表示任务完成,极大地降低了使用加锁进行同步控制的性能开销。 */ /** 底层的调用,运行后为空 */ private Callable<V> callable; /** get()返回的结果或抛出的异常,非volatile,受state读/写保护 */ private Object outcome; /** 底层执行callable任务的线程,在run()方法中使用CAS操作赋值 */ private volatile Thread runner; /** 等待线程的Treiber(传动齿轮)堆栈 */ private volatile FutureTask.WaitNode waiters; private static final Unsafe UNSAFE; private static final long stateOffset; private static final long runnerOffset; private static final long waitersOffset; static { try { UNSAFE = Unsafe.getUnsafe(); Class var0 = FutureTask.class; stateOffset = UNSAFE.objectFieldOffset(var0.getDeclaredField("state")); runnerOffset = UNSAFE.objectFieldOffset(var0.getDeclaredField("runner")); waitersOffset = UNSAFE.objectFieldOffset(var0.getDeclaredField("waiters")); } catch (Exception var1) { throw new Error(var1); } } /** * 返回已完成的任务的结果或抛出的异常,参数var1--任务完成后的状态值state */ private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); } public FutureTask(Callable<V> var1) { if (var1 == null) { throw new NullPointerException(); } else { this.callable = var1; this.state = NEW; } } public FutureTask(Runnable var1, V var2) { // 此处可参考Executors源码 this.callable = Executors.callable(var1, var2); this.state = NEW; } public boolean isCancelled() { return this.state >= CANCELLED; } public boolean isDone() { return this.state != NEW; } public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; } public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); } /** * 什么也不做,该方法主要用于子类个性化定制,在任务完成后执行自定义操作 * 如ExecutorCompletionService中QueueingFuture实现FutureTask,实现done()以达到任务完成自动将Future加入结果队列 */ protected void done() { } protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } } protected void setException(Throwable t) { //流程:NEW—>COMPLETING—>EXCEPTIONAL(任务执行异常) if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } } public void run() { // compareAndSwapObject这个方法此处作用是仅当runner为null时对其赋值为当前线程对象,也 // 就是说仅当任务状态为NEW,且无线程用于运行此任务时,开始执行任务。 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) // 修改状态值 set(result); } } finally { // state被设置前必须保证runner非空,以阻止run()被并发调用。进入finally 代码块,则state已经被设置,任务已经执行完毕,此时可以安全的将runner置为null runner = null; // runner置为null后,必须重新读取state以防止任务在执行中被取消,有中断发生,以进行相应的逻辑操作 int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } /** * 被设计用于本质上可执行多次的任务。也即如果当前执行正常完成,则不调用set方法修改state,而 * 是保持初始状态NEW,可参考子类实现ScheduledFutureTask */ protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; } private void handlePossibleCancellationInterrupt(int var1) { if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); } /** * 用于唤醒等待队列中的所有后续线程(若有)。当任务未完成时,调用get()方法会被加入等待队列并阻塞。 * waiters的初始化是在调用get系列方法后,发现任务尚未完成,则会阻塞调用线程,并将其加入等待队列 */ private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { // 注意,compareAndSwapObject方法是将waiters字段重新赋值为null,局部变量q的指向并没有变,指向原先的WaitNode if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { //成功则进入自旋,唤醒堆栈中的后续所有线程 for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint } /** * 任务完成或者超时则返回对应状态值,线程中断则抛异常 * timed是否设置超时设置,nacos超时设定等待时间 * * FutureTask中WaitNode 作为节点,并将当前线程保存在其中,而且始终将栈顶元素保存在waiters中 */ private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { //判断当前线程是否已中断,该方法会清除线程状态,也就是说第一次调用返回true, //并且没有再次被中断时,第二次调用将返回false if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; // 任务已完成或被取消 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } // 表示任务马上完成,不必进入等待队列 else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 此时s只可能为NEW else if (q == null) q = new WaitNode(); else if (!queued) // 使用CAS操作加入当前等待节点q,通过将q设为新的栈顶元素,即waiters,同时修改q.next指针指向上一次的waiters。这里使用自旋操作来保证操作一定成功 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } } /** *仅当线程中断,或者get方法超时时,移除对应节点 *removeWaiter操作的作用在于移除无效节点,避免造成垃圾累积,当堆栈中节点较多,removeWaiter *操作会很慢。通常情况下,不会有太多线程同时等待一个任务的结果。具体逻辑这里不分析了,有点麻烦,不过多关注 */ private void removeWaiter(WaitNode node) { if (node != null) { node.thread = null; retry: for (;;) { // restart on removeWaiter race for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; if (q.thread != null) pred = q; else if (pred != null) { pred.next = s; if (pred.thread == null) // check for race continue retry; } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } } } static final class WaitNode { volatile Thread thread = Thread.currentThread(); volatile FutureTask.WaitNode next; WaitNode() { } } }

future状态转换图

对于cancel方法,着重再分析一下源码(以下为1.7源码,1.8源码略微变动了下结构,但逻辑没变)

//有一个入参,需要说明task是否是可中断的 public boolean cancel(boolean mayInterruptIfRunning) { if (state != NEW) return false; if (mayInterruptIfRunning) { //尝试修改futuretask的运行状态 if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) return false; //获取当前线程发出中断信号 Thread t = runner; if (t != null) t.interrupt(); //修改task装态为已中断的 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state } //如果是不可中断的只修改task的状态为cancelled else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) return false; //执行finishCompletion方法 /** * Removes and signals all waiting threads, invokes done(),and nulls out callable. */ finishCompletion(); return true; }

 

可以看到cancel方法主要做了两件事,1. 修改自身状态值 2. 中断任务执行线程(mayInterruptIfRunning参数为true时)

取消操作只是改变了任务对象的状态并可能会中断执行线程(这里的中断只是给线程打上一个中断标记,关于线程中断的概念可查阅相关文章)。如果任务的逻辑代码没有响应中断,则会一直异步执行直到完成,只是最终的执行结果不会被通过get方法返回,计算资源的开销仍然是存在的。

对于尚未启动的任务,调用cancle后将永远无法被执行。


最新回复(0)