Future介绍
在学习Future之前,需要先了解传统模式与future模式的区别,我觉得这个很重要。但是确实又不好说明,另起一篇文章。
https://blog.csdn.net/java_lifeng/article/details/102844190
future模式是一种并发设计模式,Future接口是JDK对该模式提供的一种实现,两者并无必然关系。
1. Future 表示异步的结果计算,Future 接口主要定义了5个方法:
1) boolean cancel(boolean mayInterruptIfRunning):试图取消对此任务的执行。如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。当调用 cancel 时,如果调用成功,而此任务尚未启动,则此任务将永不运行。如果任务已经启动,则 mayInterruptIfRunning 参数确定是否应该以试图停止任务的方式来中断执行此任务的线程。此方法返回后,对 isDone() 的后续调用将始终返回 true。如果此方法返回 true,则对 isCancelled() 的后续调用将始终返回 true。
2)boolean isCancelled():如果在任务正常完成前将其取消,则返回 true。
3)boolean isDone():如果任务已完成,则返回 true。 可能由于正常终止、异常或取消而完成,在所有这些情况中,此方法都将返回 true。
4)V get() throws InterruptedException,ExecutionException:如有必要,等待计算完成,然后获取其结果。
5)V get(long timeout,TimeUnit unit) throws InterruptedException,ExecutionException,TimeoutException:尝试获取其结果,如有必要,最多等待为使计算完成所给定的时间timeout,如果超时仍未完成计算,抛出超时异常。
InterruptedException 线程中断异常
ExecutionException 任务执行异常,比如中途被取消
TimeoutException 超时异常
2. Future的类图结构
RunnableFuture:这个接口同时继承Future接口和Runnable接口,在成功执行run()方法后,可以通过Future访问执行结果。这个接口的实现类是FutureTask,一个可取消的异步任务,这个类提供了Future的基本实现,后面我们的demo也是用这个类实现,它实现了启动和取消一个任务,查询这个任务是否已完成,获取任务结果。任务的结果只能在任务已经完成的情况下获取。如果任务没有完成,get方法会阻塞,一旦任务完成,这个任务将不能被重启和取消,除非调用runAndReset方法。
FutureTask:能用来包装一个Callable或Runnable对象,因为它实现了RunnableFuture接口,而且它能被传递到Executor进行执行。
SchedualFuture:这个接口表示一个可以延时任务的结果。
其他如 CompleteFuture,ForkJoinTask 了解的不多,用的也比较少,暂时就不做记录。
3. FutureTask 源码分析
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//
package java.util.concurrent;
import java.util.concurrent.locks.LockSupport;
import sun.misc.Unsafe;
public class FutureTask<V> implements RunnableFuture<V> {
// 任务状态值,volatile修饰
private volatile int state;
//初始创建时的状态
private static final int NEW = 0;
//当任务执行完毕,FutureTask会将执行结果设置给outcome属性,在设置之前会将FutureTask的状态修改为COMPLETING。
private static final int COMPLETING = 1;
//当任务执行完毕,FutureTask会将执行结果设置给outcome属性,在设置之后会将FutureTask的状态修改为NORMAL。
private static final int NORMAL = 2;
//当任务在执行的过程中抛了异常,FutureTask会将异常信息设置给outcome属性,在设置之前会将FutureTask的状态修改为COMPLETING,在设置之后将状态修改为EXCEPTIONAL。
private static final int EXCEPTIONAL = 3;
//当外部想要取消任务,而又不允许当任务正在执行的时候被取消时会将FutureTask的状态修改为CANCELLED。
private static final int CANCELLED = 4;
//当外部想要取消任务,同时允许当任务正在执行的时候被取消时,会先将FutureTask的状态设置为INTERRUPTING,然后设置执行任务的线程的中断标记位。
private static final int INTERRUPTING = 5;
//当外部想要取消任务,同时允许当任务正在执行的时候被取消时,会先将FutureTask的状态设置为INTERRUPTING,然后设置执行任务的线程的中断标记位,最后将Future的状态设置为INTERRUPTED。
private static final int INTERRUPTED = 6;
/**
* 综上,FutureTask的状态转换流转可能为:
* NEW—>COMPLETING—>NORMAL(任务执行正常)
* NEW—>COMPLETING—>EXCEPTIONAL(任务执行异常)
* NEW—>CANCELLED(任务未得到执行直接取消,或者任务正在执行过程中被取消,但是不允许中断cancel(false))
* NEW—>INTERRUPTING—>INTERRUPTED(允许执行中的取消)
* FutureTask中使用CAS操作更新state来表示任务完成,极大地降低了使用加锁进行同步控制的性能开销。
*/
/** 底层的调用,运行后为空 */
private Callable<V> callable;
/** get()返回的结果或抛出的异常,非volatile,受state读/写保护 */
private Object outcome;
/** 底层执行callable任务的线程,在run()方法中使用CAS操作赋值 */
private volatile Thread runner;
/** 等待线程的Treiber(传动齿轮)堆栈 */
private volatile FutureTask.WaitNode waiters;
private static final Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
try {
UNSAFE = Unsafe.getUnsafe();
Class var0 = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset(var0.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset(var0.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset(var0.getDeclaredField("waiters"));
} catch (Exception var1) {
throw new Error(var1);
}
}
/**
* 返回已完成的任务的结果或抛出的异常,参数var1--任务完成后的状态值state
*/
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
public FutureTask(Callable<V> var1) {
if (var1 == null) {
throw new NullPointerException();
} else {
this.callable = var1;
this.state = NEW;
}
}
public FutureTask(Runnable var1, V var2) {
// 此处可参考Executors源码
this.callable = Executors.callable(var1, var2);
this.state = NEW;
}
public boolean isCancelled() {
return this.state >= CANCELLED;
}
public boolean isDone() {
return this.state != NEW;
}
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
/**
* 什么也不做,该方法主要用于子类个性化定制,在任务完成后执行自定义操作
* 如ExecutorCompletionService中QueueingFuture实现FutureTask,实现done()以达到任务完成自动将Future加入结果队列
*/
protected void done() {
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
protected void setException(Throwable t) {
//流程:NEW—>COMPLETING—>EXCEPTIONAL(任务执行异常)
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
public void run() {
// compareAndSwapObject这个方法此处作用是仅当runner为null时对其赋值为当前线程对象,也
// 就是说仅当任务状态为NEW,且无线程用于运行此任务时,开始执行任务。
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
// 修改状态值
set(result);
}
} finally {
// state被设置前必须保证runner非空,以阻止run()被并发调用。进入finally 代码块,则state已经被设置,任务已经执行完毕,此时可以安全的将runner置为null
runner = null;
// runner置为null后,必须重新读取state以防止任务在执行中被取消,有中断发生,以进行相应的逻辑操作
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
/**
* 被设计用于本质上可执行多次的任务。也即如果当前执行正常完成,则不调用set方法修改state,而
* 是保持初始状态NEW,可参考子类实现ScheduledFutureTask
*/
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
private void handlePossibleCancellationInterrupt(int var1) {
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield();
}
/**
* 用于唤醒等待队列中的所有后续线程(若有)。当任务未完成时,调用get()方法会被加入等待队列并阻塞。
* waiters的初始化是在调用get系列方法后,发现任务尚未完成,则会阻塞调用线程,并将其加入等待队列
*/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
// 注意,compareAndSwapObject方法是将waiters字段重新赋值为null,局部变量q的指向并没有变,指向原先的WaitNode
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
//成功则进入自旋,唤醒堆栈中的后续所有线程
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
/**
* 任务完成或者超时则返回对应状态值,线程中断则抛异常
* timed是否设置超时设置,nacos超时设定等待时间
*
* FutureTask中WaitNode 作为节点,并将当前线程保存在其中,而且始终将栈顶元素保存在waiters中
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
//判断当前线程是否已中断,该方法会清除线程状态,也就是说第一次调用返回true,
//并且没有再次被中断时,第二次调用将返回false
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 任务已完成或被取消
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// 表示任务马上完成,不必进入等待队列
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 此时s只可能为NEW
else if (q == null)
q = new WaitNode();
else if (!queued)
// 使用CAS操作加入当前等待节点q,通过将q设为新的栈顶元素,即waiters,同时修改q.next指针指向上一次的waiters。这里使用自旋操作来保证操作一定成功
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
/**
*仅当线程中断,或者get方法超时时,移除对应节点
*removeWaiter操作的作用在于移除无效节点,避免造成垃圾累积,当堆栈中节点较多,removeWaiter
*操作会很慢。通常情况下,不会有太多线程同时等待一个任务的结果。具体逻辑这里不分析了,有点麻烦,不过多关注
*/
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}
static final class WaitNode {
volatile Thread thread = Thread.currentThread();
volatile FutureTask.WaitNode next;
WaitNode() {
}
}
}
future状态转换图
对于cancel方法,着重再分析一下源码(以下为1.7源码,1.8源码略微变动了下结构,但逻辑没变)
//有一个入参,需要说明task是否是可中断的
public boolean cancel(boolean mayInterruptIfRunning) {
if (state != NEW)
return false;
if (mayInterruptIfRunning) {
//尝试修改futuretask的运行状态
if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
return false;
//获取当前线程发出中断信号
Thread t = runner;
if (t != null)
t.interrupt();
//修改task装态为已中断的
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
}
//如果是不可中断的只修改task的状态为cancelled
else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
return false;
//执行finishCompletion方法
/**
* Removes and signals all waiting threads, invokes done(),and nulls out callable.
*/
finishCompletion();
return true;
}
可以看到cancel方法主要做了两件事,1. 修改自身状态值 2. 中断任务执行线程(mayInterruptIfRunning参数为true时)
取消操作只是改变了任务对象的状态并可能会中断执行线程(这里的中断只是给线程打上一个中断标记,关于线程中断的概念可查阅相关文章)。如果任务的逻辑代码没有响应中断,则会一直异步执行直到完成,只是最终的执行结果不会被通过get方法返回,计算资源的开销仍然是存在的。
对于尚未启动的任务,调用cancle后将永远无法被执行。