手写线程池核心功能源码

mac2024-10-02  53

文章目录

知识回顾前言阻塞队列与非阻塞队ConcurrentLinkedDequeBlockingQueueArrayBlockingQueueLinkedBlockingQueuePriorityBlockingQueueSynchronousQueue 使用BlockingQueue模拟生产者与消费者手写线程池源码

知识回顾

前言

因为线程池的源码涉及到阻塞队列,所以这里前科普下阻塞队列,如果对此已有所了解, 可知直接跳到手写线程池源码章节

阻塞队列与非阻塞队

阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞, 或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞, 直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞, 直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列. 1.ArrayDeque, (数组双端队列) 2.PriorityQueue, (优先级队列) 3.ConcurrentLinkedQueue, (基于链表的并发队列) 4.DelayQueue, (延期阻塞队列)(阻塞队列实现了BlockingQueue接口) 5.ArrayBlockingQueue, (基于数组的并发阻塞队列) 6.LinkedBlockingQueue, (基于链表的FIFO阻塞队列) 7.LinkedBlockingDeque, (基于链表的FIFO双端阻塞队列) 8.PriorityBlockingQueue, (带优先级的无界阻塞队列) 9.SynchronousQueue (并发同步阻塞队列)

ConcurrentLinkedDeque

ConcurrentLinkedQueue : 是一个适用于高并发场景下的队列,通过无锁的方式,实现 了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue.它 是一个基于链接节点的无界线程安全队列。该队列的元素遵循先进先出的原则。头是最先 加入的,尾是最后加入的,该队列不允许null元素。 ConcurrentLinkedQueue重要方法: add 和offer() 都是加入元素的方法(在ConcurrentLinkedQueue中这俩个方法没有任何区别) poll() 和peek() 都是取头元素节点,区别在于前者会删除元素,后者不会。

ConcurrentLinkedDeque q = new ConcurrentLinkedDeque(); q.offer("于胜华"); q.offer("百度"); q.offer("课堂"); q.offer("张杰"); q.offer("艾姐"); //从头获取元素,删除该元素 System.out.println(q.poll()); //从头获取元素,不刪除该元素 System.out.println(q.peek()); //获取总长度 System.out.println(q.size());

BlockingQueue

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是: 在队列为空时,获取元素的线程会等待队列变为非空。 当队列满时,存储元素的线程会等待队列可用。

ArrayBlockingQueue

ArrayBlockingQueue是一个有边界的阻塞队列,它的内部实现是一个数组。有边界的意思是它的容量是有限的,我们必须在其初始化的时候指定它的容量大小,容量大小一旦指定就不可改变。 ArrayBlockingQueue是以先进先出的方式存储数据,最新插入的对象是尾部,最新移出的对象是头部

<String> arrays = new ArrayBlockingQueue<String>(3); arrays.add("李四"); arrays.add("张军"); arrays.add("张军"); // 添加阻塞队列 arrays.offer("张三", 1, TimeUnit.SECONDS);

LinkedBlockingQueue

LinkedBlockingQueue阻塞队列大小的配置是可选的,如果我们初始化时指定一个大小,它就是有边界的,如果不指定,它就是无边界的。说是无边界,其实是采用了默认大小为Integer.MAX_VALUE的容量 。它的内部实现是一个链表。他也是以先进先出的方式存储数据

LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(3); linkedBlockingQueue.add("张三"); linkedBlockingQueue.add("李四"); linkedBlockingQueue.add("李四"); System.out.println(linkedBlockingQueue.size());

PriorityBlockingQueue

PriorityBlockingQueue是一个没有边界的队列,他允许插入null对象。所有插入PriorityBlockingQueue的对象必须实现 java.lang.Comparable接口,队列优先级的排序规则就 是按照我们对这个接口的实现来定义的。我们可以从PriorityBlockingQueue获得一个迭代器Iterator,但这个迭代器并不保证按照优先级

SynchronousQueue

SynchronousQueue队列内部仅允许容纳一个元素。当一个线程插入一个元素后会被阻塞,除非这个元素被另一个线程消费。

使用BlockingQueue模拟生产者与消费者

class ProducerThread implements Runnable { private BlockingQueue<String> blockingQueue; private AtomicInteger count = new AtomicInteger(); private volatile boolean FLAG = true; public ProducerThread(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { System.out.println(Thread.currentThread().getName() + "生产者开始启动...."); while (FLAG) { String data = count.incrementAndGet() + ""; try { boolean offer = blockingQueue.offer(data, 2, TimeUnit.SECONDS); if (offer) { System.out.println(Thread.currentThread().getName() + ",生产队列" + data + "成功.."); } else { System.out.println(Thread.currentThread().getName() + ",生产队列" + data + "失败.."); } Thread.sleep(1000); } catch (Exception e) { } } System.out.println(Thread.currentThread().getName() + ",生产者线程停止..."); } public void stop() { this.FLAG = false; } } class ConsumerThread implements Runnable { private volatile boolean FLAG = true; private BlockingQueue<String> blockingQueue; public ConsumerThread(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { System.out.println(Thread.currentThread().getName() + "消费者开始启动...."); while (FLAG) { try { String data = blockingQueue.poll(2, TimeUnit.SECONDS); if (data == null || data == "") { FLAG = false; System.out.println("消费者超过2秒时间未获取到消息."); return; } System.out.println("消费者获取到队列信息成功,data:" + data); } catch (Exception e) { // TODO: handle exception } } } } public class Test0008 { public static void main(String[] args) { BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(3); ProducerThread producerThread = new ProducerThread(blockingQueue); ConsumerThread consumerThread = new ConsumerThread(blockingQueue); Thread t1 = new Thread(producerThread); Thread t2 = new Thread(consumerThread); t1.start(); t2.start(); //10秒后 停止线程.. try { Thread.sleep(10*1000); producerThread.stop(); } catch (Exception e) { // TODO: handle exception } } }

手写线程池源码

整体的核心思路如图

package com.concurrent.pool; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class MySelfThreadPool { //默认线程池中的线程的数量 private static final int WORK_NUM =5; //默认处理任务的数量 private static final int TASK_NUM = 100; private int workNum;//线程数量 private int taskNum;//任务数量 private final Set<WorkThread> workThreads;//保存线程的集合 private final BlockingQueue<Runnable> taskQueue;//阻塞有序队列存放任务 public MySelfThreadPool() { this(WORK_NUM, TASK_NUM); } public MySelfThreadPool(int workNum, int taskNum) { //如果输入的变量为小于的0的,则直接使用默认值 if (workNum <= 0){ workNum = WORK_NUM; } if (taskNum <= 0){ taskNum = TASK_NUM; } //创建一个与最大容量相同的阻塞队列 taskQueue = new ArrayBlockingQueue<>(taskNum); this.workNum = workNum; this.taskNum = taskNum; workThreads = new HashSet<>(); //启动一定数量的线程数,从队列中获取任务处理 for (int i=0;i<workNum;i++) { WorkThread workThread = new WorkThread("thead_"+i); workThread.start(); workThreads.add(workThread); } } /** * 线程池执行任务的方法,其实就是往BlockingQueue中添加元素 * @param task */ public void execute(Runnable task) { try { taskQueue.put(task); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void destroy() { System.out.println("ready close thread pool..."); if (workThreads == null || workThreads.isEmpty()) return ; for (WorkThread workThread : workThreads) { workThread.stopWork(); workThread = null;//help gc } workThreads.clear(); } /** * 线程池中的工作线程,直接从BlockingQueue中获取任务 * 然后执行任务而已 * blockQueue为阻塞队列 * */ private class WorkThread extends Thread{ public WorkThread(String name) { setName(name); } @Override public void run() { while (!interrupted()) { try { //take -->【若队列为空,发生阻塞,等待有元素】 Runnable runnable = taskQueue.take();//获取任务 if (runnable !=null) { System.out.println(getName()+" ready execute:"+runnable.toString()); runnable.run();//执行任务 } runnable = null;//help gc } catch (Exception e) { interrupt(); e.printStackTrace(); } } } public void stopWork() { interrupt(); } } public static void main(String[] args) { new MySelfThreadPool().execute(()-> System.out.println("index")); } }
最新回复(0)