java并发之阻塞队列

mac2024-01-25  30

阻塞队列

MQ的底层就是由阻塞队列实现,而Array阻塞队列的底层是由一个reentranLock和两个condition实现。 队列是空时,获取元素操作被阻塞。 队列满时,插入元素操作被阻塞。 因此,程序员不需要关心何时阻塞或者唤醒线程。

阻塞队列共有七种,其中常用的是 ArrayBlockingQueue 数组组成的有界阻塞队列 LinkedBlockingQueue 链表组成的有界阻塞队列(Integer.MAX_VALUE) SynchronousQueue 单个元素的阻塞队列(生产一个,消费一个)

核心方法

| 方法\行为 | 抛异常 | 特定的值 | 阻塞 | 超时 | | :——-: | :——-: | :—————: | :—-: | :————————-: | | 插入方法 | add(o) | offer(o) | put(o) | offer(o, timeout, timeunit) | | 移除方法 | | poll()、remove(o) | take() | poll(timeout, timeunit) | | 检查方法 | element() | peek() | | |

行为解释:

抛异常:如果操作不能马上进行,则抛出异常

特定的值:如果操作不能马上进行,将会返回一个特殊的值,一般是 true 或者 false

阻塞:如果操作不能马上进行,操作会被阻塞

超时:如果操作不能马上进行,操作会被阻塞指定的时间,如果指定时间没执行,则返回一个特殊值,一般是 true 或者 false

插入方法:

add(E e):添加成功返回true,失败抛 IllegalStateException 异常 offer(E e):成功返回 true,如果此队列已满,则返回 false put(E e):将元素插入此队列的尾部,如果该队列已满,则一直阻塞 offer:插入队尾,若已满,则等待一定时间后返回false; 删除方法:

remove(Object o) :移除指定元素,成功返回true,失败返回false poll():获取并移除此队列的头元素,若队列为空,则返回 null take():获取并移除此队列头元素,若没有元素则一直阻塞 poll(timeout, timeunit):获取并移除此队列头元素,若没有元素则等待一定时间后返回false; 检查方法:

element() :获取但不移除此队列的头元素,没有元素则抛异常 peek() :获取但不移除此队列的头;若队列为空,则返回 null

SynchronousQueue

public class SynchronousQueueDemo { public static void main(String[] args) { BlockingQueue<String> blockingQueue = new SynchronousQueue<>(); new Thread(()->{ try { System.out.println(Thread.currentThread().getName()+"put1"); blockingQueue.put("1"); System.out.println(Thread.currentThread().getName()+"put2"); blockingQueue.put("2"); System.out.println(Thread.currentThread().getName()+"put3"); blockingQueue.put("3"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } },"a").start(); new Thread(()->{ try { TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName()+" take "+blockingQueue.take()); TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName()+" take "+blockingQueue.take()); TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName()+" take "+blockingQueue.take()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } },"b").start(); }

传统生产者消费者模型

防止虚假唤醒,必须用while而不能用if

//初始为0的变量,两个线程对其交替操作,一个加1,一个减1 class ShareData{ private int number = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void increment()throws Exception{ lock.lock(); try { while(number != 0) { condition.await(); } number++; System.out.println(Thread.currentThread().getName()+number); condition.signalAll(); } catch (Exception e) { // TODO: handle exception }finally { lock.unlock(); } } public void decrement() throws Exception{ lock.lock(); try { while(number == 0) { condition.await(); } number--; System.out.println(Thread.currentThread().getName()+number); condition.signalAll(); } catch (Exception e) { // TODO: handle exception }finally { lock.unlock(); } } } public class ProdConsunmer_TraditionDemo { public static void main(String[] args) { ShareData shareData = new ShareData(); new Thread(()->{ for(int i = 0;i<5;i++) { try { shareData.increment(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } },"increment").start(); new Thread(()->{ for(int i = 0;i<5;i++) { try { shareData.decrement(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } },"decrement").start(); } }
最新回复(0)