如果开发中用到多线程,基本上就使用java的线程池,主要原因:
1、避免线程的重复创建
2、限流(当超过指定的阈值时会有一些拒绝策略)
public ThreadPoolExecutor(int corePoolSize, //核心线程数 int maximumPoolSize, //最大线程数 long keepAliveTime, //超时时间,超出核心线程数意外的线程的存活时间 TimeUnit unit, //存活时间的单位 BlockingQueue<Runnable> workQueue, //阻塞队列 ThreadFactory threadFactory, //创建线程的工厂 RejectedExecutionHandler handler) { //拒绝策略 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
newFixedThreadPool () 创建一个固定的线程池
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), //可能OOM threadFactory); }newSingleThreadExcutor 创建只有一个线程的线程池
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, //只有一个线程的线程池 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), //可能OOM threadFactory)); }newCacheThreadPool 上限是Integer.maxvalue();不限制最大线程数
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, //可能OOM 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),//不存数据的队列 threadFactory); }这个三个线程池均是构建了一个ThreadPoolExcutor,差异是最大线程数,核心线程数 和队列不同
newScheduledThreadPool 定时器,延时执行的线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }ThreadPoolExecutor 中的excutor方法
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }execute执行流程
AbortPolicy 直接抛异常 拿到异常后可以做处理,例如把记录日志,把任务交给另外的队列或者消息中间件去处理
CallerRunsPolicy 用者所在线程去执行任务
DiscardOldestPolicy 丢弃阻塞队列中最靠前的任务,并执行当前任务
DiscardPolicy 直接丢掉
CPU密集型(CPU-bound)
CPU bound的程序一般而言CPU占用率相当高。这可能是因为任务本身不太需要访问I/O设备,也可能是因为程序是多线程实现因此屏蔽掉了等待I/O的时间。 cpu 密集型(CPU-bound)线程池设计 最佳线程数=cpu核数或者cpu核数±1
IO密集型(I/O bound)
I/O bound的程序一般在达到性能极限时,CPU占用率仍然较低。这可能是因为任务本身需要大量I/O操作,而pipeline做得不是很好,没有充分利用处理器能力。比如接收一个前端请求--解析参数--查询数据库--返回给前端这样的,那么就是IO密集型的,例如web应用。
I/O密集型(I/O-bound)线程池设计 最佳线程数 = ((线程等待时间+线程cpu时间)/线程cpu时间*cpu数目)
任务执行时间
任务执行很长,要设置线程数设置多一些,任务执行很短,线程数设置少一些
使用线程池时,必须了解线程池的状态,当出现问题时有助于我们去解决问题。可以继承ThreadPoolExecutor,重写beforeExecutor、 afterExecutor、shutdown等方法进行线程池的监控。
public class MyExecutors extends ThreadPoolExecutor { //beforeExecutor、 afterExecutor、shutdown private ConcurrentMap<String, Date> startTime; public MyExecutors(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); this.startTime = new ConcurrentHashMap<>(); } @Override public void shutdown() { System.out.println("已经执行的任务数量" + this.getCompletedTaskCount() + "\n"); System.out.println("当前的活动线程数" + this.getActiveCount() + "\n"); System.out.println("当前排队的线程数" + this.getQueue() + "\n"); super.shutdown(); } @Override protected void beforeExecute(Thread t, Runnable r) { startTime.put(String.valueOf(r.hashCode()),new Date()); super.beforeExecute(t, r); } @Override protected void afterExecute(Runnable r, Throwable t) { //此处 只要用一次 用完就删 所以使用 remove 不然那个HashMap 就炸掉了 Date startDate = startTime.remove(String.valueOf(r.hashCode())); Date finishDate = new Date(); long dif = finishDate.getTime() - startDate.getTime(); System.out.println("任务耗时:" + dif); System.out.println("最大允许的线程数:" + this.getMaximumPoolSize()); System.out.println("线程的空闲时间:" + this.getKeepAliveTime(TimeUnit.MILLISECONDS)); System.out.println("任务总数:" + this.getTaskCount()); super.afterExecute(r, t); } public static ExecutorService newMyExcutors(){ return new MyExecutors(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } }平时开发中,大家更多的关注的是线程池的创建、任务的提交和执行。往往会忽略线程池的关闭,甚至忘记调用shutdown()方法,导致内存溢出。大多知道需要调用shutdown()关闭线程池,也少研究其真正的关闭过程。
线程池自动关闭的两个条件:1、线程池的引用不可达;2、线程池中没有线程;
这里对于条件2解释一下,线程池中没有线程是指线程池中的所有线程都已运行完自动消亡。然而我们常用的FixedThreadPool的核心线程没有超时策略,所以并不会自动关闭。
展示两种不同线程池 不关闭 的情况:
1、FixedThreadPool 示例
public static void main(String[] args) { while(true) { ExecutorService executorService = Executors.newFixedThreadPool(8); //死循环不停创建线程池 executorService.execute(() -> System.out.println("running")); executorService = null; } }输出结果:
running ...... running Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread at java.lang.Thread.start0(Native Method) at java.lang.Thread.start(Thread.java:714) at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1357) at test.PoolTest.main(PoolTest.java:29)因为FixedThreadPool的核心线程不会自动超时关闭,使用时必须在适当的时候调用shutdown()方法。
2、 CachedThreadPool 示例
public static void main(String[] args) { while(true) { // 默认keepAliveTime为 60s ExecutorService executorService = Executors.newCachedThreadPool(); ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService; // 为了更好的模拟,动态修改为1纳秒 threadPoolExecutor.setKeepAliveTime(1, TimeUnit.NANOSECONDS); threadPoolExecutor.execute(() -> System.out.println("running")); } }输出结果:
running running running running running ......CachedThreadPool 的线程 keepAliveTime 默认为 60s ,核心线程数量为 0 ,所以不会有核心线程存活阻止线程池自动关闭。 详见 线程池之ThreadPoolExecutor构造 ,为了更快的模拟,构造后将 keepAliveTime 修改为1纳秒,相当于线程执行完马上会消亡,所以线程池可以被回收。实际开发中,如果CachedThreadPool 确实忘记关闭,在一定时间后是可以被回收的。但仍然建议显示关闭。
然而,线程池关闭的意义不仅仅在于结束线程执行,避免内存溢出,因为大多使用的场景并非上述示例那样 朝生夕死。线程池一般是持续工作的全局场景,如数据库连接池。
是当线程池调用shutdown()方法后,会经历些什么?思考一下几个问题:
是否可以继续接受新任务?等待队列里的任务是否还会执行?正在执行的任务是否会立即中断?问题1:是否可以继续接受新任务?继续提交新任务会怎样?
public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); executor.execute(() -> System.out.println("before shutdown")); executor.shutdown(); executor.execute(() -> System.out.println("after shutdown")); }输出结果如下:
before shutdown Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task PoolTest$$Lambda$2/142257191@3e3abc88 rejected from java.util.concurrent.ThreadPoolExecutor@6ce253f1[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) at PoolTest.main(PoolTest.java:12)当线程池关闭后,继续提交新任务会抛出异常。这句话也不够准确,不一定是抛出异常,而是执行拒绝策略,默认的拒绝策略是抛出异常。可参见 线程池之ThreadPoolExecutor构造 里面自定义线程池的例子,自定义了忽略策略,但被拒绝时并没有抛出异常。
问题2:等待队列里的任务是否还会执行?
public class WaitqueueTest { public static void main(String[] args) { BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(); for(int i = 1; i <= 100 ; i++){ workQueue.add(new Task(String.valueOf(i))); } ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, workQueue); executor.execute(new Task("0")); executor.shutdown(); System.out.println("workQueue size = " + workQueue.size() + " after shutdown"); } static class Task implements Runnable{ String name; public Task(String name) { this.name = name; } @Override public void run() { for(int i = 1; i <= 10; i++){ System.out.println("task " + name + " is running"); } System.out.println("task " + name + " is over"); } } }这个demo解释一下,我们用LinkedBlockingQueue构造了一个线程池,在线程池启动前,我们先将工作队列填充100个任务,然后执行task 0 后立即shutdown()线程池,来验证线程池关闭队列的任务运行状态。
输出结果如下:
...... task 0 is running task 0 is over workQueue size = 100 after shutdown //表示线程池关闭后,队列任然有100个任务 task 1 is running ...... task 100 is running task 100 is over从结果中我们可以看到,线程池虽然关闭,但是队列中的任务任然继续执行,所以用 shutdown()方式关闭线程池时需要考虑是否是你想要的效果。
如果你希望线程池中的等待队列中的任务不继续执行,可以使用shutdownNow()方法,将上述代码进行调整,如下:
public class WaitqueueTest { public static void main(String[] args) { BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(); for(int i = 1; i <= 100 ; i++){ workQueue.add(new Task(String.valueOf(i))); } ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, workQueue); executor.execute(new Task("0")); // shutdownNow有返回值,返回被抛弃的任务list List<Runnable> dropList = executor.shutdownNow(); System.out.println("workQueue size = " + workQueue.size() + " after shutdown"); System.out.println("dropList size = " + dropList.size()); } static class Task implements Runnable{ String name; public Task(String name) { this.name = name; } @Override public void run() { for(int i = 1; i <= 10; i++){ System.out.println("task " + name + " is running"); } System.out.println("task " + name + " is over"); } } }输出结果如下:
task 0 is running workQueue size = 0 after shutdown task 0 is running task 0 is running task 0 is running task 0 is running task 0 is running task 0 is running task 0 is running task 0 is running task 0 is running dropList size = 100 task 0 is over从上述输出可以看到,只有任务0执行完毕,其他任务都被drop掉了,dropList的size为100。通过dropList我们可以对未处理的任务进行进一步的处理,如log记录,转发等;
问题3:正在执行的任务是否会立即中断?
需要对线程的 interrupt 方法有一定了解。 正在执行的任务不会被中断,线程会被打上interrupted标记,线程会将当前任务执行完毕,并且不会接收新的的任务。