文章目录
简介内部类主要属性主要方法可重写方法
简介
AQS的全称是AbstractQueuedSynchronizer,是一个抽象类。它的定位是为Java中几乎所有的锁和同步器提供一个基础框架。
AQS是基于FIFO的队列实现的,并且内部维护了一个状态变量state,通过原子操作更新这个状态变量state即可以实现加锁解锁操作。
一个同步器拥有一个同步队列,可以拥有零个或者多个等待队列。
节点是构成同步队列的基础,同步器拥有首节点(head)和尾节点(tail),没有成功获取同步状态的线程将会成为节点加入同步队列的尾部。同步队列的基础结构如下:
+------+ next
+------+ next
+------+
head
| Node
| ----> | Node
| ----> | Node
| tail
| | <---- | | <---- | |
+------+ prev
+------+ prev
+------+
节点也是构成等待队列的基础,如果一个线程调用Condition的await方法后,那么该线程将会释放锁、构造成节点加入等待队列尾部并进入等待状态。一个Condition包含一个等待队列,等待队列的基础结构如下
+------+ nextWaiter
+------+ nextWaiter
+------+
firstWaiter
| Node
| ----------> | Node
| ----------> | Node
| lastWaiter
| | | | | |
+------+ +------+ +------+
内部类
static final class Node {
static final Node SHARED
= new Node();
static final Node EXCLUSIVE
= null
;
static final int CANCELLED
= 1;
static final int SIGNAL
= -1;
static final int CONDITION
= -2;
static final int PROPAGATE
= -3;
volatile int waitStatus
;
volatile Node prev
;
volatile Node next
;
volatile Thread thread
;
Node nextWaiter
;
final boolean isShared() {
return nextWaiter
== SHARED
;
}
final Node
predecessor() throws NullPointerException
{
Node p
= prev
;
if (p
== null
)
throw new NullPointerException();
else
return p
;
}
}
主要属性
private transient volatile Node head
;
private transient volatile Node tail
;
private volatile int state
;
主要方法
protected final int getState() {
return state
;
}
protected final void setState(int newState
) {
state
= newState
;
}
protected final boolean compareAndSetState(int expect
, int update
) {
return unsafe
.compareAndSwapInt(this, stateOffset
, expect
, update
);
}
public final void acquire(int arg
) {
if (!tryAcquire(arg
) &&
acquireQueued(addWaiter(Node
.EXCLUSIVE
), arg
))
selfInterrupt();
}
private Node
addWaiter(Node mode
) {
Node node
= new Node(Thread
.currentThread(), mode
);
Node pred
= tail
;
if (pred
!= null
) {
node
.prev
= pred
;
if (compareAndSetTail(pred
, node
)) {
pred
.next
= node
;
return node
;
}
}
enq(node
);
return node
;
}
private Node
enq(final Node node
) {
for (;;) {
Node t
= tail
;
if (t
== null
) {
if (compareAndSetHead(new Node()))
tail
= head
;
} else {
node
.prev
= t
;
if (compareAndSetTail(t
, node
)) {
t
.next
= node
;
return t
;
}
}
}
}
final boolean acquireQueued(final Node node
, int arg
) {
boolean failed
= true;
try {
boolean interrupted
= false;
for (;;) {
final Node p
= node
.predecessor();
if (p
== head
&& tryAcquire(arg
)) {
setHead(node
);
p
.next
= null
;
failed
= false;
return interrupted
;
}
if (shouldParkAfterFailedAcquire(p
, node
) &&
parkAndCheckInterrupt())
interrupted
= true;
}
} finally {
if (failed
)
cancelAcquire(node
);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred
, Node node
) {
int ws
= pred
.waitStatus
;
if (ws
== Node
.SIGNAL
)
return true;
if (ws
> 0) {
do {
node
.prev
= pred
= pred
.prev
;
} while (pred
.waitStatus
> 0);
pred
.next
= node
;
} else {
compareAndSetWaitStatus(pred
, ws
, Node
.SIGNAL
);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport
.park(this);
return Thread
.interrupted();
}
public final boolean release(int arg
) {
if (tryRelease(arg
)) {
Node h
= head
;
if (h
!= null
&& h
.waitStatus
!= 0)
unparkSuccessor(h
);
return true;
}
return false;
}
private void unparkSuccessor(Node node
) {
int ws
= node
.waitStatus
;
if (ws
< 0)
compareAndSetWaitStatus(node
, ws
, 0);
Node s
= node
.next
;
if (s
== null
|| s
.waitStatus
> 0) {
s
= null
;
for (Node t
= tail
; t
!= null
&& t
!= node
; t
= t
.prev
)
if (t
.waitStatus
<= 0)
s
= t
;
}
if (s
!= null
)
LockSupport
.unpark(s
.thread
);
}
public final boolean hasQueuedPredecessors() {
Node t
= tail
;
Node h
= head
;
Node s
;
return h
!= t
&&
((s
= h
.next
) == null
|| s
.thread
!= Thread
.currentThread());
}
可重写方法
/**
* 独占模式下使用:尝试获取锁
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
/**
* 独占模式下使用:尝试释放锁
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
/**
* 共享模式下使用:尝试获取锁
*/
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* 共享模式下使用:尝试释放锁
*/
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* 锁是否被当前线程独占
*/
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
子类只要实现这几个方法中的一部分就可以实现一个同步器