AbstractQueuedSynchronizer

mac2026-05-17  5

文章目录

简介内部类主要属性主要方法可重写方法

简介

AQS的全称是AbstractQueuedSynchronizer,是一个抽象类。它的定位是为Java中几乎所有的锁和同步器提供一个基础框架。

AQS是基于FIFO的队列实现的,并且内部维护了一个状态变量state,通过原子操作更新这个状态变量state即可以实现加锁解锁操作。

一个同步器拥有一个同步队列,可以拥有零个或者多个等待队列。

节点是构成同步队列的基础,同步器拥有首节点(head)和尾节点(tail),没有成功获取同步状态的线程将会成为节点加入同步队列的尾部。同步队列的基础结构如下:

+------+ next +------+ next +------+ head | Node | ----> | Node | ----> | Node | tail | | <---- | | <---- | | +------+ prev +------+ prev +------+

节点也是构成等待队列的基础,如果一个线程调用Condition的await方法后,那么该线程将会释放锁、构造成节点加入等待队列尾部并进入等待状态。一个Condition包含一个等待队列,等待队列的基础结构如下

+------+ nextWaiter +------+ nextWaiter +------+ firstWaiter | Node | ----------> | Node | ----------> | Node | lastWaiter | | | | | | +------+ +------+ +------+

内部类

static final class Node { // 标识一个节点是共享模式 static final Node SHARED = new Node(); // 标识一个节点是独占模式 static final Node EXCLUSIVE = null; // 在同步队列中等待的线程等待超时或者被中断,需要从同步同列中取消等待 static final int CANCELLED = 1; // 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点得以运行。标识后继节点需要唤醒。节点在等待等待队列中。 static final int SIGNAL = -1; // 节点在等待队列中,节点线程等待在Condition上,当其它线程对Condition调用了singal()方法后,将会把等待队列中的首个节点转移到同步队列的尾部 static final int CONDITION = -2; // 表示下一次共享式同步状态获取将会无条件地传播下去 static final int PROPAGATE = -3; // 当前节点保存的线程对应的等待状态 volatile int waitStatus; // 同步队列前驱节点 volatile Node prev; // 同步队列后继节点 volatile Node next; // 获取同步状态的线程 volatile Thread thread; // 等待队列的后继节点 Node nextWaiter; // 是否为共享模式 final boolean isShared() { return nextWaiter == SHARED; } // 获取该节点的前驱节点 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } }

主要属性

/** * 同步队列的头结点 */ private transient volatile Node head; /** * 同步队列的尾节点 */ private transient volatile Node tail; /** * 同步状态,加锁解锁时需要对这个状态进行操作 */ private volatile int state;

主要方法

/** * 获取同步状态 */ protected final int getState() { return state; } /** * 设置同步状态 */ protected final void setState(int newState) { state = newState; } /** * 调用Unsafe的方法更新同步状态state,该操作具有原子性,更新成功则返回true,否则返回false */ protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } // 获取锁 public final void acquire(int arg) { // tryAcquire方法尝试获取锁,该方法由子类实现。 // 若尝试获取锁失败,先调用addWaiter方法将新节点存储到同步队列的尾部 // 最后调用acquireQueued方法,该方法不断尝试获取锁 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } // 若当前线程尝试获取锁失败,将会调用这个方法 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 这里先尝试把新节点加到同步队列的尾部 // 如果成功了就返回新节点 // 如果没成功再调用enq()方法不断尝试 Node pred = tail; if (pred != null) { // 将新节点的前驱节点指向原来的尾节点 node.prev = pred; // 使用CAS操作将尾节点指向新节点 if (compareAndSetTail(pred, node)) { // 若CAS操作成功,则设置旧尾节点的下一个节点为新节点 pred.next = node; return node; } } // 若上面尝试新节点入队列失败,则会使用enq方法来处理 enq(node); return node; } // 把Node存储在同步队列的尾部 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize // 如果同步队列为空,则初始化尾节点和头结点 if (compareAndSetHead(new Node())) tail = head; } else { // 新节点的前驱节点指向尾节点 node.prev = t; // 使用CAS操作将尾节点指向新节点,若CAS操作失败,将一直处于for循环中,直至CAS操作成功 if (compareAndSetTail(t, node)) { // 若CAS操作成功,则设置旧尾节点的下一个节点为新节点 t.next = node; // 返回旧尾节点 return t; } } } } // 调用上面的addWaiter()方法使得新节点已经成功入队了 // 这个方法是尝试让当前节点来获取锁的 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 当前线程将一直处于for循环中,直到获取锁成功 for (;;) { // 获取当前节点的前驱节点 final Node p = node.predecessor(); // 若当前节点的前驱节点为头结点,则当前节点才有机会尝试获取锁,tryAcquire是由子类进行实现 if (p == head && tryAcquire(arg)) { // 尝试获取锁成功,将当前节点设置为头结点并返回 setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 若没有机会尝试获取锁或者尝试获取锁失败 // shouldParkAfterFailedAcquire返回false,将继续循环 // shouldParkAfterFailedAcquire返回true,将执行parkAndCheckInterrupt方法 // parkAndCheckInterrupt方法用来阻塞线程,当线程被唤醒之后,又继续这个for循环 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } // 该方法会在上面的for循环中一直调用,主要功能:将新增节点上一个节点的等待状态设置为SIGNAL // 如果新增节点前一个节点的等待状态为SIGNAL,则返回true // 如果新增节点前一个节点的等待状态为小于等于0,则将前一个节点的等待状态设置为SIGNAL private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // static final int CANCELLED = 1; // static final int SIGNAL = -1; // static final int CONDITION = -2; // static final int PROPAGATE = -3; // waitStatus默认为0 int ws = pred.waitStatus; // 如果前一个节点的状态为-1,说明前一个节点等待被唤醒,则返回true if (ws == Node.SIGNAL) return true; if (ws > 0) { // 若前一个节点的状态大于0,则说明前一个节点已经被取消,则需要将其从队列中剔除 do { // 把前面所有处于已取消状态的节点都从同步队列中剔除 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 如果前一个节点的状态 小于等于 0 ,则将它的状态更改为-1(等待被唤醒) compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } // 真正阻塞线程的方法 private final boolean parkAndCheckInterrupt() { // 阻塞当前线程 LockSupport.park(this); // 返回线程是否中断 return Thread.interrupted(); } // 释放锁 public final boolean release(int arg) { // tryRelease尝试释放锁,该方法是由子类来实现的 if (tryRelease(arg)) { // 尝试释放锁成功 Node h = head; // 若头结点不为空并且头结点的等待状态不为0,则唤醒头结点的下一个节点 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } // 该方法用来唤醒头结点的下一个节点 private void unparkSuccessor(Node node) { // 这里的node为同步队列的头结点 int ws = node.waitStatus; // 如果头结点的等待状态小于0,则将它的等待状态设置为0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 获取头结点的下一个节点 Node s = node.next; if (s == null || s.waitStatus > 0) { // 如果头结点的下一个节点为空或者头结点的下一个节点已经被取消, // 则从尾节点向前遍历取到队列最前面的那个状态不是已取消状态的节点 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 如果 s 不为空,则唤醒该节点的线程 if (s != null) LockSupport.unpark(s.thread); } // 如果该方法放回true,则说明有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁 public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; // 若 h == t,h和t均为null或是同一个具体的节点,无后继节点,返回false // h != t,若 h.next == null,返回true。什么情况下h!=t的同时h.next==null?,有其他线程第一次正在入队时,可能会出现。见AQS的enq方法,compareAndSetHead(node)完成,还没执行tail = head语句时,此时tail=null,head=newNode,head.next=null。 // h != t,h != null,当头结点的下一个节点等于当前线程时,返回false return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); // 解释为什么要判断:s.thread != Thread.currentThread() // 根据ReentrantLock的解锁流程,可以看到当线程释放锁之后还是会在队列的head节点,只是把next指针指向下一个可用节点,并唤醒它也就是说任意时刻,head节点可能占用着锁(除了第一次执行enq()入队列时,head仅仅是个new Node(),没有实际对应任何线程,但是却“隐式”对应第一个获得锁但并未入队列的线程,和后续的head在含义上保持一致),也可能释放了锁(unlock()),未被阻塞的head.next节点对应的线程在任意时刻都是有必要去尝试获取锁 }

可重写方法

/** * 独占模式下使用:尝试获取锁 */ protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } /** * 独占模式下使用:尝试释放锁 */ protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } /** * 共享模式下使用:尝试获取锁 */ protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } /** * 共享模式下使用:尝试释放锁 */ protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } /** * 锁是否被当前线程独占 */ protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); }

子类只要实现这几个方法中的一部分就可以实现一个同步器

最新回复(0)