Java集合(19)——并发集合(7)——阻塞队列(0)——阻塞队列的架构和方法剖析

mac2022-06-30  28

目录

1.什么是阻塞队列

2.为什么要使用阻塞队列,有哪些好处?

3.阻塞队列的架构和种类

4.BlockingQueue的核心方法

(1)抛出异常

(2)特殊值

(3)阻塞

(4)超时


1.什么是阻塞队列

阻塞队列,顾名思义,首先它是一个队列 ,而一个阻塞队列在数据结构中所起的作用大致如图:

线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素当阻塞队列为空时,从队列中获取元素的操作将会被阻塞试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程从队列中移除一个或者多个元素或者完全清空队列后使队列重新变得空闲起来并后续新增

2.为什么要使用阻塞队列,有哪些好处?

阻塞:在多线程领域,阻塞就是在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒

好处: 我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,而这一切都由阻塞队列(BlockingQueue)来负责 在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度

3.阻塞队列的架构和种类

架构

种类(7种常用)

ArrayBlockingQueue:由数组结构组成的有界阻塞队列LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为Integer.MAX_VALUE,可以当作无界)阻塞队列PriorityBlockingQueue:支持优先级排序的无界阻塞队列DelayQueue:使用优先级队列实现的延迟无界阻塞队列SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列LinkedTransferQueue:由链表结构组成的无界阻塞队列LinkedBlockingDeque:由链表结构组成的双向阻塞队列

4.BlockingQueue的核心方法

方法类型抛出异常特殊值阻塞超时插入add(e)offer(e)put(e)offer(e,timeout,TimeUnit)移除remove()poll()take()poll(timeout,TimeUnit)返回队首元素element()peek()  

 

 

 

 

 

(1)抛出异常

对于add,当阻塞队列满时,再往队列里add插入元素会抛出IllegalStateException:Queue full对于remove,当阻塞队列空时,再往队列里remove移除元素会抛出NoSuchElementException对于element,当阻塞队列有值的情况下,会返回队首元素,当阻塞队列为空时,抛出NoSuchElementException

add:

import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueDemo01 { public static void main(String[] args) { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.add("a")); System.out.println(blockingQueue.add("a")); System.out.println(blockingQueue.add("a")); System.out.println(blockingQueue.add("a")); } }

remove:

import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueDemo01 { public static void main(String[] args) { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.add("a")); System.out.println(blockingQueue.add("a")); System.out.println(blockingQueue.add("a")); System.out.println("=================="); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); } }

element:

import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueDemo01 { public static void main(String[] args) { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.add("a")); System.out.println(blockingQueue.add("b")); System.out.println(blockingQueue.add("c")); System.out.println(blockingQueue.element()); System.out.println("=================="); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.element()); } }

(2)特殊值

对于offer,插入成功返回true,失败返回false对于poll,成功返回移出队列的元素,队列里没有元素就返回null对于peek,当阻塞队列有值的情况下,会返回队首元素,当阻塞队列为空时,返回null import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueDemo02 { public static void main(String[] args) { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.offer("a")); System.out.println(blockingQueue.offer("b")); System.out.println(blockingQueue.offer("c")); System.out.println("队列满时添加offer返回值:"+blockingQueue.offer("d")); System.out.println(blockingQueue.peek()); System.out.println("=================="); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println("队列为空时移除poll返回值:"+blockingQueue.poll()); System.out.println("队列为空时peek返回值:"+blockingQueue.peek()); } }

(3)阻塞

对于put,当阻塞队列满时,生产者线程继续往队列里put元素,队列会一直阻塞生产线程直到put数据或响应中断退出对于take,当阻塞队列空时,消费者线程试图从队列里take元素,队列会一直阻塞消费者线程直到队列可用

put:

import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueDemo03 { public static void main(String[] args) throws InterruptedException { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); blockingQueue.put("a"); blockingQueue.put("b"); blockingQueue.put("c"); System.out.println("======阻塞队列满了========"); blockingQueue.put("d"); System.out.println("执行了"); } }

take:

import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueDemo03 { public static void main(String[] args) throws InterruptedException { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); blockingQueue.put("a"); blockingQueue.put("b"); blockingQueue.put("c"); System.out.println("======阻塞队列满了========"); System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); System.out.println("======阻塞队列空了========"); System.out.println(blockingQueue.take()); System.out.println("执行了"); } }

(4)超时

对于offer,当阻塞队列满时,队列会阻塞生产者线程一定时间,超过时间退出对于poll,当阻塞队列空时,队列会阻塞消费者线程一定时间,超过时间退出 import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; public class BlockingQueueDemo03 { public static void main(String[] args) throws InterruptedException { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("b",2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("c",2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("d",2L, TimeUnit.SECONDS)); } }

2秒钟后:

import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; public class BlockingQueueDemo03 { public static void main(String[] args) throws InterruptedException { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(1); System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.poll(2L,TimeUnit.SECONDS)); } }

2秒钟后:

最新回复(0)