定义线程池
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; /** * 异步线程池 */ @Configuration @EnableAsync public class AsyncExecutorConfig { /** * Set the ThreadPoolExecutor's core pool size. */ private int corePoolSize = 8; /** * Set the ThreadPoolExecutor's maximum pool size. */ private int maxPoolSize = 16; /** * Set the capacity for the ThreadPoolExecutor's BlockingQueue. */ private int queueCapacity = 200; private String threadNamePrefix = "AsyncExecutor-"; @Bean("taskExecutor") public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix(threadNamePrefix); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } } 代码中我们通过 ThreadPoolTaskExecutor 创建了一个线程池。参数含义如下所示: corePoolSize:线程池创建的核心线程数 maxPoolSize:线程池最大线程池数量,当任务数超过corePoolSize以及缓冲队列也满了以后才会申请的线程数量。 setKeepAliveSeconds: 允许线程空闲时间60秒,当maxPoolSize的线程在空闲时间到达的时候销毁。 ThreadNamePrefix:线程的前缀任务名字。 RejectedExecutionHandler:当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务异步任务服务类
mport lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.stereotype.Component; import java.util.Random; import java.util.concurrent.Future; @Component @Slf4j public class AsyncTask { public static Random random = new Random(); @Async("taskExecutor") public void sendSms() throws InterruptedException { log.info("开始做任务2:发送短信"); long start = System.currentTimeMillis(); Thread.sleep(random.nextInt(10000)); long end = System.currentTimeMillis(); log.info("完成任务1,耗时:" + (end - start) + "毫秒"); } // 返回结果的异步调用 @Async("taskExecutor") public Future<String> pay() throws InterruptedException { log.info("开始做异步返回结果任务2:支付"); long start = System.currentTimeMillis(); Thread.sleep(random.nextInt(10000)); long end = System.currentTimeMillis(); log.info("完成任务2,耗时:" + (end - start) + "毫秒"); return new AsyncResult<>("会员服务完成"); } /** * 会员积分任务 * @throws InterruptedException */ @Async("taskExecutor") public void vip() throws InterruptedException { log.info("开始做任务5:会员"); long start = System.currentTimeMillis(); Thread.sleep(random.nextInt(10000)); long end = System.currentTimeMillis(); log.info("开始做异步返回结果任务5,耗时:" + (end - start) + "毫秒"); } }