Semaphore 机制及源码分析

mac2024-06-05  59

前情提要:

        https://blog.csdn.net/qq_32140607/article/details/102669386

https://blog.csdn.net/qq_32140607/article/details/102796524


首先交代概念

Semaphore是一个计数信号量,Semaphore经常用于限制获取资源的线程数量;

AbstractQueuedSynchronizer(AQS):队列同步器,内部类,维护了一个Node对象的链表结构来管理线程的阻塞与释放等


首先从Semaphore开始

先说结论:

Semaphore可以定义若干个信号量,信号量的数量即为当前资源允许被同时访问的线程数量,即在同一个时刻,允许若干个线程同时操作.实现方式为AQS(AbstractQueuedSynchronizer)的共享锁.

每当有一个新线程进入时,将信号量数减一,当数量为0时,下一个进入的线程将会被阻塞(ASQ内部维护了一个链表结构,将新进入的线程插入到链表尾部).当持有锁的线程被释放时,信号量加一,唤醒队列中的第一个线程,以此保证固定数量的线程可访问资源.

接下来进入一个demo

public static void main(String[] args) { /* * Semaphore是一个计数信号量,Semaphore经常用于限制获取资源的线程数量 */ // 声明5个窗口 state: 资源数 Semaphore windows = new Semaphore(3); for (int i = 0; i < 5; i++) { new Thread(new Runnable() { @Override public void run() { try { // 占用窗口(当窗口被占满,下一个进入的线程会被阻塞) windows.acquire(); System.out.println(Thread.currentThread().getName() + ": 开始买票"); //模拟买票流程 Thread.sleep(5000); System.out.println(Thread.currentThread().getName() + ": 购票成功"); // 释放窗口 windows.release(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } }

跟入代码,Thread-0进入

 

跟入发现执行到以下代码,这一段后文会继续介绍

继续跟入,看到重点调用了nonfairTryAcquireShared这个方法,看名称知道是一个非公平的共享锁获取方法.

 可以看到当前的AQS里的state属性为3,这个值是在Semaphore对象创建的时候通过构造方法传入的,即为信号量数,也就是允许同时访问资源的线程数.运行到断点这行时,将state减一,并通过CAS更新state的值,并将更新的state值返回.

接下来返回到外层代码,标记部分即为刚刚返回的值,刚才通过CAS更新了state的值,当前的state为2,所以if条件不符合,跳过doAcquireSharedInterruptibly方法

此刻便已经拿到了这个共享锁.

从上述代码逻辑分析可以看出,当AQS的state减少至-1之前(第4个线程进入之前),逻辑同上,也就意味着有3个线程可以同时操作资源(买票) ,这3个线程便持有该共享锁.那么在没有线程释放锁,第4个线程进入的时候会怎么样?答案是阻塞,进入AQS维护的Node链表中.(与ReentrantLock的逻辑类似,但不同的是,ReentrantLock的state状态值初始是1,每当同一个线程再次进入时state+1,不同线程进入时阻塞.而Semaphore的初始为N(本篇中传入的是3),每次线程进入时state-1,当数值为负数是,阻塞)

还是以代码断点来看(第2,3个线程获取共享锁的步骤省略),当第4个线程进入时发生了什么.

Thread-3进入时,此刻可分配的信号量被前三个线程消耗完毕,Thread-3进入doAcquireSharedInterruptibly方法

而这个方法中会生成一个Node对象,记录当前的线程,并将其插入当前链表的尾部(详细逻辑上一篇有介绍https://blog.csdn.net/qq_32140607/article/details/102796524,这里不再赘述,只简要概括),最后调用parkAndCheckInterrupt()方法将当前线程(Thread-3)阻塞.并将当前线程对应的Node节点的上一个节点中的waitStatus赋值为-1(自旋执行shouldParkAfterFailedAcquire方法),用来标识Thread-3是可以被唤醒的.

同样,当第5个线程进入时也会被阻塞,并将其排在Thread-3对应Node的后面

简单描述下插入链表的逻辑

//创建一个Node对象 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; // 尝试快速将该节点加入到队列的尾部 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 如果快速加入失败,则通过 enq方式入列 enq(node); return node; } private Node enq(final Node node) { // CAS自旋,直到加入队尾成功 for (;;) { Node t = tail; if (t == null) { // 如果队列为空,则必须先初始化CLH队列,新建一个空节点标识作为 Hader节点,并将tail 指向它 if (compareAndSetHead(new Node())) tail = head; } else {// 正常流程,加入队列尾部 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

接下来看看当有一个线程释放共享锁时,执行逻辑是怎样的.

当Thread-0释放时,进入release 方法

 

可以看到,最后执行到tryReleaseShared方法,该方法将AQS的state+1,并通过CAS更新

 返回上一层执行doReleaseShared方法

 而doReleaseShared这个方法会将AQS内部类Node的head节点的waitStatus通过CAS的方式,赋值为0,然后调用unparkSuccessor这个方法,唤醒head节点的下一个节点(Thread-3对应的node)

唤醒Thread-3

 执行完之后,被阻塞的Thread-3就唤醒了.

 然后下一次自旋,调用tryAcquireShared方法,重新获取锁,回到了获取共享锁的逻辑

又回到了信号量减一,判断当前持有锁的线程是否大于信号量值的逻辑,此时Thread-0刚被释放,state由0变为1,所以此时,Thread3能够获取到共享锁.之后的逻辑就是维护Node链表结构,重新设置head,上篇已详细介绍,本篇不再赘述.

  


 

最新回复(0)