日常开发中经常碰到等待其他线程运行结束的情况,之前学习的方法中,可以使用Thread.join()方法实现,但是有很多局限性,且不够灵活.
使用CountDownLatch可以更好的实现这个功能.
// new一个CountDownLatch对象,传入参数为要监控的线程数. CountDownLatch countDownLatch = new CountDownLatch(2); // 主线程(或者灵活的选择使用场景)中使用,会挂起当前线程,知道CountDownLatch对象的计数器为0 countDownLatch.await(); // 在子线程中使用,CountDownLatch的计数器会减一,当计数器为0时, countDownLatch.countDown();它的优点有:
更灵活:可以再子线程的任意处位置让程序计数器自减.更实用:在线程池开发中,通常不会直接操作线程对象传入的都是Runnable或者Callable对象,没有join()方法可以调用.而CountDownLatch的使用不受影响. 内部使用AQS实现,计数器的值被赋给了AQS的状态变量state.
当CountDownLatch对象调用await()方法后,当前线程或被阻塞,知道下面的情况才会返回:
所有线程都调用了CountDownLatch对象的countDown()方法,也就是说计数器的值为0时.其他线程调用了当前线程的interrupt()方法中断了当前线程,该方法会抛出InterruptedException异常后返回. public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // 响应中断的共享锁 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // state若不为0,进入AQS队列等待 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared(int arg) { return (getState() == 0) ? 1 : -1; } 与上一个类似,但是当设置的timeout时间到了,会因为超时返回false.
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } CountDownLatch的计数器是一次性的,也就是说等计数器的值为0时,在调用await和countDown方法都会立刻返回,起不到线程同步的效果了.
为了满足计数器可以重置的需要,JDK提供了CyclicBarrier类,功能并不局限于CountDownLatch的功能.
从字面意思理解,CyclicBarrier是回环屏障的意思,它可以让一组线程全部达到一个状态后在全部同时执行.当所有等待线程执行完毕并重置CyclicBarrier的状态后,它可以被重用.
之所以被叫做屏障是因为调用await方法后会被阻塞,这个阻塞点被称为屏障点,等所有线程都调用了await方法后,线程们就会冲破屏障,继续向下运行.
public class Demo { // 创建一个CyclicBarrier实例,添加一个当所有线程冲破屏障后都会执行的方法 private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() { @Override public void run() { System.out.println("ok ok"); } }); public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2); // 线程池中添加两个任务 executorService.submit(new Runnable() { @Override public void run() { try { System.out.println("A1"); // 执行完await方法后会等待其他线程 // 当所有线程都执行之后,会执行传入cyclicBarrier对象的方法 cyclicBarrier.await(); System.out.println("A2"); cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } } }); executorService.submit(new Runnable() { @Override public void run() { try { System.out.println("B1"); cyclicBarrier.await(); System.out.println("B2"); cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } } }); executorService.shutdown(); } } /执行结果/// A1 B1 ok ok B2 A2 ok ok CyclicBarrier基于独占锁实现,本质底层还是AQS的.
parties用来记录线程个数,表示当多少线程调用了await()方法后,所有线程才会冲破屏障.是构造器中第一个参数.
count记录当前await()方法的调用数,一开始等于parties.这里使用两个值存储的意图在于实现复用.
变量generation内部有一个变量broken,用来记录当前屏障是否被打破,并没有被声明为volatile,其操作都在锁内部使用,不需要.
挂起调用的线程直到下面的情况才会返回:
parties个线程都调用了await()方法,也就是线程都到达了屏障点其他线程中断了当前线程,会抛出InterruptedException与当前屏障点关联的Generation对象的broken标志被设置为true,会抛出BrokenBarrierException public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } 此方法除了上面的三种情况外,超时也会返回,不报异常,返回false
public int await(long timeout, TimeUnit unit) throws InterruptedException,BrokenBarrierException,TimeoutException { return dowait(true, unit.toNanos(timeout)); } 不同于前两个的地方是,内部计数器是递增的,而且不需要知道需要同步的现成的个数,而是需要在主线程等待的地方传入线程数.
内部有两个AQS的实现,分别对应公平和非公平策略.
调用该方法的目的是获取一个信号量资源,如果当前信号量个数大于0,则计数减一,直接返回.
如果当前信号量等于0,将当前线程放入AQS阻塞队列.
该方法响应中断,会抛出异常.
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 如果线程已经是中断状态,抛出异常 if (Thread.interrupted()) throw new InterruptedException(); // 尝试获取资源(多个等待线程之间要阻塞) if (tryAcquireShared(arg) < 0) // doAcquireSharedInterruptibly(arg); } //非公平策略 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } final int nonfairTryAcquireShared(int acquires) { for (;;) { // 获取当前信号量,计算剩余值 int available = getState(); int remaining = available - acquires; // 如果剩余值小于0,返回剩余值(这一步返回正数说明当前的信号量满足要求,不用被阻塞) // 否则进行CAS竞争,成功的线程返回修改后的值 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } //公平策略 protected int tryAcquireShared(int acquires) { for (;;) { // 公平性的关键,查看该节点的前继节点是否也在获取资源,如果是,本线程放弃 if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } 传入当前需要的信号量.
public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } 该方法挂起时不响应中断
public void acquireUninterruptibly() { sync.acquireShared(1); } 同上
public void acquireUninterruptibly(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.acquireShared(permits); }