Future接口定义
package com.jiang.currents.future; /** * @Description 获取计算呢结果和判断任务是否完成两个接口 * @createTime 2019年10月31日 */ public interface Future<T> { // 返回计算后的结果,该方法会陷入阻塞状态 T get() throws InterruptedException ; // 判断任务是否已经被完成 boolean done(); }FutureService接口定义
package com.jiang.currents.future; /** * @ClassName FutureService.java * @Description FutureService主要用于提交任务 * @createTime 2019年10月31日 */ public interface FutureService<IN,OUT> { // 接受不需要返回值的任务,future.get()会获得null Future<?> submit(Runnable runnable); // 提交需要返回值的任务,Task接口替代了Runnable接口 Future<OUT> submit(Task<IN, OUT> task, IN input); // 提交需要返回值的任务 回调函数 Future<OUT> submit(Task<IN, OUT> task, IN input, Callback<OUT> callback); // 使用静态方法构建一个FutureService static <T, R> FutureService<T, R> newService() { return new FutureServiceImpl<>(); } }Task接口设计
package com.jiang.currents.future; /** * @ClassName Task.java * @Description Task接口主要是体提供给调用者实现计算逻辑使用的 * @createTime 2019年10月31日 */ public interface Task<IN, OUT> { // 给定一个参数,经过计算返回结果 OUT get(IN input); }Callback回调接口
package com.jiang.currents.future; /** * @ClassName Callback.java * @Description TODO * @createTime 2019年10月31日 */ public interface Callback<T> { //任务完成后会调用该方法, 其中T为任务执行后的结果 void call(T t); }FutureTask是Future的一个实现,除了实现Future中定义的get()以及done()方法。还额外增加了protected方法finish方法
package com.jiang.currents.future; /** * @ClassName FutureTask.java * @Description TODO * @createTime 2019年10月31日 */ public class FutureTask<T> implements Future<T> { // 计算结果 private T result; // 是否已经完成 private boolean isDone = false; // 定义对象锁 private final Object LOCK = new Object(); @Override public T get() throws InterruptedException { synchronized (LOCK) { // 当任务还没有完成时,调用get方法被挂起,进入阻塞 while (!isDone) { LOCK.wait(); } } // 计算返回结果 return result; } //设置计算结果 protected void finish(T result) { synchronized (LOCK) { if (isDone) { return; } #计算完成,并将isDone 设置为true,同时唤醒阻塞中的线程 this.result = result; this.isDone = true; LOCK.notifyAll(); } } @Override public boolean done() { return isDone; } }FutureServiceImpl
package com.jiang.currents.future; import java.util.concurrent.atomic.AtomicInteger; /** * 当提交任务的时候,创建一个信息的线程来受理该任务,进而达到任务异步执行的效果 * * @param <IN> * @param <OUT> */ public class FutureServiceImpl<IN, OUT> implements FutureService<IN, OUT> { // 为执行的线程指定名字前缀 private final static String FUTURE_THREAD_PREFIX = "FUTURE-"; private final AtomicInteger nextCounter = new AtomicInteger(0); private String getNextName() { return FUTURE_THREAD_PREFIX + nextCounter.getAndIncrement(); } @Override public Future<?> submit(Runnable runnable) { final FutureTask<Void> future = new FutureTask<>(); new Thread(() -> { runnable.run(); future.finish(null); }, getNextName()).start(); return future; } @Override public Future<OUT> submit(Task<IN, OUT> task, IN input) { final FutureTask<OUT> future = new FutureTask<>(); new Thread(() -> { OUT result = task.get(input); // 设置返回结果 future.finish(result); }, getNextName()).start(); return future; } @Override public Future<OUT> submit(Task<IN, OUT> task, IN input, Callback<OUT> callback) { final FutureTask<OUT> future = new FutureTask<>(); new Thread(() -> { OUT result = task.get(input); // 设置返回结果 future.finish(result); if (null != callback) { // 回调函数 callback.call(result); } }, getNextName()).start(); return future; } }Future主要是耗费时间的操作交给另外一个线程执行,从而达到异步的目的,而不是阻塞等待结果的返回。
import java.util.concurrent.TimeUnit; /** * @ClassName FutureTest01.java * @Description 提交无返回值的任务 * @createTime 2019年10月31日 */ public class FutureTest01 { public static void main(String[] args) throws InterruptedException { FutureService<Void, Void> service = FutureService.newService(); Future<?> future = service.submit(() -> { try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("I am finish done ."); }); //get 方法 会使当前线程进入阻塞状态 future.get(); } } package com.jiang.currents.future; import java.util.concurrent.TimeUnit; /** * @ClassName FutureTest02.java * @Description 提交有返回值的任务 * @createTime 2019年10月31日 */ public class FutureTest02 { public static void main(String[] args) throws InterruptedException { //定义一个有返回值的FutureService FutureService<String, Integer> service = FutureService.newService(); //submitf方法立即返回 Future<Integer> future = service.submit(input -> { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } return input.length(); }, "Hello"); //get 方法 会使当前线程进入 System.err.println(future.get()); } } package com.jiang.currents.future; import java.util.concurrent.TimeUnit; /** * @ClassName FutureTest03.java * @Description 提交有返回值的任务 支持回调函数 * @createTime 2019年10月31日 */ public class FutureTest03 { public static void main(String[] args) throws InterruptedException { FutureService<String, Integer> service = FutureService.newService(); Future<Integer> future = service.submit(input -> { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } return input.length(); }, "Hello", System.out::println); } }使用任务完成时hi回调机制可以让调用者不在进行显示get方式获取数据而进入阻塞。