本节讲解的所有队列都实现了BlockingQueue接口,内部方法介绍:
boolean offer(E e):插入元素,不阻塞.
void put(E e):插入元素,阻塞.
E poll():获取并移除头部元素,不阻塞.
E take():获取并移除头部元素,阻塞.
E peek():获取链表的首部元素(只读取而不移除)
boolean remove(Object o):删除队列中指定的元素,有则删除,没有返回false.
Java并发编程之ConcurrentLinkedQueue详解
ConcurrentLinkedQueue是线程安全的误解非阻塞队列,启迪成数据结构使用单项链表实现,对于入队和出兑操作使用CAS来实现线程安全.
内部使用了两个Volatile类型的Node节点分别用来存放队列的首尾节点.Node节点内部则维护一个使用volatile修饰的变量tiem来存放节点的值.
在队列末尾添加一个元素,不能传入null(抛出NPE异常).内部使用CAS无阻塞算法,不会阻塞挂起线程.
public boolean offer(E e) { // 检查如果传入空数据,抛出异常 checkNotNull(e); final Node<E> newNode = new Node<E>(e); // 自旋式的从尾节点插入 // 1、根据tail节点定位出尾节点(last node);2、将新节点置为尾节点的下一个节点;3、casTail更新尾节点 for (Node<E> t = tail, p = t;;) { // p用来表示队列的尾节点,初始情况下等于tail节点 // q是p的next节点 Node<E> q = p.next; if (q == null) { // p是尾节点 // 设置p节点的下一个节点为新节点,设置成功则casNext返回true // 否则返回false,说明有其他线程更新过尾节点 if (p.casNext(null, newNode)) { // 如果p != t,则将入队节点设置成tail节点,更新失败了也没关系 // 因为失败了表示有其他线程成功更新了tail节点 // 这里使队列每添加两次,尾节点更新一次 if (p != t) casTail(t, newNode); return true; } } // 执行了poll后可能会出现头节点自引用的情况 // 所以这里需要重新找新的head,因为新的head后面的节点才是激活的节点 else if (p == q) // 先取得t的值,在执行t = tail,并取得新的t的值,然后比较这两个值是否相等。 // 这种情况表示在比较的过程中,tail被其他线程修改了,这时,我们就用新的tail为链表的尾 // 但如果tail没有被修改,则返回head,要求从头部开始,重新查找链表末尾。 p = (t != (t = tail)) ? t : head; else // 判断尾节点是否被改变,如果没有将p向后移动 p = (p != t && t != (t = tail)) ? t : q; } }[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kAIVSW6r-1572597900093)(/home/benjamin/.config/Typora/typora-user-images/1571489081343.png)]
总而言之,当添加一个节点时会出现两种状态,p节点是尾节点,这种情况下可以插入.p节点不是尾节点(被其他线程修改),这种情况下需要走最后一个else分支将p指针向后移动.
另外,poll操作可能将头节点自引用,那么需要将p指向新的head然后重新寻找尾节点.
在队列头部获取并移除一个元素,如果队列为空返回null.
public E poll() { // 这个是goto标记 restartFromHead:// (1) for (;;) {// (2) for (Node<E> h = head, p = h, q;;) { // p节点表示首节点,即需要出队的节点 E item = p.item;// (3) // 不是空队列,且CAS操作成功,将头结点后一个节点的元素置空 if (item != null && p.casItem(item, null)) {// (4) // 之前q被移动过,将p设置为头节点 if (p != h)// (5) // 这一步将头结点自引用了,目的是为了下一步走向(7) updateHead(h, ((q = p.next) != null) ? q : p); return item; } // 如果头节点的元素为空或头节点发生了变化,这说明头节点已经被另外一个线程修改了。 // 那么获取p节点的下一个节点,如果p节点的下一节点为null,则表明队列已经空了 else if ((q = p.next) == null) {// (6) // 这种情况下多是其他线程将队列中的元素取光了,那么重新设置头结点,返回null updateHead(h, p); return null; } // 运行到这里说明有其他线程添加了尾节点,使该队列不为空队列 else if (p == q)// (7) // 重新执行该方法 continue restartFromHead; // 将p向后移动, else// (8) p = q; } } } // 设置头结点,并将原来的头结点自引用,提醒其他线程更新头结点 final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) // 将旧的头结点h的next域指向为h h.lazySetNext(h); } 总结一下,当没有其他线程打扰,方法将一步走到(5),然后重新设置头节点,并退出方法.如果其他线程这时将队列中的元素取光了,那么运行到(6).如果碰巧有其他线程添加了尾节点,那么运行到(7)或者(8),一般先运行(8),将p向后移动一个节点,下一次循环中走到(5)之后会将重新设置头结点,并将原h节点(尾节点)自引用,这样的情况下其他线程的代码会走向(7),重新执行该方法.之前提到的offer方法中也有这种情况的相对策略.
并不是每次出队时都更新head节点,当head节点里有元素时,直接弹出head节点里的元素,而不会更新head节点。只有当head节点里没有元素时,出队操作才会更新head节点。
计算当前队列元素个数,统计元素是不准确的
public int size() { int count = 0; // first()获取第一个具有非空元素的节点,若不存在,返回null // succ(p)方法获取p的后继节点,若p == p的后继节点,则返回head for (Node<E> p = first(); p != null; p = succ(p)) if (p.item != null) // Collection.size() spec says to max out // 最大返回Integer.MAX_VALUE if (++count == Integer.MAX_VALUE) break; return count; } // 获取队列中的第一个有效节点 Node<E> first() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { boolean hasItem = (p.item != null); if (hasItem || (q = p.next) == null) { updateHead(h, p); return hasItem ? p : null; } else if (p == q) continue restartFromHead; else p = q; } } } // 获取传入节点的后继节点,如果该节点自引用,返回真正的头结点 final Node<E> succ(Node<E> p) { Node<E> next = p.next; return (p == next) ? head : next; } 如果队列中存在该元素则删除该元素,存在多个则删除第一个.
public boolean remove(Object o) { if (o != null) { // 删除为空直接返回false Node<E> next, pred = null; for (Node<E> p = first(); p != null; pred = p, p = next) { boolean removed = false; E item = p.item; // 节点元素不为null if (item != null) { // 匹配不上让p和pred向后移动 if (!o.equals(item)) { next = succ(p); continue; } // 匹配上将该元素置空 removed = p.casItem(item, null); } // 获取删除节点的后继节点 next = succ(p); // 将被删除的节点移除队列 if (pred != null && next != null) // unlink pred.casNext(p, next); if (removed) return true; } } return false; } 判断队列中是否有制定对象,结果并不精确,但不牵扯方法内的多线程影响.
public boolean contains(Object o) { if (o == null) return false; // 遍历队列 for (Node<E> p = first(); p != null; p = succ(p)) { E item = p.item; // 若找到匹配节点,则返回true if (item != null && o.equals(item)) return true; } return false; } ConcurrentLinkedQueue底层使用单向链表数据结构来保存队列元素,使用非阻塞CAS算法,没有加锁.因为head和tail两个节点都是由volatile修饰的,本身可以保证可见性,所以只要保证对这两个变量操作的原子性即可.
offer操作是在tail后添加元素,实际上是调用CASNext方法,只有一个线程能成功,其他线程需要重新寻找尾节点.(队列新增两次,尾节点更新一次)
poll操作一样.
LinkedBlockingQueue是使用独占锁实现的阻塞队列.
有单向链表实现,有两个Node,分别用来存放首尾节点,还有一个原子变量Count用来记录队列元素个数.
还有两个ReentrantLock实例,分别用来空值元素入队和出队的原子性.
tackLock控制出队操作,putLock控制入队操作.
另外使用了两个条件变量,notEmpty(由tackLock锁获得,在出队是判断队列是否为空)和notFull(由putLock锁获得,在入队是判断队列是否已满).
如果有空闲,插入元素并返回true.没有则返回false.
如果传入元素为null,则抛出异常
public boolean offer(E e) { // 传入元素为null抛出异常 if (e == null) throw new NullPointerException(); // 队列判满 final AtomicInteger count = this.count; if (count.get() == capacity)// capacity默认为MAX_VALUE,可在构造中传参设置 return false; // 构造新节点,获取入队锁对象 int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; // 加锁 putLock.lock(); try { // 如果队列不满,进队,并递增元素数 if (count.get() < capacity) { // 将node节点链接到队列尾 enqueue(node); // count自增1,并返回修改前的值 c = count.getAndIncrement(); // 如果添加后还有空间,唤醒之前条件阻塞的入队线程 if (c + 1 < capacity) notFull.signal(); } } finally { // 常规解锁操作 putLock.unlock(); } if (c == 0) // c为入队前队列中的元素数,c==0说明此时队列中至少有一个元素 // 唤醒其他所有因为不能出队条件阻塞的线程 signalNotEmpty(); return c >= 0; } 向队列尾插入一个元素,如果队列有空闲则插入,队列已满就阻塞当前线程,直到队列有空闲插入成功后返回.
当被阻塞是其他线程设置了中断,抛出InterruptedExecption异常.
如果传入元素为null,抛出空指针异常.
public void put(E e) throws InterruptedException { // 判断传入元素是不是null if (e == null) throw new NullPointerException(); // 构建新节点,获取入队锁对象和计数器 int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 可响应中断式的加锁 putLock.lockInterruptibly(); try { // 队列已满,则使当前入队线程条件阻塞,等待出队线程的条件唤醒 while (count.get() == capacity) { notFull.await(); } // 设置尾节点 enqueue(node); // 修改计数器,并返回计数前的值 c = count.getAndIncrement(); // 判断入队后是否满,如果不满,唤醒其他的入队线程 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } // 如果至少有一个元素(入队前为空,入队后至少有一个元素),唤醒其他的出队线程 if (c == 0) signalNotEmpty(); } 从队列头获取一个并移除一个元素,如果队列为空返回null.该方法并不等待其他线程入队元素.
public E poll() { // 队列为空返回null final AtomicInteger count = this.count; if (count.get() == 0) return null; { "action": "visitor", "vip_name": "李", "vip_num": 2, "face_url": "http://benjaminlee.cn:8989/hello/images/20181115213421.jpg", "frame_url": "http://benjaminlee.cn:8989/hello/images/20181115213421.jpg", "timestamp": 1564730279999 } // 获取出队锁对象 E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { // 队列不空,则出队并递减计数器 x = dequeue(); c = count.getAndDecrement(); // 出队后队列不为空,则唤醒其他出队线程 if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } // 出队前队列满,则出队后队列有空隙,唤醒其他入队线程 if (c == capacity) signalNotFull(); return x; } 获取队列的头部元素,并从队列中移除,如果队列为空,阻塞当前线程,直到队列不为空后返回元素.
该方法响应中断,会抛出异常.
public E take() throws InterruptedException { // 获取出队锁对象和计数器,并响应中断式的进行加锁 E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { // 如果队列为空,循环条件挂起 notEmpty.await(); } // 出队并递减计数器 x = dequeue(); c = count.getAndDecrement(); if (c > 1) // 如果出队后还有元素,唤醒其他出队线程 notEmpty.signal(); } finally { takeLock.unlock(); } // 如果出队前队列已满,那么出队后出现了空位,唤醒其他入队线程 if (c == capacity) signalNotFull(); return x; } 获取头部节点元素,但不移除.
加出队锁的目的是防止获取了元素,但其他线程在该方法返回前将它出队了,造成不一致,和空指针异常
public E peek() { if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } } 删除队列中指定的元素,有则删除,没有返回false.
public boolean remove(Object o) { // 删除null元素直接返回false if (o == null) return false; // 同时加入队锁和出队锁 fullyLock(); try { for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { // 找到目标节点,删除节点 if (o.equals(p.item)) { unlink(p, trail); return true; } } return false; } finally { // 解入队锁和出队锁 fullyUnlock(); } } // 删除trail节点后的p节点 void unlink(Node<E> p, Node<E> trail) { p.item = null; trail.next = p.next; // 如果p是尾节点,那么重新设置尾节点 if (last == p) last = trail; // 如果当前队列满,删除元素后队列不满,唤醒入队线程 if (count.getAndDecrement() == capacity) notFull.signal(); }LinkedBlockingQueue是一个阻塞队列,内部由两个ReentrantLock来实现出入队列的线程安全,由各自的Condition对象的await和signal来实现等待和唤醒功能。它和ArrayBlockingQueue的不同点在于:
队列大小有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。数据存储容器不同,ArrayBlockingQueue采用的是数组作为数据存储容器,而LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表。由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。 ArrayBlockQueue是使用有界数组方式实现的阻塞队列.
ArrayBlockQueue内部有一个数组items,用来存放队列元素.outIndex用来存放入队元素下标,tackIndex用来存放出队元素下标.这些变量并没有使用volatile修饰,因为加锁已经保证了这些变量在锁内的可见性了.
独占锁lock用来保证出入队操作的原子性,notEmpty,notFull连个条件变量用来进行出入队的同步.
ArrayBlockQueue是有界的,所以构造必须传入队列大小为参数.默认情况下使用非公平锁.
插入元素,如果队列已满,则丢弃元素,不会阻塞线程.
public boolean offer(E e) { // 判断元素是否为空,为空抛出异常 checkNotNull(e); // 加锁 final ReentrantLock lock = this.lock; lock.lock(); try { // 判断队列是否满,count为队列中已填充元素数量 // items.length为数组长度,也就是队列的最大值 if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } } // 方法内部将在putIndex位置上放置新元素,并将putIndex++,如果越界重置为0(循环数组) private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); } 向尾部插入一个元素,如果有空闲,插入,如果队列已满,阻塞等待队列出现空闲.
public void put(E e) throws InterruptedException { // 判断传入元素的非空 checkNotNull(e); // 加锁 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 循环判断队列是否已满,如果已满,条件挂起 while (count == items.length) notFull.await(); // 走到这里说明队列中出现了空闲,插入元素 enqueue(e); } finally { // 常规解锁 lock.unlock(); } } 从队列头部移除一个元素,如果队列为空,返回null,不会阻塞等待队列不为空
public E poll() { // 加锁 final ReentrantLock lock = this.lock; lock.lock(); try { // 如果队列为空,返回null // 如果不为空,进行删除元素的操作,并将该元素返回 return (count == 0) ? null : dequeue(); } finally { // 常规解锁 lock.unlock(); } } private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") // 获取头部的元素,并将队列中的元素置空 E x = (E) items[takeIndex]; items[takeIndex] = null; // 循环队列 if (++takeIndex == items.length) takeIndex = 0; // 调整计数器的值 count--; // itrs是当前活动迭代器的共享状态;如果已知没有状态,则为null。 if (itrs != null) // 更新迭代器中的元素数据 itrs.elementDequeued(); // 唤醒因为队列满导致没有入队成功的入队线程 notFull.signal(); return x; } 获取当前队列头部元素,并删除它,如果队列为空,会阻塞等待队列不为空时进行操作
可响应中断.
public E take() throws InterruptedException { // 获取锁对象,并响应中断式的加锁 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 循环判断队列是否为空,为空则条件阻塞 while (count == 0) notEmpty.await(); // 删除并返回头部元素 return dequeue(); } finally { lock.unlock(); } } 获取头部元素,但是不移除,如果队列为空,返回null
public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { // 直接返回出队下标处的元素 return itemAt(takeIndex); } finally { lock.unlock(); } } // 获取对应下表处的元素 final E itemAt(int i) { return (E) items[i]; } ArrayBlockQueue使用一个独占锁来实现只能有一个线程进行入队和出队操作,这个锁的粒度比较大,类似于在方法上加synchronized.
PriorityBlockingQueue是带优先级的无界阻塞队列,每次出队都返回优先级最高或者最低的元素,内部使用平衡二叉树堆实现,所以直接遍历队列元素不保证有序.在构造函数需传入comparator,用于插入元素时继续排序,若没有传入comparator,则插入的元素必须实现Comparatable接口.
PriorityBlockingQueue内部有一个数组queue,用来存放队列元素,size存放元素个数.allocationSpinLock是一个自旋锁,其使用CAS操作保证只有一个线程可以对队列进行操作,状态为0或1.
没有notFull条件变量是因为这个队列是无界的,入队操作是非阻塞的.
在队列中插入一个元素,因为是无界队列,所以一定会返回true.
public boolean offer(E e) { // 对入队元素进行非空判断 if (e == null) throw new NullPointerException(); // 加锁 final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; // 如果当前元素个数>=队列容量,则扩容 // 分解 array = queue // cap = array.length // n = size // n >= cap (所有实际上是判断是否满队列,如果满队列,循环进行扩容,扩容失败的线程会自旋在此) while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { // 比较器,如果有传入比较器的话使用自定义的比较器,如果没有使用默认的 Comparator<? super E> cmp = comparator; if (cmp == null) // n是原队列第一个空位,e是入队元素,array是队列 siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); // 队列元素数+1 size = n + 1; // 解锁所有因为空队列挂起的条件阻塞 notEmpty.signal(); } finally { lock.unlock(); } return true; } // 扩容操作 private void tryGrow(Object[] array, int oldCap) { // 释放锁 // 使用CAS控制只能有一个线程成功扩容,释放锁让其他线程进行入队出队操作,降低并发性 lock.unlock(); Object[] newArray = null; // 这也是一个锁,只让一个线程进行扩容 if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { // 如果oldCap小于64,扩容为2倍+2,如果大于,扩容50% int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1)); // 按照之前算法扩容后的容量如果溢出,则最小扩容量为原容量+1 if (newCap - MAX_ARRAY_SIZE > 0) { int minCap = oldCap + 1; // 如果最小扩容量溢出或者小于0,那么扩容失败 if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); // 扩容为极限大小 newCap = MAX_ARRAY_SIZE; } // 如果正常扩容情况下没有溢出,创建一个新数组,大小为扩容后的数组 if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { // 解锁 allocationSpinLock = 0; } } // 第一个线程获取锁之后第二个线程会直接来到这里,让出CPU资源给第一个线程 if (newArray == null) Thread.yield(); // 加锁,判断并拷贝数组 lock.lock(); if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); } } // 建立堆 private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { // 找到k的父节点,如果k小于父节点的值,将父节点置换为k // 直到k大于等于父节点的值,这样就构造了一个极小堆(所有父节点小于子节点) int parent = (k - 1) >>> 1; Object e = array[parent]; if (key.compareTo((T) e) >= 0) break; array[k] = e; k = parent; } array[k] = key; } 获取内部根节点的元素,如果队列为空,返回null
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return dequeue(); } finally { lock.unlock(); } } // 获取内部根节点的元素,如果队列为空,返回null private E dequeue() { // 判断队列是否为空 int n = size - 1; if (n < 0) return null; else { // 将第n个元素取出为x // 第0个元素取出为result Object[] array = queue; E result = (E) array[0]; E x = (E) array[n]; array[n] = null; Comparator<? super E> cmp = comparator; if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; return result; } } // k为空闲位置,x为尾元素,array为堆,n为堆大小 // 一直用小的孩子向上弥补父节点,直到最后一层,用最后一个节点补上 private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { if (n > 0) { Comparable<? super T> key = (Comparable<? super T>)x; int half = n >>> 1;// 无符号右移 while (k < half) { // 子节点默认为左孩子 int child = (k << 1) + 1; Object c = array[child]; // 右孩子 int right = child + 1; // 如果 (右孩子在堆内 && 左孩子大于右孩子) 那么右孩子代替左孩子作为孩子节点,并且c为孩子的值 if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; // 当尾元素小于孩子时,退出 if (key.compareTo((T) c) <= 0) break; // 用孩子节点向上替补空闲的父节点 array[k] = c; k = child; } array[k] = key; } } 获取根节点元素,如果队列为空阻塞
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { // 循环获取根节点元素,如果队列为空,挂起 // 循环防止多线程同时被挂起 while ( (result = dequeue()) == null) notEmpty.await(); } finally { lock.unlock(); } return result; } 内部使用二叉树堆维护元素优先级,使用可扩容的数组作为元素存储的数据结构.出队时保证出队元素是根节点,并重置整个堆为极小堆.
内部使用了一个独占锁来控制并发.只使用了一个条件变量notEmpty而没有使用notFull是因为这个队列是无界的,不存在满队列情况.
DelayQueue是一个无界阻塞延迟队列,队列中的每个元素都有过期时间,只有过期元素才会出列,队列头元素是最快要过期的元素.
内部使用PriorityQueue(二叉堆实现的队列)存放数据,使用ReentrantLock实现线程同步.要知道每个元素的过期时间,所有入队的元素要实现Delayed接口.内部使用优先级队列,所以还要实现元素之间相互比较的元素.
条件变量available与lock锁是对应的,目的是实现线程间同步.
正在操作堆顶元素的take()方法线程会被标记为leader.
内部使用PriorityQueue存放时间,使用ReentrantLock实现线程的同步