ReentrantLock实现了Lock接口
public interface Lock { /** * 获取锁 */ void lock(); /** * 获取锁,可以中断 */ void lockInterruptibly() throws InterruptedException; /** * 尝试获取锁,获取成功则返回true,否则返回false */ boolean tryLock(); /** * 尝试获取锁,如果没获取到锁,就等待一段时间,这段时间内还没获取到锁就返回false */ boolean tryLock(long time, TimeUnit unit) throws InterruptedException; /** * 释放锁 */ void unlock(); /** * 条件锁 */ Condition newCondition(); }ReentrantLock有三个内部类
// Sync 继承 AQS,主要方法 abstract static class Sync extends AbstractQueuedSynchronizer{ // 加锁抽象方法 abstract void lock(); // 非公平尝试获取锁 final boolean nonfairTryAcquire(int acquires) { // 获取当前线程 final Thread current = Thread.currentThread(); // 获取同步状态 int c = getState(); // 若c = 0,说明此时没有线程获取锁 if (c == 0) { // 使用CAS更新同步状态变量 if (compareAndSetState(0, acquires)) { // 如果更新成功,说明获取锁成功,把当前线程设为独占线程 setExclusiveOwnerThread(current); return true; } } // 如果 c 不为0,说明此时已经有现成获取锁,此时判断获取锁的线程是否是当前线程, else if (current == getExclusiveOwnerThread()) { // 如果获取锁的线程是当前线程,则可以再次获取锁 int nextc = c + acquires; // 将同步状态变量+acquires if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } // 尝试释放锁 protected final boolean tryRelease(int releases) { int c = getState() - releases; // 判断当前线程是否获取锁,若当前线程未获取锁,则抛出异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 当同步状态等于0时,锁才能释放成功,因为锁可重入,所以同步状态的值可能会大于1 free = true; setExclusiveOwnerThread(null); } setState(c); return free; } } // 实现公平锁,主要方法 static final class NonfairSync extends Sync { // 非公平加锁,重写父类 Sync 的lock方法 final void lock() { // 尝试使用CAS操作更新同步状态 if (compareAndSetState(0, 1)) // 若更新成功,说明成功获取锁,将当前线程设置为独占线程 setExclusiveOwnerThread(Thread.currentThread()); else // 调用父类 AbstractQueuedSynchronizer 的 acquire(int arg)方法获取锁, // 具体分析见前面AbstractQueuedSynchronizer的主要方法分析 acquire(1); } // 非公平尝试获取锁 protected final boolean tryAcquire(int acquires) { // 调用父类 Sync 的 nonfairTryAcquire(int acquires) 方法,非公平的尝试获取锁 // 具体分析见前面 Sync 的主要方法分析 return nonfairTryAcquire(acquires); } } // 实现非公平锁,主要方法 static final class FairSync extends Sync { // 公平加锁,重写父类 Sync 的lock方法 final void lock() { // 调用父类 AbstractQueuedSynchronizer 的 acquire(int arg)方法获取锁, // 具体分析见前面AbstractQueuedSynchronizer的主要方法分析 acquire(1); } // 公平尝试获取锁 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 与非公平锁不同的是,此处多了hasQueuedPredecessors()判断,该方法是实现公平锁的关键。 // 调用父类AbstractQueuedSynchronizer的hasQueuedPredecessors方法 // 如果hasQueuedPredecessors返回true,表示有其他线程先于当前线程等待获取锁,此时为了实现公平,保证等待时间最长的线程先获取到锁,不能执行CAS。 // 具体分析见前面AbstractQueuedSynchronizer的主要方法 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { // 如果 hasQueuedPredecessors 返回FALSE,则执行CAS尝试更新同步状态 // 若更新成功,说明获取锁成功,将当前线程设置为独占线程 setExclusiveOwnerThread(current); return true; } } // 可重入锁,当前线程已经持有锁时,可以再次获取锁 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }在公平锁中,如果有另一个线程持有锁或者有其他线程在等待队列中等待这个锁,那么新发出的请求的线程将被放入到同步队列尾部。而非公平锁,只有当锁被某个线程持有时,新发出请求的线程才会被放入同步队列尾部。所以,它们的差别在于非公平锁会有更多的机会去抢占锁。
lock源码分析
public void lock() { // ReentrantLock.NonfairSync.lock() sync.lock(); } // 该方法在ReentrantLock.NonfairSync中 final void lock() { // 首先使用CAS操作尝试更新同步状态, if (compareAndSetState(0, 1)) // 若更新成功则说明成功获取锁,将当前线程设置为独占线程 setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); // 调用父类AbstractQueuedSynchronizer的acquire(int arg)方法 } // 该方法在AbstractQueuedSynchronizer中 public final void acquire(int arg) { // AbstractQueuedSynchronizer中的tryAcquire由子类NonfairSync进行重写 // 若尝试获取锁失败,先调用AbstractQueuedSynchronizer.addWaiter方法将新节点存储到同步队列的尾部 // 最后调用AbstractQueuedSynchronizer.acquireQueued方法,该方法不断尝试获取锁 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } // NonfairSync 尝试获取锁 protected final boolean tryAcquire(int acquires) { // 调用父类Sync的nonfairTryAcquire方法,尝试非公平获取锁 return nonfairTryAcquire(acquires); } // 该方法在Sync中 final boolean nonfairTryAcquire(int acquires) { // 获取当前线程 final Thread current = Thread.currentThread(); // 获取同步状态 int c = getState(); // 若c = 0,说明此时没有线程获取锁 if (c == 0) { // 使用CAS更新同步状态变量 if (compareAndSetState(0, acquires)) { // 如果更新成功,说明获取锁成功,把当前线程设为独占线程 setExclusiveOwnerThread(current); return true; } } // 如果 c 不为0,说明此时已经有线程获取锁,此时判断获取锁的线程是否是当前线程, else if (current == getExclusiveOwnerThread()) { // 如果获取锁的线程是当前线程,则可以再次获取锁 int nextc = c + acquires; // 将同步状态变量+acquires if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } // 该方法在 AbstractQueuedSynchronizer 中 // 若当前线程尝试获取锁失败,将会调用这个方法 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 这里先尝试把新节点加到同步队列的尾部 // 如果成功了就返回新节点 // 如果没成功再调用enq()方法不断尝试 Node pred = tail; if (pred != null) { // 将新节点的前驱节点指向原来的尾节点 node.prev = pred; // 使用CAS操作将尾节点指向新节点 if (compareAndSetTail(pred, node)) { // 若CAS操作成功,则设置旧尾节点的下一个节点为新节点 pred.next = node; return node; } } // 若上面尝试新节点入队列失败,则会使用enq方法来处理 enq(node); return node; } // 该方法在 AbstractQueuedSynchronizer 中 // 把Node存储在同步队列的尾部 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize // 如果同步队列为空,则初始化尾节点和头结点 if (compareAndSetHead(new Node())) tail = head; } else { // 新节点的前驱节点指向尾节点 node.prev = t; // 使用CAS操作将尾节点指向新节点,若CAS操作失败,将一直处于for循环中,直至CAS操作成功 if (compareAndSetTail(t, node)) { // 若CAS操作成功,则设置旧尾节点的下一个节点为新节点 t.next = node; // 返回旧尾节点 return t; } } } } // 该方法在 AbstractQueuedSynchronizer 中 // 调用上面的addWaiter()方法使得新节点已经成功入队了 // 这个方法是尝试让当前节点来获取锁的 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 当前线程将一直处于for循环中,直到获取锁成功 for (;;) { // 获取当前节点的前驱节点 final Node p = node.predecessor(); // 若当前节点的前驱节点为头结点,则当前节点才有机会尝试获取锁, // tryAcquire是由子类NonfairSync进行实现,前面已经分析过 if (p == head && tryAcquire(arg)) { // 尝试获取锁成功,将当前节点设置为头结点并返回 setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 若没有机会尝试获取锁或者尝试获取锁失败 // shouldParkAfterFailedAcquire返回false,将继续循环 // shouldParkAfterFailedAcquire返回true,将执行parkAndCheckInterrupt方法 // parkAndCheckInterrupt方法用来阻塞线程,当线程被唤醒之后,又继续这个for循环 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } // 该方法在 AbstractQueuedSynchronizer 中 // 该方法会在上面的for循环中一直调用,主要功能:将新增节点上一个节点的等待状态设置为SIGNAL // 如果新增节点前一个节点的等待状态为SIGNAL,则返回true // 如果新增节点前一个节点的等待状态为小于等于0,则将前一个节点的等待状态设置为SIGNAL private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // static final int CANCELLED = 1; // static final int SIGNAL = -1; // static final int CONDITION = -2; // static final int PROPAGATE = -3; // waitStatus默认为0 int ws = pred.waitStatus; // 如果前一个节点的状态为-1,说明前一个节点等待被唤醒,则返回true if (ws == Node.SIGNAL) return true; if (ws > 0) { // 若前一个节点的状态大于0,则说明前一个节点已经被取消,则需要将其从队列中剔除 do { // 把前面所有处于已取消状态的节点都从同步队列中剔除 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 如果前一个节点的状态 小于等于 0 ,则将它的状态更改为-1(等待被唤醒) compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } // 该方法在 AbstractQueuedSynchronizer 中 // 真正阻塞线程的方法 private final boolean parkAndCheckInterrupt() { // 阻塞当前线程 LockSupport.park(this); // 返回线程是否中断 return Thread.interrupted(); }unlock源码分析
public void unlock() { // NonfairSync调用父类AbstractQueuedSynchronizer的release(int arg)方法 sync.release(1); } // 该方法在 AbstractQueuedSynchronizer 中 // 释放锁 public final boolean release(int arg) { // tryRelease尝试释放锁,该方法是由子类Sync来实现的 if (tryRelease(arg)) { // 尝试释放锁成功 Node h = head; // 若头结点不为空并且头结点的等待状态不为0,调用unparkSuccessor方法唤醒头结点的下一个节点 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } // 该方法在 Sync 中 // 尝试释放锁 protected final boolean tryRelease(int releases) { int c = getState() - releases; // 判断当前线程是否获取锁,若当前线程未获取锁,则抛出异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 当同步状态等于0时,锁才能释放成功,因为锁可重入,所以同步状态的值可能会大于1 free = true; setExclusiveOwnerThread(null); } setState(c); return free; } // 该方法在 AbstractQueuedSynchronizer 中 // 该方法用来唤醒头结点的下一个节点 private void unparkSuccessor(Node node) { // 这里的node为同步队列的头结点 int ws = node.waitStatus; // 如果头结点的等待状态小于0,则将它的等待状态设置为0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 获取头结点的下一个节点 Node s = node.next; if (s == null || s.waitStatus > 0) { // 如果头结点的下一个节点为空或者头结点的下一个节点已经被取消, // 则从尾节点向前遍历取到队列最前面的那个状态不是已取消状态的节点 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 如果 s 不为空,则唤醒该节点的线程 if (s != null) LockSupport.unpark(s.thread); }公平锁获取锁,只有同步队列为空时或者同步队列头结点的下一个节点为当前线程时才能尝试加锁,若尝试加锁失败,则将当前线程构造成一个Node节点加入同步队列尾部。这样可以保证等待时间最长的线程能够获取锁,即先到先得。
ReentrantLock lock = new ReentrantLock(true); // 加锁 lock.lock(); // 解锁 lock.unlock();lock源码分析
public void lock() { // FairSync.lock() sync.lock(); } // 该方法在FairSync中 final void lock() { // 调用父类AbstractQueuedSynchronizer的acquire(int arg)方法 acquire(1); } // 该方法在AbstractQueuedSynchronizer中 public final void acquire(int arg) { // AbstractQueuedSynchronizer中的tryAcquire由子类FairSync进行重写 // addWaiter和acquireQueued方法与前面的非公平锁是一样的 // 若尝试获取锁失败,先调用AbstractQueuedSynchronizer.addWaiter方法将新节点存储到同步队列的尾部 // 最后调用AbstractQueuedSynchronizer.acquireQueued方法,该方法不断尝试获取锁 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } // 该方法在FairSync中 // 公平获取锁 protected final boolean tryAcquire(int acquires) { // 获取当前线程 final Thread current = Thread.currentThread(); // 获取同步状态 int c = getState(); if (c == 0) { // 若c = 0,说明此时没有线程获取锁 // 公平锁尝试获取锁之前需要做一个判断,如果没有其它线程在排队,那么当前线程尝试更新state的值为1 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 如果 c 不为0,说明此时已经有线程获取锁,此时判断获取锁的线程是否是当前线程, else if (current == getExclusiveOwnerThread()) { // 如果获取锁的线程是当前线程,则可以再次获取锁 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } // 该方法在AbstractQueuedSynchronizer中 // 如果该方法放回true,则说明有线程比当前线程更早地请求获取锁, // 因此需要等待前驱线程获取并释放锁之后才能继续获取锁 public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; // 若 h == t,h和t均为null或是同一个具体的节点,无后继节点,返回false // h != t,若 h.next == null,返回true。什么情况下h!=t的同时h.next==null?,有其他线程第一次正在入队时,可能会出现。见AQS的enq方法,compareAndSetHead(node)完成,还没执行tail = head语句时,此时tail=null,head=newNode,head.next=null。 // h != t,h != null,当头结点的下一个节点等于当前线程时,返回false return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); // 解释为什么要判断:s.thread != Thread.currentThread() // 根据ReentrantLock的解锁流程,可以看到当线程释放锁之后还是会在队列的head节点,只是把next指针指向下一个可用节点,并唤醒它也就是说任意时刻,head节点可能占用着锁(除了第一次执行enq()入队列时,head仅仅是个new Node(),没有实际对应任何线程,但是却“隐式”对应第一个获得锁但并未入队列的线程,和后续的head在含义上保持一致),也可能释放了锁(unlock()),未被阻塞的head.next节点对应的线程在任意时刻都是有必要去尝试获取锁 }条件锁,是指在获取锁之后发现当前业务场景无法处理,而需要等待某个条件的出现才可以继续处理时使用的一种锁。Condition定义了等待、通知两种类型的方法,当线程调用这些方法时,需要提前获取到Condition对象关联的锁。Condition对象是由Lock对象创建出来的,换句话说,Condition是依赖Lock对象的。
Condition condition = lock.newCondition();
// 该方法在ReentrantLock中 public Condition newCondition() { // 调用Sync的newCondition方法,返回ConditionObject对象 return sync.newCondition(); } // ConditionObject是AbstractQueuedSynchronizer的内部类,主要维护了一条等待队列 // ConditionObject主要有如下属性 // 等待队列的头结点 private transient Node firstWaiter; // 等待队列的尾结点 private transient Node lastWaiter;condition.await();
// 该方法在ConditionObject中 public final void await() throws InterruptedException { // 如果当前线程已经被中断,则抛出异常 if (Thread.interrupted()) throw new InterruptedException(); // 将当前线程构造成一个节点加入到等待队列的尾部 Node node = addConditionWaiter(); // 当前线程调用AbstractQueuedSynchronizer的fullyRelease方法释放锁 int savedState = fullyRelease(node); int interruptMode = 0; // isOnSyncQueue 用来判断当前线程是否已经进入了同步队列,如果没有,则阻塞 while (!isOnSyncQueue(node)) { // 阻塞自己,等待被唤醒 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 若当前线程已经进入到同步队列中,acquireQueued尝试获取锁。 // 非公平锁已经分析过acquireQueued方法 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 清除等待队列中取消的节点 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } // 该方法在ConditionObject中 // 该方法将当前线程构造成一个节点加入到等待队列的尾部 private Node addConditionWaiter() { // 获取等待队列的尾结点 Node t = lastWaiter; // 新加入到等待队列中的节点,waitStatus = Node.CONDITION // 如果尾结点的waitStatus不等于Node.CONDITION, // 则调用unlinkCancelledWaiters()方法将waitStatus不等于Node.CONDITION的节点移除等待队列 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } // 构造一个节点 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; // 如果等待队列为空,则将首节点指向新增节点 else t.nextWaiter = node; // 等待队列不为空,则将尾结点的下一个节点指向新增节点 lastWaiter = node; // 最后将等待队列的尾结点指向新增节点 return node; } // 该方法在AbstractQueuedSynchronizer中 final long fullyRelease(Node node) { boolean failed = true; try { // 获取当前同步状态 long savedState = getState(); // 调用AbstractQueuedSynchronizer的release方法释放锁 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } // 该方法在AbstractQueuedSynchronizer中 public final boolean release(int arg) { // 调用子类Sync实现的tryRelease方法尝试释放锁 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } // 该方法在 Sync 中 // 尝试释放锁 protected final boolean tryRelease(int releases) { int c = getState() - releases; // 判断当前线程是否获取锁,若当前线程未获取锁,则抛出异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 当同步状态等于0时,锁才能释放成功,因为锁可重入,所以同步状态的值可能会大于1 free = true; setExclusiveOwnerThread(null); } setState(c); return free; } // 该方法在AbstractQueuedSynchronizer中 final boolean isOnSyncQueue(Node node) { // 如果等待状态是CONDITION,或者prev(这个prev是在同步队列中使用,指向当前节点的前一个节点)为空,则返回false,说明还没有移到AQS的队列中 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 如果next(这个next是在同步队列中使用的,指向当前节点的下一个节点)指针有值,说明已经移到AQS的队列中了 if (node.next != null) // If has successor, it must be on queue return true; // 从AQS的尾节点开始往前寻找,看看是否可以找到当前节点,找到了也说明已经在AQS的队列中了 return findNodeFromTail(node); } // 从AQS的尾节点开始往前寻找,看看某个节点是否已经在同步队列中了 private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }condition.signal();将等待队列的首节点转移到AQS同步队列
// 该方法在ConditionObject中 // 将等待队列的首节点移除,将其加入到同步队列的尾部 public final void signal() { // 判断当前线程是否占有着锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 获取等待队列的首节点 Node first = firstWaiter; if (first != null) doSignal(first); } // 该方法在ConditionObject中 private void doSignal(Node first) { do { // 将同步队列的头结点往后移动一位 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } // 该方法在AbstractQueuedSynchronizer中 final boolean transferForSignal(Node node) { // 尝试把节点的状态更改为0,也就是转移到AQS队列中 // 如果失败了,说明节点已经被改成取消状态了 // 返回false,通过上面的循环可知会寻找下一个可用节点 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 运行到这里,说明已经成功将node.waitStatus更改为0 // 调用 AbstractQueuedSynchronizer 的enq方法,将node节点转移到AQS队列的尾部 // enq返回的是旧的尾结点 Node p = enq(node); int ws = p.waitStatus; // 如果上一个节点已取消了,或者更新状态为SIGNAL失败(也是说明上一个节点已经取消了) // 则直接唤醒当前节点对应的线程 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); // 如果更新上一个节点的等待状态为SIGNAL成功了 // 则返回true,这时上面的循环不成立了,退出循环,也就是只通知了一个节点 // 此时当前节点还是阻塞状态 // 也就是说调用signal()的时候并不会真正唤醒一个节点 // 只是把节点从条件队列移到AQS队列中 return true; }condition.signalAll();将等待队列的所有节点转移到AQS同步队列中
