Java多线程学习(六)Executor 框架

mac2025-07-20  7

本文思维导图

使用线程池的优点

线程池提供了一种限制和管理资源(包括执行一个任务)。 每个线程池还维护一些基本统计信息,例如已完成任务的数量。

1.**降低资源消耗。**通过重复利用已创建的线程降低线程创建和销毁造成的消耗。2.**提高响应速度。**当任务到达时,任务可以不需要的等到线程创建就能立即执行。3.**提高线程的可管理性。**线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

Executor 框架结构(主要由三大部分组成)

1.任务执行任务需要实现的Runnable接口或Callable接口。Runnable接口或Callable接口实现类都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行。(Runnable接口不会返回结果但是Callable接口可以返回结果。后面介绍Executors类的一些方法的时候会介绍到两者的相互转换)2.执行任务如下图所示,包括任务执行机制的核心接口Executor ,以及继承自Executor 接口的ExecutorService接口。ScheduledThreadPoolExecutor和ThreadPoolExecutor这两个关键类实现了ExecutorService接口。3.获得结果Future接口以及Future接口的实现类FutureTask类。 当我们把Runnable接口或Callable接口的实现类提交(调用submit方法)给ThreadPoolExecutor或ScheduledThreadPoolExecutor时,会返回一个FutureTask对象。

Executor 框架的使用示意图

1.主线程首先要创建实现Runnable或者Callable接口的任务对象。2.然后可以把创建完成的Runnable对象直接交给ExecutorService执行3.如果执行ExecutorService.submit(…),ExecutorService将返回一个实现Future接口的对象4.最后,主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。

ThreadPoolExecutor详解

1.ThreadPoolExecutor类中重要的四个属性2.构造方法 /** * 用给定的初始参数创建一个新的ThreadPoolExecutor。 * * @param keepAliveTime 当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交, * 核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime; * @param unit keepAliveTime参数的时间单位 * @param workQueue 等待队列,当任务提交时,如果线程池中的线程数量大于等于corePoolSize的时候,把该任务封装成一个Worker对象放入等待队列; * @param threadFactory 执行者创建新线程时使用的工厂 * @param handler RejectedExecutionHandler类型的变量,表示线程池的饱和策略。 * 如果阻塞队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。 * 线程池提供了4种策略: * 1.AbortPolicy:直接抛出异常,这是默认策略; * 2.CallerRunsPolicy:用调用者所在的线程来执行任务; * 3.DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务; * 4.DiscardPolicy:直接丢弃任务; */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } 3.如何创建ThreadPoolExecutor1.《阿里巴巴Java开发手册》“并发处理”这一章节,明确指出线程资源必须通过线程池提供,不允许在应用中自行显示创建线程。 为什么呢? 使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源开销,解决资源不足的问题。如果不使用线程池,有可能会造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。 2.《阿里巴巴Java开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险 Executors 返回线程池对象的弊端如下: FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致OOM。 CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致OOM。 创建方式一:通过构造方法实现 创建方式二:通过Executor 框架的工具类Executors来实现创建三种类型的ThreadPoolExecutor:1.FixedThreadPool2.SingleThreadExecutor3.CachedThreadPool 4.FixedThreadPool详解FixedThreadPool被称为可重用固定线程数的线程池。通过Executors类中的相关源代码来看一下相关实现: /** * 创建一个可重用固定数量线程的线程池 * 在任何时候至多有n个线程处于活动状态 * 如果在所有线程处于活动状态时提交其他任务,则它们将在队列中等待, * 直到线程可用。 如果任何线程在关闭之前的执行期间由于失败而终止, * 如果需要执行后续任务,则一个新的线程将取代它。池中的线程将一直存在 * 知道调用shutdown方法 * * @param nThreads 线程池中的线程数 * @param threadFactory 创建新线程时使用的factory * @return 新创建的线程池 * @throws NullPointerException 如果threadFactory为null * @throws IllegalArgumentException if {@code nThreads <= 0} */ public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); } 从上面源代码可以看出新创建的FixedThreadPool的corePoolSize和maximumPoolSize都被设置为nThreads。FixedThreadPool的execute()方法运行示意图:

图片说明:

1.如果当前运行的线程数小于corePoolSize,则创建新的线程来执行任务;2.当前运行的线程数等于corePoolSize后,将任务加入LinkedBlockingQueue;3.线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue中获取任务来执行;

FixedThreadPool使用无界队列 LinkedBlockingQueue(队列的容量为Intger.MAX_VALUE)作为线程池的工作队列会对线程池带来如下影响:

public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } 1.当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize;2.由于1,使用无界队列时maximumPoolSize将是一个无效参数;3.由于1和2,使用无界队列时keepAliveTime将是一个无效参数;4.运行中的FixedThreadPool(未执行shutdown()或shutdownNow()方法)不会拒绝任务5.SingleThreadExecutor详解SingleThreadExecutor是使用单个worker线程的Executor。下面看看SingleThreadExecutor的实现: /** * 创建使用单个worker线程运行无界队列的Executor * 并使用提供的ThreadFactory在需要时创建新线程 * * @param threadFactory 创建新线程时使用的factory * @return 新创建的单线程Executor * @throws NullPointerException 如果ThreadFactory为空 */ public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); } 从上面源代码可以看出新创建的SingleThreadExecutor的corePoolSize和maximumPoolSize都被设置为1.其他参数和FixedThreadPool相同。SingleThreadExecutor使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Intger.MAX_VALUE)。SingleThreadExecutor使用无界队列作为线程池的工作队列会对线程池带来的影响与FixedThreadPool相同。SingleThreadExecutor的运行示意图

图片说明:

1.如果当前运行的线程数少于corePoolSize,则创建一个新的线程执行任务;2.当前线程池中有一个运行的线程后,将任务加入LinkedBlockingQueue3.线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue中获取任务来执行;6.CachedThreadPool详解CachedThreadPool是一个会根据需要创建新线程的线程池。下面通过源码来看看 CachedThreadPool的实现: /** * 创建一个线程池,根据需要创建新线程,但会在先前构建的线程可用时重用它, * 并在需要时使用提供的ThreadFactory创建新线程。 * * @param threadFactory 创建新线程使用的factory * @return 新创建的线程池 * @throws NullPointerException 如果threadFactory为空 */ public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); } CachedThreadPool的corePoolSize被设置为空(0),maximumPoolSize被设置为Integer.MAX.VALUE,即它是无界的,这也就意味着如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新的线程。极端情况下,这样会导致耗尽cpu和内存资源。CachedThreadPool的execute()方法的执行示意图

图片说明:

1.首先执行SynchronousQueue.offer(Runnable task)。如果当前maximumPool中有闲线程正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成,否则执行下面的步骤2;2.当初始maximumPool为空,或者maximumPool中没有空闲线程时,将没有线程执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,步骤1将失败,此时CachedThreadPool会创建新线程执行任务,execute方法执行完成;7.ScheduledThreadPoolExecutor详解 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }

执行逻辑:

1.首先将 Runnable/Callable 封装为 ScheduledFutureTask,延迟时间作为比较属性;

2.然后加入 **DelayedWorkQueue **队列中,每次取出队首延迟最小的任务,超时等待,然后执行;

3.最后判断是否为周期任务,然后将其重新加入 **DelayedWorkQueue **队列中;

scheduleAtFixedRate和scheduleWithFixedDelay讲解:

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); scheduledExecutorService.scheduleAtFixedRate(() -> { try { sleep(1000); System.out.println("run task" + new Date().toLocaleString()); } catch (InterruptedException e) { e.printStackTrace(); } }, 3, 5, TimeUnit.SECONDS); scheduledExecutorService.scheduleWithFixedDelay(() -> { try { sleep(1000); System.out.println("run task" + new Date().toLocaleString()); } catch (InterruptedException e) { e.printStackTrace(); } }, 3, 5, TimeUnit.SECONDS);

scheduleAtFixedRate,固定频率周期任务,注意一次任务超时,会持续的影响后续的任务周期;

scheduleWithFixedDelay,固定延迟周期任务,即每次任务结束后,超时等待固定时间;

ScheduledThreadPoolExecutor 线程最多为核心线程,最大线程数不起作用,因为 DelayedWorkQueue 是无界队列;

各种线程池的适用场景介绍

FixedThreadPool: 适用于为了满足资源管理需求,而需要限制当前线程数量的应用场景。它适用于负载比较重的服务器;

SingleThreadExecutor: 适用于需要保证顺序地执行各个任务并且在任意时间点,不会有多个线程是活动的应用场景。

CachedThreadPool: 适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器;

ScheduledThreadPoolExecutor: 适用于需要多个后台执行周期任务,同时为了满足资源管理需求而需要限制后台线程的数量的应用场景,

SingleThreadScheduledExecutor: 适用于需要单个后台线程执行周期任务,同时保证顺序地执行各个任务的应用场景。

最新回复(0)