condition相对于wait和notify,更加灵活,可以用多个condition实例对一个lock控制,通过condition可以更精细的控制多线程的休眠和唤醒。condition接口提供的方法:
public interface Condition{ //使当前线程进入等待状态直到被通知(signal)或中断,相当于synchronized中的wait()方法 await() throws InterruptException; // 进入等待状态但不响应中断要求 awaitYninterruptibly(); //当前线程进入等待状态,直到被唤醒或被中断或超时 long awaitNanos(long nanosTimeout) throws InterruptedException; //同awaitNanos,但可以指明时间单位 boolean await(long time, TimeUnit unit) throws InterruptedException; //调用该方法当前线程进入等待状态,直到被唤醒、中断或到达某个时 boolean awaitUntil(Date deadline) throws InterruptedException; //唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须 //获取与Condition相关联的锁,功能与notify()相同 void signal(); //唤醒所有等待在Condition上的线程,该线程从等待方法返回前必须 //获取与Condition相关联的锁,功能与notifyAll()相同 void signalAll(); }ConditionObject的具体实现类:
public class ConditionObject implements Condition, java.io.Serializable{ // 第一个等待队列节点 private transient Node firstWaiter; //等待队列最后一个等待结点 private transient Node lastWaiter; ... }每当lock.newCondition(),都会返回一个新的ConditionObject对象。在ConditionObject中,通过一个等待队列来维护等待的线程,所以在一个同步器中可以有多个等待队列,他们等待的条件是不一样的。
等待队列通过firstWaiter和lastWaiter表示队头和队尾,每次加入到等待队列中的线程都会加入到等待队列的尾部,是一个FIFO单向队列(AQS的锁竞争队列是双向的),模型图如下:
等待队列中的节点只有两种状态,CONDITION和CANCELLED,await方法的具体实现有:
public final void await()throws InterruptedException{ if(Thread.interrupted()){ throw new InterruptedException(); } Node node = addConditionWaiter(); // 释放当前线程锁的同步状态 long savedState = fullyRelease(node); int interruptMode = 0; // 节点是否在同步队列中,即是否被唤醒。如果线程不在,也没有被中断唤醒,一直处于park状态 while (!isOnSyncQueue(node)) { // 如果不在同步队列中,挂起当前线程 LockSupport.park(this); // 判断是否被中断唤醒,如果是退出循环。 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 被唤醒后执行自旋操作争取获得锁,同时判断线程是否被中断 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }添加到等待队列中addConditionWaiter()
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }唤醒操作signal方法
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }signal方法中处理两个事件:一是判断当前线程是否吃用独占锁,如果不持有,就抛出异常;共享模式下没有等待队列,所以是不会有condition的。二是唤醒等待队列的第一个节点,执行doSignal方法。
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }dosignal方法中也处理了两件事,首先把节点从条件等待队列中移除,然后重新维护firstWaiter和lastWaiter;其次是将移除的节点添加到同步队列中,并且唤醒前驱节点。所以,整个signal工作的流程是:先判断当前线程是否持有独占锁,如果有,唤醒等待队列中的第一个节点,并从等待队列中移除,移动到同步队列中去。如果加入同步队列失败,继续循环等待队列中的其他节点。如果加入同步队列成功,如果前驱节点结束或者设置前驱节点为SIGNAL状态失败,则唤醒当前节点代表的线程。到此signal()任务完成,被唤醒后的线程也将从前面的await()方法中的while循环中退出,因为此时该线程的结点已在同步队列中,进而调用AQS的acquireQueued()方法加入获取同步状态的竞争中,这就是等待唤醒机制的整个流程实现原理。 判断是否在同步队列中的方法:
final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) // If has successor, it must be on queue return true; return findNodeFromTail(node); } private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }唤醒所有线程的方式和唤醒单个线程的方式类似,找到队头后一次唤醒:
private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }