Java并发-----第8章 Java并发包中线程池ThreadPoolExecutor原理剖析

mac2025-09-24  23

第8章 Java并发包中线程池ThreadPoolExecutor原理剖析


文章目录

第8章 Java并发包中线程池ThreadPoolExecutor原理剖析@[toc]1. 介绍2. 结构组成(1). 线程池状态(2). 线程转换(3). 线程池参数(4). 线程池类型(5). 其他 3. 源码分析(1). public void execute(Runnable command)(2). 工作线程Worker的执行(3). shutdown操作(4). shutdownNow操作(5). awaitTermination操作 4. 总结

JDK1.7中的ThreadPoolExecutor源码剖析

1. 介绍

线程池主要解决两个问题:

当执行大量异步任务是线程池能提供良好的性能,如果不使用线程池,每当需要执行异步任务时直接new一个线程来运行,需要很大的开销.线程池提供了一种资源限制和管理的手段,比如限制线程的个数,动态新增线程等.

2. 结构组成

​ Executors实际上是一个工具类,里面提供了很多静态方法,这些方法根据用户选择返回不同的线程池实例.

​ 成员变量ctl是一个Integer的原子变量,用来记录线程池状态和线程池中线程的个数(高3位用来表示线程池状态,低29位用来表示线程个数).

(1). 线程池状态

RUNNING(111):接受新任务并处理阻塞队列里的任务SHUTDOWN(000):拒绝新任务,但是处理阻塞队列中的任务STOP(001):拒绝新任务并且抛弃阻塞队列里的任务,同时会中断正在处理的任务TIDYING(010):所有任务都执行完之后当前线程池活动线程数为0,将调用terminated()方法TERMINATED(011):终止状态,调用terminated()方法并完成后的状态.

(2). 线程转换

RUNNING —> SHUTDOWN:调用shutdown()方法RUNNING或SHUTDOWN —> STOP:调用shutdownNow()方法SHUTDOWN —> TIDYING:线程池和任务队列都为空TIDYING —> TERMINATED:当terminated()hook方法执行完成时

(3). 线程池参数

corePoolSize:线程池核心线程个数workQueue:等待执行的任务的阻塞队列maximunPoolSize:最大线程数量ThreadFactory:创建线程的工厂keeyAliveTime:存活时间

(4). 线程池类型

newFixedThreadPoll:带参数构造,参数为核心线程数和最大线程数,阻塞队列长度为Integer.MAX_VALUEnewSingleThreadExecutor:核心线程数和最大线程数都为1,但是阻塞队列长度为Integer.MAX_VALUEnewCachedThreadPoll:按需创建线程,初始个数为0,最多为Integer.MAX_VALUE,根据线程存活时间进行回收,特殊点在于加入同步队列的任务马上就会被执行,同步队列最多只能有一个元素.

(5). 其他

mainLock是独占锁,用来空值Worker线程操作的原子性termination是该锁对应的条件队列Worker继承AQS和Runnable接口,是具体承载任务的对象.继承了AQS,内部实现了简单的不可重入锁,其中state=0表示锁空闲,state=1表示锁被占用.state=-1是创建时的默认状态,为了避免才运行runWorker()方法时被中断.

3. 源码分析

(1). public void execute(Runnable command)

​ 提交任务command带线程池进行执行.

public void execute(Runnable command) { // 如果传入的任务为空,抛出异常 if (command == null) throw new NullPointerException(); // 获取线程池的状态和线程数量 int c = ctl.get(); // 线程池未满,则添加新线程并更新.addWorker()方法在后面讲解. if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 如果线程处于RUNNING状态,将任务添加到阻塞队列 if (isRunning(c) && workQueue.offer(command)) { // 检查任务添加到阻塞队列后的线程池状态 int recheck = ctl.get(); // 如果这时线程池状态改变了,删除刚刚添加的任务,并执行拒绝策略 if (! isRunning(recheck) && remove(command)) reject(command); // 如果线程池状态没有改变,但是为空,添加一个线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果线程不是RUNNING状态或者添加任务失败,都说明阻塞队列已满 // 尝试开启一个新线程,如果失败就执行拒绝策略 else if (!addWorker(command, false)) reject(command); } // core如果为true,请使用corePoolSize(核心线程数)作为绑定,否则使用maximumPoolSize(最大线程数)。 // 代码很长,其实就做了两件事。 // 1)才用循环CAS操作来将线程数加1; // 2)新建一个线程并启用。 private boolean addWorker(Runnable firstTask, boolean core) { retry: // 第一部分 循环CAS操作,将线程池中的线程数+1. for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 检查队列的状态是否是非RUNNING状态 // 是SHUTDOWN状态时 传入的任务不为空 或 工作队列为空 // 如果符合以上描述,返回false // 因为SHUTDOWN状态下不接受新任务,且当工作队列为空时说明阻塞队列也为空了,这是会转换成TIDYING状态 if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false; // 自旋增加线程个数 for (;;) { int wc = workerCountOf(c); // 线程个数超限返回false // CAPACITY最大线程个数,极限值,非用户定义 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // CAS增加线程个数,只能有一个线程成功,成功的线程进入下一块内容 if (compareAndIncrementWorkerCount(c)) break retry; // 没成功的线程查看线程池状态是否发生变化 // 如果发生变化,调到外层循环重新获取线程状态 // 如果没有变化,内层自旋 c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } // 第二部分 新建线程,并加入到线程池workers中。 // 能运行到这里说明CAS操作成功了, boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 封装任务为Worker对象,并抽取出线程 w = new Worker(firstTask); final Thread t = w.thread; // 上一步成功执行if块内的代码 // 否则返回false if (t != null) { // 上锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 重新获取线程池状态 int rs = runStateOf(ctl.get()); // 如果当前线程是RUNNABLE状态 或者 是SHUTDOWN状态但传入的任务为空 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 判断添加的任务状态,如果已经开始丢出异常(外部手动启动线程) if (t.isAlive()) throw new IllegalThreadStateException(); // 将新建的线程加入到线程池中 workers.add(w); int s = workers.size(); // 修正最大池深度的值 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { // 解锁 mainLock.unlock(); } // 添加任务成功就启动任务 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }

(2). 工作线程Worker的执行

​ 用户提交任务到线程池之后,是由Worker来执行的.

​ Worker相当于一个自己带锁的线程.

// Worker的构造方法 Worker(Runnable firstTask) { // 现将state设置为-1是防止在运行前被中断(shutdownNow方法中断所有线程). setState(-1); this.firstTask = firstTask; // 由工厂生产线程 this.thread = getThreadFactory().newThread(this); } // 执行任务 final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // 将state设置为0,允许中断 w.unlock(); boolean completedAbruptly = true; try { // 当前Worker的调度任务不为空 或者 能获取到任务 while (task != null || (task = getTask()) != null) { w.lock(); // 线程池处于STOP状态 或者 当线程被中断时处于STOP状态 // 但是现在并没有被中断 // 处于以上情景时,发出中断请求 // 总结就是处理中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); // 执行扩展接口代码 try { // 开始执行任务前的Hook(钩子) // 在本类中为空实现,可以在子类中加入诸如事务控制之类的动作 beforeExecute(wt, task); Throwable thrown = null; try { task.run(); // 发现异常就向上一层抛出 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 同上 afterExecute(task, thrown); } } finally { // 每次运行完任务,将任务标记为null,并计数 task = null; w.completedTasks++; // 解锁 w.unlock(); } } completedAbruptly = false; } finally { // completedAbruptly为true,说明之前抛出异常了,会进行清理工作 processWorkerExit(w, completedAbruptly); } } // 该方法源源不断的(从等待队列中)向Worker输入任务 private Runnable getTask() { boolean timedOut = false; for (;;) { // 获取线程池状态 int c = ctl.get(); int rs = runStateOf(c); // 如果线程池状态为SHUTDOWN时队列为空 或者 为STOP或TERMINATE状态 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 回收线程,线程数量-1 decrementWorkerCount(); return null; } // 重新得到线程数 int wc = workerCountOf(c); // 标识,含义为当前线程空闲,应该回收 // allowCoreThreadTimeOut含义: // 该值为true,则线程池数量最后销毁到0个。 // 该值为false,超过核心线程数时,而且(超过最大值或者timeout过),就会销毁。 // 当线程数超过核心线程池大小 或者 销毁机制为销毁到0(可能跟STOP状态有关) 时,回收该线程 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 如果线程数目大于最大线程数目,或者允许超时回收或者超时,则跳出循环,继续去阻塞队列中取任务 // (如果线程数超过线程池大小 || (标识为应该回收&&还有work可以获取)) && (有线程||work队列非空) if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 注:poll不阻塞,take阻塞 // 当标记为清除时,尝试获取一个Worker,否则阻塞获取一个Worker Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // r!=null 说明标记为不清除,而且得到了work if (r != null) return r; // r==null 说明没有获得work 队列为空了 没有work可以获取了 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } // 将w清理出worker队列(线程池) // Worker w:要执行退出的Worker对象 // boolean completedAbruptly:是否用户异常退出,true为异常退出。 private void processWorkerExit(Worker w, boolean completedAbruptly) { // 是否是意外退出,如果是,将WorkerCount-- if (completedAbruptly) decrementWorkerCount(); // 加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 给总完成任务数计数,然后将worke移除线程 completedTaskCount += w.completedTasks; workers.remove(w); } finally { // 解锁 mainLock.unlock(); } // 调用tryTemiate,进行判断当前的线程池是否处于SHUTDOWN状态,如果是终止线程 tryTerminate(); // 这一整块的含义是判断当前线程数是否小于核心线程个数,如果小于则添加新的线程. int c = ctl.get(); // 判断当前的线程池状态,如果当前线程池状态比STOP大的话,就不处理 if (runStateLessThan(c, STOP)) { // 判断是否是意外退出,如果不是意外退出的话,那么就会判断最少要保留的核心线程数 if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 如果最少保留的Worker数为0的话,那么就会判断当前的任务队列是否为空,如果任务队列不为空的话而且线程池没有停止,那么说明至少还需要1个线程继续将任务完成。 if (min == 0 && ! workQueue.isEmpty()) min = 1; // 如果当前运行的Worker数比当前所需要的Worker数少的话,那么就会调用addWorker,添加新的Worker if (workerCountOf(c) >= min) return; } addWorker(null, false); } }

(3). shutdown操作

​ 调用shutdown方法后,线程池就不会再接受新任务了,但是继续执行之前添加到任务队列中的任务.

public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 检查当前调用shutdown命令的线程是否有关闭线程的权限 checkShutdownAccess(); // 设置线程状态为SHUTDOWN advanceRunState(SHUTDOWN); // 设置中断 interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } // 判断当前的线程池是否处于SHUTDOWN状态,如果是终止线程 tryTerminate(); } // 多个线程一起设置时,一个线程设置成功退出,其他线程自旋一次,在下一次判断时状态已经被修改,也退出 private void advanceRunState(int targetState) { for (;;) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } } // 中断空闲的线程 private void interruptIdleWorkers() { interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 遍历每一个工作线程 for (Worker w : workers) { Thread t = w.thread; // 如果工作线程没有被中断,且没有正在运行,那么补充一个中断标志 // 正在获取任务的线程没有锁,tryLock会返回false if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } } // 判断当前的线程池是否处于SHUTDOWN状态,如果是终止线程 final void tryTerminate() { // 自旋执行 for (;;) { // 首先获取线程状态 int c = ctl.get(); // 如果是RUNNING和TIDYING状态 或者 是SHUTDOWN状态但队列不为空 // 说明当前不应该被终止 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()) ) return; // 如果线程数不为0,中断每一个线程(关闭所有空闲线程) if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } // 加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 使用CAS设置状态为SHUTDOWN(000) if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 钩子方法 terminated(); } finally { //钩子方法执行完毕将线程池状态设置为TERMINATED,释放所有条件等待的线程 ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }

(4). shutdownNow操作

​ 调用shutdownNow()方法后,线程池就不会再接受新的任务了,并且会丢弃工作队列中的任务,正在执行的任务会被中断,该方法会立刻返回,返回值为这时候队列中被对其的任务列表.

public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 权限检查 checkShutdownAccess(); // 修改线程池状态为STOP advanceRunState(STOP); // 中断所有线程 interruptWorkers(); // 将中断的任务队列移动到tasks中,稍后返回 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } } private List<Runnable> drainQueue() { // 获取工作队列 BlockingQueue<Runnable> q = workQueue; ArrayList<Runnable> taskList = new ArrayList<Runnable>(); // 移动队列内节点 q.drainTo(taskList); // 如果有剩余节点, if (!q.isEmpty()) { // 手动一个一个移动 for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r)) taskList.add(r); } } return taskList; }

(5). awaitTermination操作

​ 调用该方法之后,当前线程会被阻塞,知道线程池转台变为TERMINATED或者等待超时才返回.(等同于终止这个线程直到线程池被终止)

public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { // 包装参数,加锁 long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 自旋 for (;;) { // 是TERMINATED状态,则不需要进行任何操作了 if (runStateAtLeast(ctl.get(), TERMINATED)) return true; // 如果传入时间为无效时间,返回false if (nanos <= 0) return false; // 如果时间有效,条件阻塞nanos微秒,唤醒后重新判断当前线程池状态 nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } }

4. 总结

Java并发包中的线程池工作原理如下:

​ 类中有一个任务队列,用于存放当前线程都被占用,但是仍然要进行的任务.线程被Worker执行,并且自旋式的不断从任务队列获取新的任务,直到任务队列为空,并且当前Worker空闲,就会被移出线程池.

​ 调用shutdown()方法后,会停止接收任务,并中断空闲的线程(当前正在获取任务)

​ 调用shutdownNow()方法后,停止接收任务,并中断所有线程.将任务队列返回.

​ 调用awaitTermination()方法后,将当前线程挂起直至超时或者线程池被销毁.

最新回复(0)