Java并发-----第10章 Java并发包中线程同步器原理剖析

mac2025-10-04  3

第10章 Java并发包中线程同步器原理剖析


文章目录

第10章 Java并发包中线程同步器原理剖析@[toc]1. CountDownLatch原理剖析(1). 介绍(2). 实现原理1). 结构2). void await()方法3). boolean await(long timeout, TimeUnit unit)方法4). void countDown()方法5). long getCount()方法 2. 回环屏障CyclicBarrier原理探究(1). 介绍(2). 实现原理1). 结构2). int await()方法2). int await(long timeout, TimeUnit unit)方法3). int dowait(boolean timed, long nanos)方法 3. 信号量Semaphore原理探究(1). 介绍(2). 实现原理1). 结构2). void acquire()方法3). void acquire(int permits)方法4). void acquireUninterruptibly()方法5). void acquireUninterruptibly(int permits)方法6). void release()方法7). void release(int permits)方法

1. CountDownLatch原理剖析

(1). 介绍

​ 日常开发中经常碰到等待其他线程运行结束的情况,之前学习的方法中,可以使用Thread.join()方法实现,但是有很多局限性,且不够灵活.

​ 使用CountDownLatch可以更好的实现这个功能.

// new一个CountDownLatch对象,传入参数为要监控的线程数. CountDownLatch countDownLatch = new CountDownLatch(2); // 主线程(或者灵活的选择使用场景)中使用,会挂起当前线程,知道CountDownLatch对象的计数器为0 countDownLatch.await(); // 在子线程中使用,CountDownLatch的计数器会减一,当计数器为0时, countDownLatch.countDown();

它的优点有:

更灵活:可以再子线程的任意处位置让程序计数器自减.更实用:在线程池开发中,通常不会直接操作线程对象传入的都是Runnable或者Callable对象,没有join()方法可以调用.而CountDownLatch的使用不受影响.

(2). 实现原理

1). 结构

​ 内部使用AQS实现,计数器的值被赋给了AQS的状态变量state.

2). void await()方法

当CountDownLatch对象调用await()方法后,当前线程或被阻塞,知道下面的情况才会返回:

所有线程都调用了CountDownLatch对象的countDown()方法,也就是说计数器的值为0时.其他线程调用了当前线程的interrupt()方法中断了当前线程,该方法会抛出InterruptedException异常后返回. public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // 响应中断的共享锁 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // state若不为0,进入AQS队列等待 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared(int arg) { return (getState() == 0) ? 1 : -1; }

3). boolean await(long timeout, TimeUnit unit)方法

​ 与上一个类似,但是当设置的timeout时间到了,会因为超时返回false.

public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }

4). void countDown()方法

public void countDown() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { // 设置state-- if (tryReleaseShared(arg)) { // 上一步操作成功,则说明state已经为0,释放资源(AQS队列首元素啥的) doReleaseShared(); return true; } return false; } protected boolean tryReleaseShared(int releases) { for (;;) { // 获取state,如果为0,则不需要在释放资源了,返回false // 防止计数器为零后,其他线程仍然调用,使计数器为负数 // 否则state--(自旋+CAS操作) int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }

5). long getCount()方法

public long getCount() { return sync.getCount(); } int getCount() { return getState(); }

2. 回环屏障CyclicBarrier原理探究

(1). 介绍

​ CountDownLatch的计数器是一次性的,也就是说等计数器的值为0时,在调用await和countDown方法都会立刻返回,起不到线程同步的效果了.

​ 为了满足计数器可以重置的需要,JDK提供了CyclicBarrier类,功能并不局限于CountDownLatch的功能.

​ 从字面意思理解,CyclicBarrier是回环屏障的意思,它可以让一组线程全部达到一个状态后在全部同时执行.当所有等待线程执行完毕并重置CyclicBarrier的状态后,它可以被重用.

​ 之所以被叫做屏障是因为调用await方法后会被阻塞,这个阻塞点被称为屏障点,等所有线程都调用了await方法后,线程们就会冲破屏障,继续向下运行.

public class Demo { // 创建一个CyclicBarrier实例,添加一个当所有线程冲破屏障后都会执行的方法 private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() { @Override public void run() { System.out.println("ok ok"); } }); public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2); // 线程池中添加两个任务 executorService.submit(new Runnable() { @Override public void run() { try { System.out.println("A1"); // 执行完await方法后会等待其他线程 // 当所有线程都执行之后,会执行传入cyclicBarrier对象的方法 cyclicBarrier.await(); System.out.println("A2"); cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } } }); executorService.submit(new Runnable() { @Override public void run() { try { System.out.println("B1"); cyclicBarrier.await(); System.out.println("B2"); cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } } }); executorService.shutdown(); } } /执行结果/// A1 B1 ok ok B2 A2 ok ok

(2). 实现原理

1). 结构

​ CyclicBarrier基于独占锁实现,本质底层还是AQS的.

​ parties用来记录线程个数,表示当多少线程调用了await()方法后,所有线程才会冲破屏障.是构造器中第一个参数.

​ count记录当前await()方法的调用数,一开始等于parties.这里使用两个值存储的意图在于实现复用.

​ 变量generation内部有一个变量broken,用来记录当前屏障是否被打破,并没有被声明为volatile,其操作都在锁内部使用,不需要.

2). int await()方法

挂起调用的线程直到下面的情况才会返回:

parties个线程都调用了await()方法,也就是线程都到达了屏障点其他线程中断了当前线程,会抛出InterruptedException与当前屏障点关联的Generation对象的broken标志被设置为true,会抛出BrokenBarrierException public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }

2). int await(long timeout, TimeUnit unit)方法

​ 此方法除了上面的三种情况外,超时也会返回,不报异常,返回false

public int await(long timeout, TimeUnit unit) throws InterruptedException,BrokenBarrierException,TimeoutException { return dowait(true, unit.toNanos(timeout)); }

3). int dowait(boolean timed, long nanos)方法

private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException,TimeoutException { // 加锁 final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; // 如果屏障已经被破坏,抛异常 if (g.broken) throw new BrokenBarrierException(); // 响应中断 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // 计数器减一 int index = --count; if (index == 0) { // 进行完本次计数后,全部线程都到达了屏障点,即将进行固定的回调 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); // 运行成功(无异常)标记 ranAction = true; // 重置CyclicBarrier,换代,释放所有的锁 nextGeneration(); return 0; } finally { // 如果未执行成功,设置为屏障被冲破 if (!ranAction) breakBarrier(); } } // 自旋 for (;;) { try { // 是否有超时的限制,没有则直接挂起 if (!timed) trip.await(); // 时间合理,则挂起相应时间 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // 被中断时判断是否换代,没换代的话是否还没被破坏 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // 被破坏说明此时是经历自旋到达这里的 Thread.currentThread().interrupt(); } } // 当有任何一个线程中断了,就会调用breakBarrier方法 // 就会唤醒其他的线程,其他线程醒来后,也要抛出异常 if (g.broken) throw new BrokenBarrierException(); // 如果已经换代。那么直接返回index if (g != generation) return index; // 超时之后打破屏障并且抛异常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); } private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }

3. 信号量Semaphore原理探究

(1). 介绍

​ 不同于前两个的地方是,内部计数器是递增的,而且不需要知道需要同步的现成的个数,而是需要在主线程等待的地方传入线程数.

(2). 实现原理

1). 结构

​ 内部有两个AQS的实现,分别对应公平和非公平策略.

2). void acquire()方法

​ 调用该方法的目的是获取一个信号量资源,如果当前信号量个数大于0,则计数减一,直接返回.

​ 如果当前信号量等于0,将当前线程放入AQS阻塞队列.

​ 该方法响应中断,会抛出异常.

public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 如果线程已经是中断状态,抛出异常 if (Thread.interrupted()) throw new InterruptedException(); // 尝试获取资源(多个等待线程之间要阻塞) if (tryAcquireShared(arg) < 0) // doAcquireSharedInterruptibly(arg); } //非公平策略 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } final int nonfairTryAcquireShared(int acquires) { for (;;) { // 获取当前信号量,计算剩余值 int available = getState(); int remaining = available - acquires; // 如果剩余值小于0,返回剩余值(这一步返回正数说明当前的信号量满足要求,不用被阻塞) // 否则进行CAS竞争,成功的线程返回修改后的值 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } //公平策略 protected int tryAcquireShared(int acquires) { for (;;) { // 公平性的关键,查看该节点的前继节点是否也在获取资源,如果是,本线程放弃 if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }

3). void acquire(int permits)方法

​ 传入当前需要的信号量.

public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); }

4). void acquireUninterruptibly()方法

​ 该方法挂起时不响应中断

public void acquireUninterruptibly() { sync.acquireShared(1); }

5). void acquireUninterruptibly(int permits)方法

​ 同上

public void acquireUninterruptibly(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.acquireShared(permits); }

6). void release()方法

public void release() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { // 这里自旋+CAS,一定会成功 if (tryReleaseShared(arg)) { // 成功则调用park方法唤醒挂起的等待线程 // 等待线程会进行信号量的判断,如果足够就解锁,不足继续挂起 doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }

7). void release(int permits)方法

public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); }
最新回复(0)