Semaphore

mac2026-05-22  6

文章目录

简介内部类非公平锁公平锁

简介

Semaphore信号量,用来控制同时访问特定资源的线程数量,他通过协调各个线程,以保证合理地使用公用资源。

内部类

abstract static class Sync extends AbstractQueuedSynchronizer {} // 非公平锁 static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } // 尝试获取共享锁 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } // 公平锁 static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }

非公平锁

// 默认为非公平锁 Semaphore semaphore = new Semaphore(10); // 获取许可,将同步状态减1 semaphore.acquire(); // Semaphore.acquire() public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // AbstractQueuedSynchronizer.acquireSharedInterruptibly(int arg) public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 若当前线程已经中断,则抛出异常 if (Thread.interrupted()) throw new InterruptedException(); // tryAcquireShared尝试获取锁,该方法由AbstractQueuedSynchronizer的子类实现 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); // 若尝试获取许可失败,则需要进行排队 } // Semaphore.NonfairSync.tryAcquireShared(int acquires) // 尝试获取许可 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } // Semaphore.Sync.nonfairTryAcquireShared(int acquires) // 非公平锁获取许可 final int nonfairTryAcquireShared(int acquires) { for (;;) { // 这里使用for循环不断地进行CAS操作 // 获取许可数 int available = getState(); // 减去当前线程需要的许可,得到剩余的许可数 int remaining = available - acquires; // 如果剩余许可数小于0,则直接返回,获取许可失败 // 如果剩余许可数不小于0,则尝试原子更新state的值,成功了返回剩余许可数 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } // AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(int arg) // 若前面尝试获取许可失败,则加入到同步队列进行排队 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 构造一个节点加入到同步队列中,在 ReentrantLock 章节已经分析过这个方法 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { // 获取当前节点的同一个节点 final Node p = node.predecessor(); if (p == head) { // 若当前节点的前一个节点为头结点,则尝试获取许可 int r = tryAcquireShared(arg); if (r >= 0) { // 尝试获取许可成功,则唤醒后面的节点 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // shouldParkAfterFailedAcquire和parkAndCheckInterrupt的实现方式与ReentrantLock一样 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); // 将节点取消 } } // 释放许可 semaphore.release(); // Semaphore.release() public void release() { sync.releaseShared(1); } // AbstractQueuedSynchronizer.releaseShared(int arg) // 释放许可 public final boolean releaseShared(int arg) { // 尝试释放许可 if (tryReleaseShared(arg)) { doReleaseShared(); // 若释放许可成功,则通知下一个节点 return true; } return false; } // Semaphore.NonfairSync.tryReleaseShared(int releases) // 释放锁,相当于许可加1 protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); // 这里使用for循环不断地进行CAS操作,直到更新同步状态成功为止 if (compareAndSetState(current, next)) return true; } }

公平锁

Semaphore semaphore = new Semaphore(10,true); semaphore.acquire(); // Semaphore.acquire() public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // AbstractQueuedSynchronizer.acquireSharedInterruptibly(int arg) public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 若当前线程已经中断,则抛出异常 if (Thread.interrupted()) throw new InterruptedException(); // tryAcquireShared 尝试获取锁,该方法由AbstractQueuedSynchronizer的子类实现 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); // 若尝试获取许可失败,则需要进行排队 } // Semaphore.FairSync.tryAcquireShared(int acquires) protected int tryAcquireShared(int acquires) { for (;;) { // 与非公平锁不同的地方就是这里多了一个判断 // 如果等待队列不为空,并且等待队列的下一个节点不为当前节点,则不能尝试获取锁 // 在 ReentrantLock 章节已经分析过这个方法 if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } // 后面方法的实现与非公平锁一样 semaphore.release();
最新回复(0)