ReentrantReadWriteLock

mac2026-05-28  2

文章目录

简介内部类读写状态的设计写锁的获取与释放读锁的获取与释放

简介

读写锁是一种特殊的锁,它把对共享资源的访问分为读访问和写访问,多个线程可以同时对共享资源进行读访问,但是同一时间只能有一个线程对共享资源进行写访问,使用读写锁可以极大地提高并发量。

内部类

abstract static class Sync extends AbstractQueuedSynchronizer {} public static class ReadLock implements Lock, java.io.Serializable{} public static class WriteLock implements Lock, java.io.Serializable {}

读写状态的设计

读写锁同样依赖同步器来实现来实现同步功能,读写状态就是同步器的状态。在ReentrantLock中,同步状态表示锁被一个线程重复获取的次数,而读写锁需要在同步状态(一个整形变量)上维护多个读线程和一个写线程的状态。如果需要在一个整形变量上维护多个状态,就需要将这个整形变量按位切割,读写锁将这个变量切割成两部分,高16位代表读,低16位代表写。

例如下面的状态表示:一个线程获取了写锁,并且重复获取了两次写锁,同时也获取了两次读锁。

0000000000000010 0000000000000011

假设当前同步状态为X,读状态等于X>>>16,写状态等于X&0x0000FFFF。当写状态加1时,同步状态等于X+1,当读状态加1时,同步状态等于X+(1<<16)。

写锁的获取与释放

写锁是一个支持可重入的排它锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁的时候,读锁已经被获取或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。

// 默认使用非公平锁 ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock(); // 写锁的获取 writeLock.lock(); // ReentrantReadWriteLock.WriteLock.lock() public void lock() { sync.acquire(1); // 获取锁 } // AbstractQueuedSynchronizer.acquire(int arg) public final void acquire(int arg) { // tryAcquire由ReentrantReadWriteLock的内部类Sync实现 // 先尝试获取锁 // 如果获取锁失败,则会进入同步队列中排队,后面的逻辑跟ReentrantLock是一样的 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } // ReentrantReadWriteLock.Sync.tryAcquire(int acquires) // 尝试获取写锁 protected final boolean tryAcquire(int acquires) { // 获取当前线程 Thread current = Thread.currentThread(); // 获取同步状态 int c = getState(); // 互斥锁被获取的次数,w = c & ((1 << 16) - 1) int w = exclusiveCount(c); if (c != 0) { // 如果 c != 0,w == 0,说明共享锁被获取的次数不为0,互斥锁被获取次数为0,则不能获取锁 // 如果 c != 0,w != 0,说明共享锁被获取的次数不为0,互斥锁被获取次数不为0,若获取写锁的线程不是当前线程,则不能获取锁 if (w == 0 || current != getExclusiveOwnerThread()) return false; // 溢出检测 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 到这里说明当前线程已经获取过写锁,这里是重入了,直接把state加1即可 setState(c + acquires); return true; } // 非公平锁的 writerShouldBlock 直接返回false, // 如果是公平锁,writerShouldBlock使用hasQueuedPredecessors()来进行判断 // 然后使用CAS操作更新同步状态 // 如果失败了,说明获取写锁失败,返回false // 如果成功了,说明获取写锁成功,把自己设置为占有者,并返回true if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; } // ReentrantReadWriteLock.NonfairSync.writerShouldBlock() final boolean writerShouldBlock() { return false; } // 写锁的释放 writeLock.unlock(); // 释放锁 public void unlock() { // ReentrantReadWriteLock.Sync sync.release(1); } // AbstractQueuedSynchronizer.release(int arg) public final boolean release(int arg) { // tryRelease尝试释放锁 // tryRelease由ReentrantReadWriteLock的内部类Sync实现 // 若释放锁成功,后面的逻辑跟ReentrantLock是一样的 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } // ReentrantReadWriteLock.Sync.tryRelease(int releases) protected final boolean tryRelease(int releases) { // 若当锁不是被当前线程所占有,则抛出异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 将同步状态减releases(releases此时为1) int nextc = getState() - releases; // 判断写锁是否完全被释放 boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); // 设置同步状态变量 setState(nextc); return free; }

写锁的获取,子类只需要实现尝试获取锁的方法;

写锁的释放,子类只需要实现释放锁的方法;

读锁的获取与释放

读锁是一个支持可重入的共享锁,它能够被多个线程同时获取,在没有其它写线程访问时,读锁总会被成功获取,而所做的也只是增加读状态。如果当前线程已经获取了读锁,则增加读状态;如果当前线程在获取读锁时,写锁已经被其它线程获取,则进入等待状态。读状态时是所有线程获取读锁次数的总和,而每个线程各自获取读锁的次数只能保存在ThreadLocal中,由线程自身来维护,这使得获取读锁的实现变得复杂。

// 默认使用非公平锁 ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock(); // 获取写锁 readLock.lock(); // ReentrantReadWriteLock.ReadLock.lock() public void lock() { // ReentrantReadWriteLock.Sync sync.acquireShared(1); // 获取锁 } // AbstractQueuedSynchronizer.acquireShared(int arg) public final void acquireShared(int arg) { // tryAcquireShared尝试获取锁,由子类ReentrantReadWriteLock.Sync实现 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); // 尝试获取锁失败,则可能需要排队 } // ReentrantReadWriteLock.Sync.tryAcquireShared(int unused) // 尝试获取读锁 protected final int tryAcquireShared(int unused) { // 获取当前线程 Thread current = Thread.currentThread(); // 获取同步状态 // 在读写锁模式下,高16位存储的是共享锁(读锁)被获取的次数,低16位存储的是互斥锁(写锁)被获取的次数 int c = getState(); // 如果写锁被获取的次数不为0,并且获取写锁的线程不是当前线程,则返回 -1 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 能运行到这里,说明此时还没有写锁,或者此时已经有写锁,但是是当前线程所获取 // 读锁被获取的次数 int r = sharedCount(c); // 非公平锁的readerShouldBlock用来判断是否存在写锁, // 公平锁的readerShouldBlock调用hasQueuedPredecessors()来进行判断 // 若此时不存在写锁,则尝试更新同步状态 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // 此时获取锁成功 if (r == 0) { // 若之前没有线程获取读锁,记录第一个获取读锁的线程为当前线程 firstReader = current; // 第一个获取读锁的线程,获取读锁的次数 firstReaderHoldCount = 1; } else if (firstReader == current) { // 如果有线程已经获取了读锁,并且当前线程是第一次获取读锁的线程,则将重入次数加1 firstReaderHoldCount++; } else { // 如果有线程已经获取了读锁,但是当前线程不是第一次获取读锁的线程 // 则从缓存中获取重入次数保存器 HoldCounter rh = cachedHoldCounter; // 如果缓存不存在当前线程 // 再从ThreadLocal中获取 // readHolds本身是一个ThreadLocal,里面存储的是HoldCounter if (rh == null || rh.tid != getThreadId(current)) // 如果readHolds不存在当前线程,则会初始化 cachedHoldCounter = rh = readHolds.get(); // 如果它的可冲入次数为0,则将它存储到readHolds中 else if (rh.count == 0) readHolds.set(rh); // 重入的次数加1 rh.count++; } return 1; } // 通过这个方法再去尝试获取读锁(如果之前其它线程获取了写锁,一样返回-1表示失败) return fullTryAcquireShared(current); } // ReentrantReadWriteLock.NonfairSync.readerShouldBlock() final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); } // AbstractQueuedSynchronizer.apparentlyFirstQueuedIsExclusive() final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; // 如果头节点不为空,并头节点的下一个节点不为空,并且不是共享模式【独占模式,写锁】、并且头节点的下一个节点的线程不为空,则返回true。 // 这个方法判断队列的head.next是否存在正在等待独占锁(写锁)。当然这个方法执行的过程中队列的形态可能发生变化。这个方法的意思是:读锁不应该让写锁始终等待。 } // AbstractQueuedSynchronizer.doAcquireShared(int arg) private void doAcquireShared(int arg) { // 构造一个共享节点,进入AQS的队列中 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取当前节点的前一个节点 final Node p = node.predecessor(); if (p == head) { // 尝试获取读锁 int r = tryAcquireShared(arg); if (r >= 0) { // 获取锁成功 // 头节点后移并传播 // 传播即唤醒后面连续的读节点 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 没获取到读锁,阻塞并等待被唤醒 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } // AbstractQueuedSynchronizer.setHeadAndPropagate() private void setHeadAndPropagate(Node node, int propagate) { // h为旧的头节点 Node h = head; // 设置当前节点为新头节点 setHead(node); // 如果当前头结点的状态是<0,意味着它是SINGAL和Propagate,那么也去唤醒后继结点。如果头结点为空(可能在其它线程中被释放了),那么也唤醒后继结点来获取资源。 // 如果旧的头节点或新的头节点为空或者其等待状态小于0(表示状态为SIGNAL/PROPAGATE) if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { // 需要传播 // 取下一个节点 Node s = node.next; // 如果下一个节点为空,或者是需要获取读锁的节点 if (s == null || s.isShared()) // 唤醒下一个节点 doReleaseShared(); } } // AbstractQueuedSynchronizer.doReleaseShared() // 这个方法只会唤醒一个节点 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 如果头节点状态为SIGNAL,说明要唤醒下一个节点 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 去唤醒下一个节点 unparkSuccessor(h); } else if (ws == 0 && // 把头节点的状态改为PROPAGATE成功才会跳到下面的if !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 如果唤醒后head没变,则跳出循环 if (h == head) // loop if head changed break; } } //在自旋的阶段,每一次循环的过程都是首先获得头结点,如果头结点不为空且不为尾结点(阻塞队列里面只有一个结点),那么先获得该节点的状态,如果是SIGNAL的状态,则代表它需要有后继结点去唤醒,首先将其的状态变为0,因为是要释放资源了,它也不需要做什么了,所以转变为初始状态,然后去唤醒后继结点unparkSuccessor(h),如果结点状态一开始就是0,那么就给他转换成PROPAGATE状态,保证在后续获取资源的时候,还能够向后面传播 // 释放读锁 readLock.unlock(); // ReentrantReadWriteLock.ReadLock.unlock() public void unlock() { // ReentrantReadWriteLock.Sync sync.releaseShared(1); } // AbstractQueuedSynchronizer.releaseShared() // 释放共享锁 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // 若尝试释放读锁成功,doReleaseShared用来唤醒下一个节点 doReleaseShared(); return true; } return false; } // ReentrantReadWriteLock.Sync.tryReleaseShared(int unused) // 尝试释放共享锁 protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // 如果第一个获取读锁的线程是当前线程 // 就把它重入的次数减1 // 如果减到0了,就把第一个获取读线程的变量firstReader置为空 if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { // 如果当前线程不是第一个获取读锁的线程,则将它重入次数减1 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { // 获取同步状态 int c = getState(); // SHARED_UNIT = (1 << 16) int nextc = c - SHARED_UNIT; // 读锁获取次数减1 if (compareAndSetState(c, nextc)) // 这里需要注意的是,只有读锁全部释放了才会返回true return nextc == 0; } } // AbstractQueuedSynchronizer.doReleaseShared() // 这个方法只会唤醒一个节点 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 如果头节点状态为SIGNAL,说明要唤醒下一个节点 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 唤醒下一个节点 unparkSuccessor(h); } else if (ws == 0 && // 把头节点的状态改为PROPAGATE成功才会跳到下面的if !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 如果唤醒后head没变,则跳出循环 if (h == head) // loop if head changed break; } }
最新回复(0)