二刷Java多线程:Java并发包中锁详解(一):抽象同步队列AQS

mac2022-06-30  36

前言

在之前学习Java多线程的过程中,我对于Java并发包中锁相关实现原理这块知识一直是一知半解。在这次二刷Java多线程的过程中,对这块的知识慢慢有了自己的理解,个人认为学习这块内容的学习路线很重要,下面来分享一下我个人认为比较正确的学习路线,如果你也在为这块内容发愁,希望对你有一点启发

Java并发包中锁的实现离不开抽象同步队列AbstractQueuedSynchronizer,也就是常说的AQS,而AQS的实现依赖于Unsafe中的CAS相关的方法和volatile关键字,所以在学AQS之前,不光要对多线程的很多基础知识有个了解,而且要了解魔法类Unsafe以及CAS相关知识,这里可以参考之前的博客。而在学习AQS的过程中也要对AQS中提供的模板方法和子类需要重写的方法有个大致的了解,这样在看独占锁和读写锁的源码的过程中才能理清楚思路,哪些方法是直接使用AQS中的模板方法,哪些方法是子类重写的。如果是第一次学习这块知识,建议先从实现原理上去理解,再去分析实现细节。这次二刷多线程的过程中,学习了《Java并发编程之美》、《Java并发编程的艺术》、极客时间的《Java并发编程实战》以及网上很多优秀的博客,这次综合这些资料对知识点进行了整理,希望你看完会有所收获

一、抽象同步队列AQS

队列同步器AbstractQueuedSynchronizer是用来构建锁或者其他组件的基本框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作

同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3个方法来进行操作,因为它们能够保证状态的改变是安全的。子类推荐被定义为自定义同步组件的静态内部类

同步器是实现锁的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。可以这样理解二者之间的关系:锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现细节;同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作

1、队列同步器的接口

同步器的设计是基于模板方法模式的,使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法

同步器提供的功能可以分为独占功能和共享功能两类

1)、重写同步器的指定方法时,需要使用同步器提供的如下3个方法来访问或修改同步状态:

//同步状态共享变量,使用volatile修饰保证线程可见性 private volatile int state; //获取当前同步状态 protected final int getState() { return state; } //设置当前同步状态 protected final void setState(int newState) { state = newState; } //使用CAS设置当前状态,该方法能保证状态设置的原子性 protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }

2)、同步器中可重写的方法如下:

//独占模式: //独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态 protected boolean tryAcquire(int arg) //独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态 protected boolean tryRelease(int arg) //共享模式: //共享式获取同步状态,返回大于等于0的值,表示获取成功,反之,获取失败 protected int tryAcquireShared(int arg) //共享式释放同步状态 protected boolean tryReleaseShared(int arg) //当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占 protected boolean isHeldExclusively()

3)、同步器提供的模板方法基本上分为3类:独占式获取与释放同步状态、共享式获取与释放同步状态和查询同步队列中的等待线程情况,具体如下:

//独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用重写的tryAcquire(int arg) public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } //与acquire(int arg)相同,但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException并返回 public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } //在acquireInterruptibly(int arg)基础上增加了超时限制,如果当前线程在超时时间内没有获取到同步状态,那么将会返回false,如果获取到了返回true public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } //共享式的获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占模式的主要区别是在同一时刻可以有多个线程获取到同步状态 public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } //与acquireShared(int arg)相同,该方法响应中断 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } //在acquireSharedInterruptibly(int arg)基础上增加了超时限制 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); } //独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒 public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } //共享式的释放同步状态 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } //获取等待在同步队列上的线程集合 public final Collection<Thread> getQueuedThreads() { ArrayList<Thread> list = new ArrayList<Thread>(); for (Node p = tail; p != null; p = p.prev) { Thread t = p.thread; if (t != null) list.add(t); } return list; }

2、同步队列

同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态

同步队列中的节点(Node结点类)用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点

static final class Node { //标记一个正在共享模式中等待的节点 static final Node SHARED = new Node(); //标记一个正在独占模式中等待的节点 static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; //前驱结点,当节点加入同步队列时被设置(尾部添加) volatile Node prev; //后继节点 volatile Node next; //获取同步状态的线程 volatile Thread thread; //连接到下个等待状态的结点,或者特殊值SHARED。因为条件队列只有在独占模式中保持时才会被访问,所以只需要一个简单的队列来在节点等待Condition时保持节点。然后,被转移到队列重新获取。因为Condition只能是排他的,所以使用特殊值来指示共享模式 Node nextWaiter;

waitStatus用来表示等待状态,包含如下状态:

CANCELLED:值为1,由于在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待,节点进入该状态将不会变化SIGNAL:值为-1,后继节点的线程处于等待状态,而当前节点的线程如果释放呃同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行CONDITION:值为-2,节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中PROPAGATE:值为-3,表示下一次共享式同步状态获取将会无条件地被传播下去INITIAL:值为0,初始状态

节点是构成同步队列的基础,同步器拥有首节点和尾节点,没有成功获取同步状态的线程将会成为节点加入该队列的尾部 同步器包含了两个节点类型的引用,一个指向头节点,而另一个指向尾节点。当一个线程成功地获取了同步状态(或者锁),其他线程将无法获取到同步状态,转而被构造称为节点并加入到同步队列中,而这个加入队列的过程必须要保证线程安全,因此同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect, Node update),它需要传递当前线程认为的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联 同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点 设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取到同步状态,因此设置头节点的方法并不需要使用CAS来保证,它只需要将首节点设置成为原首节点的后继节点并断开原首节点的next引用即可

3、独占式同步状态获取与释放

通过调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,也就是由于线程获取同步状态失败后进入同步队列中,后继对线程进行中断操作时,线程不会从同步队列中移出

//独占模式获取锁的方法,排队状态时可能多次阻塞和非阻塞。通常用来实现lock方法 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

首先调用自定义同步器实现的tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点并通过addWaiter(Node mode)方法将该节点加入到同步队列的尾部,最后调用acquireQueued(final Node node, int arg)方法,使得该节点以死循环的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱结点的出队或阻塞线程被中断来实现

private Node addWaiter(Node mode) { //创建一个独占式结点 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; //判断队列中是否有元素,如果有就设置当前节点为队尾结点并返回 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //队列中没有元素进行入队操作 enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; //如果队尾结点是空,初始化head和tail(懒加载),这里是通过CAS设置尾结点,不成功就一直重试 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } //如果尾结点非空,就采用CAS操作将当前结点插入到尾结点后面,如果在插入的时候尾结点有变化,就将尾结点向后移动直到移动到最后一个结点为止,然后再把当前结点插入到尾结点后面,尾指针指向当前结点,入队成功 else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

最后,看下acquireQueued方法:

final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //获取当前节点的前驱结点 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

acquireQueued内部也是一个死循环,只有前驱结点是头结点的结点,才有机会去tryAcquire。若tryAcquire成功,表示获取同步状态成功,就将此结点设置为头结点;若前驱结点不是头结点,或者tryAcquire失败,则进入shouldParkAfterFailedAcquire去判断判断当前线程是否应该阻塞,若可以,调用parkAndCheckInterrupt阻塞当前线程,直到被中断或者被前驱结点唤醒。若还不能阻塞,继续循环 独占式同步状态获取流程: 当前线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态,使得后续节点能够继续获取同步状态。通过调用同步器的release(int arg)方法可以释放同步状态,该方法在释放了同步状态之后,会唤醒其后继节点(进而使后继节点重新尝试获取同步状态)

public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }

该方法执行时,会唤醒头节点的后继节点线程,unparkSuccessor(Node node)方法使用LockSupport来唤醒处于等待状态的线程

独占式同步状态获取和释放过程:在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列(或停止自旋)的条件是前驱结点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点

4、共享式同步状态获取与释放

对于共享式同步组件来讲,同一时刻可以有多个线程同时获取到同步状态。尝试获取同步状态的方法tryAcquireShared返回值为int

protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }

返回值情况如下:

当返回值大于0时,表示获取同步状态成功,同时还有剩余同步状态可供其他线程获取

当返回值等于0时,表示获取同步状态成功,但没有可用同步状态了

当返回值小于0时,表示获取同步状态失败

获取同步状态:

public final void acquireShared(int arg) { //返回值小于0,获取同步状态失败,需要进行排队 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); //获取同步状态成功,直接返回去干自己的事儿 } private void doAcquireShared(int arg) { //构造一个共享结点,添加到同步队列尾部 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { //线程parking过程中是否被中断过 boolean interrupted = false; for (;;) { //找到前驱结点 final Node p = node.predecessor(); //头结点持有同步状态,只有前驱是头结点,才有机会尝试获取同步状态 if (p == head) { //尝试获取同步装填 int r = tryAcquireShared(arg); //r>=0,获取成功,r 值表示剩余可用同步状态 if (r >= 0) { //获取成功就将当前结点设置为头结点,若还有可用资源,传播下去,也就是继续唤醒后继结点 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } //是否能进入parking状态 if (shouldParkAfterFailedAcquire(p, node) && //阻塞线程 parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

大体逻辑与独占式的acquireQueued差距不大,只不过由于是共享式,会有多个线程同时获取到线程,也可能同时释放线程,空出很多同步状态,所以前驱结点是头结点的结点获取到同步状态,如果还有可用资源,会继续传播下去

释放同步状态:

public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } private void doReleaseShared() { //死循环,共享模式,持有同步状态的线程可能有多个,采用循环CAS保证线程安全 for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases //唤醒后继结点 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }

该方法在释放同步状态之后,将会唤醒后续处于等待状态的节点。对于能够支持多个线程同时访问的并发组件,它和独占式主要区别在于tryReleaseShared(int arg)方法必须确保同步状态线程安全释放,一般是通过循环和CAS来保证的,因为释放同步状态的操作会同时来自多个线程

最新回复(0)