CountDownLatch

mac2026-05-28  2

文章目录

简介构造函数内部类countDown()await()示例

简介

CountDownLatch允许一个或者多个线程等待其它线程完成操作。

CountDownLatch 是通过一个计数器来实现的,当我们在 new 一个 CountDownLatch 对象的时候,需要带入该计数器值,该值就表示了线程的数量。

每当一个线程完成自己的任务后,计数器的值就会减 1 。当计数器的值变为0时,就表示所有的线程均已经完成了任务,然后就可以恢复等待的线程继续执行了。

CountDownLatch中只包含了Sync一个内部类,它没有公平/非公平模式。

构造函数

public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); // count表示线程数量 this.sync = new Sync(count); }

内部类

private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; // 传入初始次数 Sync(int count) { setState(count); } // 获取剩余次数 int getCount() { return getState(); } // 尝试获取共享锁 protected int tryAcquireShared(int acquires) { // 判断同步状态是否为0 // 如果为0,则获取成功 // 如果不为0,则需要排队 return (getState() == 0) ? 1 : -1; } // 尝试释放锁 protected boolean tryReleaseShared(int releases) { for (;;) { // 获取state int c = getState(); // 如果state等于0,则不能释放了 if (c == 0) return false; // 将state的值减1 int nextc = c-1; // 通过CAS操作更新state的值, // 如果state等于0,说明所有的线程已经执行完毕,获取锁成功,则会唤醒同步队列中的等待线程 if (compareAndSetState(c, nextc)) return nextc == 0; } } }

countDown()

// CountDownLatch.countDown() public void countDown() { sync.releaseShared(1); } // AbstractQueuedSynchronizer.releaseShared(int arg) public final boolean releaseShared(int arg) { // tryReleaseShared尝试释放锁,由子类CountDownLatch.Sync实现 // 具体分析见上面的内部类解释 if (tryReleaseShared(arg)) { // 如果共享锁释放成功,即state为0,则需要唤醒排队的线程 doReleaseShared(); // 在ReentrantReadWriteLock章节已经分析过 return true; } return false; }

await()

// CountDownLatch.await() public void await() throws InterruptedException { // 获取锁 sync.acquireSharedInterruptibly(1); } // AbstractQueuedSynchronizer.acquireSharedInterruptibly(int arg) public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 尝试获取共享锁,由子类CountDownLatch.Sync实现 // 具体分析见上面的内部类解释 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); // 若获取锁失败,则加入同步队列中排队 } // AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(int arg) // 若前面获取锁失败,则加入到同步队列进行排队 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 构造一个节点加入到同步队列中,在 ReentrantLock 章节已经分析过这个方法 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { // 获取当前节点的同一个节点 final Node p = node.predecessor(); if (p == head) { // 若当前节点的前一个节点为头结点,则尝试获取许可 int r = tryAcquireShared(arg); if (r >= 0) { // 尝试获取许可成功,则唤醒后面的节点 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // shouldParkAfterFailedAcquire和parkAndCheckInterrupt的实现方式与ReentrantLock一样 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); // 将节点取消 } } // AbstractQueuedSynchronizer.setHeadAndPropagate() private void setHeadAndPropagate(Node node, int propagate) { // h为旧的头节点 Node h = head; // 设置当前节点为新头节点 setHead(node); // 如果当前头结点的状态是<0,意味着它是SINGAL和Propagate,那么也去唤醒后继结点。如果头结点为空(可能在其它线程中被释放了),那么也唤醒后继结点来获取资源。 // 如果旧的头节点或新的头节点为空或者其等待状态小于0(表示状态为SIGNAL/PROPAGATE) if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { // 需要传播 // 取下一个节点 Node s = node.next; // 如果下一个节点为空,或者是需要获取读锁的节点 if (s == null || s.isShared()) // 唤醒下一个节点 doReleaseShared(); } } // AbstractQueuedSynchronizer.doReleaseShared() // 这个方法只会唤醒一个节点 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 如果头节点状态为SIGNAL,说明要唤醒下一个节点 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 去唤醒下一个节点 unparkSuccessor(h); } else if (ws == 0 && // 把头节点的状态改为PROPAGATE成功才会跳到下面的if !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 如果唤醒后head没变,则跳出循环 if (h == head) // loop if head changed break; } } //在自旋的阶段,每一次循环的过程都是首先获得头结点,如果头结点不为空且不为尾结点(阻塞队列里面只有一个结点),那么先获得该节点的状态,如果是SIGNAL的状态,则代表它需要有后继结点去唤醒,首先将其的状态变为0,因为是要释放资源了,它也不需要做什么了,所以转变为初始状态,然后去唤醒后继结点unparkSuccessor(h),如果结点状态一开始就是0,那么就给他转换成PROPAGATE状态,保证在后续获取资源的时候,还能够向后面传播

示例

CountDownLatch 不仅能实现一个线程等待多个线程条件成立,还能实现多个线程等待一个线程条件成立。

public static void main(String[] args) throws InterruptedException { CountDownLatch startSignal = new CountDownLatch(1); CountDownLatch doneSignal = new CountDownLatch(5); for (int i = 0; i < 5; i++) { new Thread(()->{ try { System.out.println(Thread.currentThread().getName() + " 开始."); startSignal.await(); System.out.println(Thread.currentThread().getName() + " 在干活."); doneSignal.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } Thread.sleep(2000); System.out.println("主线程开始."); startSignal.countDown(); System.out.println("主线程等待其它线程结束."); doneSignal.await(); System.out.println("其它线程已经结束,主线程开始做其它事情."); }
最新回复(0)