因为线程池的源码涉及到阻塞队列,所以这里前科普下阻塞队列,如果对此已有所了解, 可知直接跳到手写线程池源码章节
ConcurrentLinkedQueue : 是一个适用于高并发场景下的队列,通过无锁的方式,实现 了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue.它 是一个基于链接节点的无界线程安全队列。该队列的元素遵循先进先出的原则。头是最先 加入的,尾是最后加入的,该队列不允许null元素。 ConcurrentLinkedQueue重要方法: add 和offer() 都是加入元素的方法(在ConcurrentLinkedQueue中这俩个方法没有任何区别) poll() 和peek() 都是取头元素节点,区别在于前者会删除元素,后者不会。
ConcurrentLinkedDeque q = new ConcurrentLinkedDeque(); q.offer("于胜华"); q.offer("百度"); q.offer("课堂"); q.offer("张杰"); q.offer("艾姐"); //从头获取元素,删除该元素 System.out.println(q.poll()); //从头获取元素,不刪除该元素 System.out.println(q.peek()); //获取总长度 System.out.println(q.size());阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是: 在队列为空时,获取元素的线程会等待队列变为非空。 当队列满时,存储元素的线程会等待队列可用。
ArrayBlockingQueue是一个有边界的阻塞队列,它的内部实现是一个数组。有边界的意思是它的容量是有限的,我们必须在其初始化的时候指定它的容量大小,容量大小一旦指定就不可改变。 ArrayBlockingQueue是以先进先出的方式存储数据,最新插入的对象是尾部,最新移出的对象是头部
<String> arrays = new ArrayBlockingQueue<String>(3); arrays.add("李四"); arrays.add("张军"); arrays.add("张军"); // 添加阻塞队列 arrays.offer("张三", 1, TimeUnit.SECONDS);LinkedBlockingQueue阻塞队列大小的配置是可选的,如果我们初始化时指定一个大小,它就是有边界的,如果不指定,它就是无边界的。说是无边界,其实是采用了默认大小为Integer.MAX_VALUE的容量 。它的内部实现是一个链表。他也是以先进先出的方式存储数据
LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(3); linkedBlockingQueue.add("张三"); linkedBlockingQueue.add("李四"); linkedBlockingQueue.add("李四"); System.out.println(linkedBlockingQueue.size());PriorityBlockingQueue是一个没有边界的队列,他允许插入null对象。所有插入PriorityBlockingQueue的对象必须实现 java.lang.Comparable接口,队列优先级的排序规则就 是按照我们对这个接口的实现来定义的。我们可以从PriorityBlockingQueue获得一个迭代器Iterator,但这个迭代器并不保证按照优先级
SynchronousQueue队列内部仅允许容纳一个元素。当一个线程插入一个元素后会被阻塞,除非这个元素被另一个线程消费。
整体的核心思路如图
package com.concurrent.pool; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class MySelfThreadPool { //默认线程池中的线程的数量 private static final int WORK_NUM =5; //默认处理任务的数量 private static final int TASK_NUM = 100; private int workNum;//线程数量 private int taskNum;//任务数量 private final Set<WorkThread> workThreads;//保存线程的集合 private final BlockingQueue<Runnable> taskQueue;//阻塞有序队列存放任务 public MySelfThreadPool() { this(WORK_NUM, TASK_NUM); } public MySelfThreadPool(int workNum, int taskNum) { //如果输入的变量为小于的0的,则直接使用默认值 if (workNum <= 0){ workNum = WORK_NUM; } if (taskNum <= 0){ taskNum = TASK_NUM; } //创建一个与最大容量相同的阻塞队列 taskQueue = new ArrayBlockingQueue<>(taskNum); this.workNum = workNum; this.taskNum = taskNum; workThreads = new HashSet<>(); //启动一定数量的线程数,从队列中获取任务处理 for (int i=0;i<workNum;i++) { WorkThread workThread = new WorkThread("thead_"+i); workThread.start(); workThreads.add(workThread); } } /** * 线程池执行任务的方法,其实就是往BlockingQueue中添加元素 * @param task */ public void execute(Runnable task) { try { taskQueue.put(task); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void destroy() { System.out.println("ready close thread pool..."); if (workThreads == null || workThreads.isEmpty()) return ; for (WorkThread workThread : workThreads) { workThread.stopWork(); workThread = null;//help gc } workThreads.clear(); } /** * 线程池中的工作线程,直接从BlockingQueue中获取任务 * 然后执行任务而已 * blockQueue为阻塞队列 * */ private class WorkThread extends Thread{ public WorkThread(String name) { setName(name); } @Override public void run() { while (!interrupted()) { try { //take -->【若队列为空,发生阻塞,等待有元素】 Runnable runnable = taskQueue.take();//获取任务 if (runnable !=null) { System.out.println(getName()+" ready execute:"+runnable.toString()); runnable.run();//执行任务 } runnable = null;//help gc } catch (Exception e) { interrupt(); e.printStackTrace(); } } } public void stopWork() { interrupt(); } } public static void main(String[] args) { new MySelfThreadPool().execute(()-> System.out.println("index")); } }