首先看一段源码:
1 public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { 2 private static final long serialVersionUID = -817911632652898426L; 3 final Object[] items; 4 int takeIndex; 5 int putIndex; 6 int count; 7 final ReentrantLock lock; 8 private final Condition notEmpty; 9 private final Condition notFull; 10 transient Itrs itrs = null;ArrayBlockingQueue是一个数组队列,由代码看其维护了一个Object[] items数组,然后同步保证安全;
理解ArrayBlockingQueue主要理解两点即可:FIFO原则和同步安全访问。
①、既然是使用数组实现的队列,那么他如何保证队列的FIFO原则的呢?主要有三个序列控制:
takeIndex(items index for next take, poll, peek or remove),即从队列中获取元素时的控制序列;
putIndex(items index for next put, offer, or add),即往队列中增加元素的控制序列;
count(Number of elements in the queue),即队列中的元素数量序列;
takeIndex每移除一个元素takeIndex则+1(peek方法比较特殊,不会移除元素),当tokeIndex+1的值等于数组items长度时,takeIndex置0。
putIndex每增加一个元素+1,当putIndex+1的值等于数组items的长度时,putIndex置0.
count每增加一个元素+1,count每移除一个元素-1.
由此可见,数组基于这三个变量的循环控制,实现了队列的FIFO。= =、
②、并发安全是基于ReentrantLock和两个Condition实现的。
看一下构造方法:
1 public ArrayBlockingQueue(int capacity, boolean fair) { 2 if (capacity <= 0) 3 throw new IllegalArgumentException(); 4 this.items = new Object[capacity]; 5 lock = new ReentrantLock(fair); 6 notEmpty = lock.newCondition(); 7 notFull = lock.newCondition(); 8 }
使用单参构造时这个fair参数默认是false,即非公平锁。基于ReentrantLock和两个Condition的阻塞和唤醒实现同步;notEmpty在队列中没有元素可获取时阻塞线程,notFull在满队列不可插入时阻塞线程。
还是先看段源码:
1 public class LinkedBlockingQueue<E> extends AbstractQueue<E> 2 implements BlockingQueue<E>, java.io.Serializable { 3 private static final long serialVersionUID = -6903933977591709194L; 4 5 static class Node<E> { 6 E item; 7 Node<E> next; 8 9 Node(E x) { item = x; } 10 } 11 12 private final int capacity; 13 14 private final AtomicInteger count = new AtomicInteger(); 15 16 transient Node<E> head; 17 18 private transient Node<E> last; 19 20 private final ReentrantLock takeLock = new ReentrantLock(); 21 22 private final Condition notEmpty = takeLock.newCondition(); 23 24 private final ReentrantLock putLock = new ReentrantLock(); 25 26 private final Condition notFull = putLock.newCondition();LinkedBlockingQueue和ArrayBlockingQueue有很多相似的地方,主要区别如下:
①LinkedBlockingQueue使用的是单向链表,而ArrayBlockingQueue使用的是数组,故LinkedBlockingQueue使用head节点和last节点维护FIFO原则。
②LinkedBlockingQueue分别使用了takeLock和putLock两个锁进行新增和移除元素的操作,这也导致了元素计数器count属性需要声明为AtomicInteger进行原子操作。
③LinkedBlockingQueue默认可以不指定队列大小,使用Integer.MAX_VALUE默认初始化队列大小。
看源码中属性部分:
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = 5595510919245408276L; private static final int DEFAULT_INITIAL_CAPACITY = 11; private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; private transient Object[] queue; private transient int size; private transient Comparator<? super E> comparator; private final ReentrantLock lock; private final Condition notEmpty; private transient volatile int allocationSpinLock; private PriorityQueue<E> q;其仍然是个数组,和ArrayBlockingQueue很像,也是通过单个lock加锁,其特点如下:
①、初始大小11,但是会自动扩容,最大可以到Integer.MAX_VALUE - 8;
②、其队列元素必须实现Comparator接口,以便其基于完全二叉树的最小堆和最大堆排序;
③、PriorityBlockingQueue本身不支持序列化,数组前加了transient修饰,其序列化会转化成PriorityQueue,反序列化时再转换成PriorityBlockingQueue自己。