java线程池优化--ThreadPoolExecutor机制

mac2024-05-13  33

ThreadPoolExecutor机制  一、概述  1、ThreadPoolExecutor作为java.util.concurrent包对外提供基础实现,以内部线程池的形式对外提供管理任务执行,线程调度,线程池管理等等服务;  2、Executors方法提供的线程服务,都是通过参数设置来实现不同的线程池机制。  3、先来了解其线程池管理的机制,有助于正确使用,避免错误使用导致严重故障。同时可以根据自己的需求实现自己的线程池 

二、核心构造方法讲解  下面是ThreadPoolExecutor最核心的构造方法  Java代码  

public ThreadPoolExecutor(int corePoolSize,                                 int maximumPoolSize,                                 long keepAliveTime,                                 TimeUnit unit,                                 BlockingQueue<Runnable> workQueue,                                 ThreadFactory threadFactory,                                 RejectedExecutionHandler handler) {           if (corePoolSize < 0 ||               maximumPoolSize <= 0 ||               maximumPoolSize < corePoolSize ||               keepAliveTime < 0)               throw new IllegalArgumentException();           if (workQueue == null || threadFactory == null || handler == null)               throw new NullPointerException();           this.corePoolSize = corePoolSize;           this.maximumPoolSize = maximumPoolSize;           this.workQueue = workQueue;           this.keepAliveTime = unit.toNanos(keepAliveTime);           this.threadFactory = threadFactory;           this.handler = handler;       }  

构造方法参数讲解  参数名    作用 corePoolSize    核心线程池大小 maximumPoolSize    最大线程池大小 keepAliveTime    线程池中超过corePoolSize数目的空闲线程最大存活时间;可以allowCoreThreadTimeOut(true)使得核心线程有效时间 TimeUnit    keepAliveTime时间单位 workQueue    阻塞任务队列 threadFactory    新建线程工厂 RejectedExecutionHandler    当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理

一、ThreadPoolExecutor的重要参数   corePoolSize:核心线程数 核心线程会一直存活,及时没有任务需要执行 当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭 queueCapacity:任务队列容量(阻塞队列) 当核心线程数达到最大时,新任务会放在队列中排队等待执行 maxPoolSize:最大线程数 当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务 当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常 keepAliveTime:线程空闲时间 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize 如果allowCoreThreadTimeout=true,则会直到线程数量=0 allowCoreThreadTimeout:允许核心线程超时 rejectedExecutionHandler:任务拒绝处理器 两种情况会拒绝处理任务: 当线程数已经达到maxPoolSize,切队列已满,会拒绝新任务 当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务 线程池会调用rejectedExecutionHandler来处理这个任务。如果没有设置默认是AbortPolicy,会抛出异常 ThreadPoolExecutor类有几个内部实现类来处理这类情况: AbortPolicy 丢弃任务,抛运行时异常 CallerRunsPolicy 执行任务 DiscardPolicy 忽视,什么都不会发生 DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务 实现RejectedExecutionHandler接口,可自定义处理器   二、ThreadPoolExecutor执行顺序:      线程池按以下行为执行任务  

当线程数小于核心线程数时,创建线程。 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。 当线程数大于等于核心线程数,且任务队列已满 若线程数小于最大线程数,创建线程 若线程数等于最大线程数,抛出异常,拒绝任务   三、如何设置参数   默认值 corePoolSize=1 queueCapacity=Integer.MAX_VALUE maxPoolSize=Integer.MAX_VALUE keepAliveTime=60s allowCoreThreadTimeout=false rejectedExecutionHandler=AbortPolicy() 如何来设置 需要根据几个值来决定 tasks :每秒的任务数,假设为500~1000 taskcost:每个任务花费时间,假设为0.1s responsetime:系统允许容忍的最大响应时间,假设为1s 做几个计算 corePoolSize = 每秒需要多少个线程处理?  threadcount = tasks/(1/taskcost) =tasks*taskcout =  (500~1000)*0.1 = 50~100 个线程。corePoolSize设置应该大于50 根据8020原则,如果80%的每秒任务数小于800,那么corePoolSize设置为80即可 queueCapacity = (coreSizePool/taskcost)*responsetime 计算可得 queueCapacity = 80/0.1*1 = 80。意思是队列里的线程可以等待1s,超过了的需要新开线程来执行 切记不能设置为Integer.MAX_VALUE,这样队列会很大,线程数只会保持在corePoolSize大小,当任务陡增时,不能新开线程来执行,响应时间会随之陡增。 maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost) 计算可得 maxPoolSize = (1000-80)/10 = 92 (最大任务数-队列容量)/每个线程每秒处理能力 = 最大线程数 rejectedExecutionHandler:根据具体情况来决定,任务不重要可丢弃,任务重要则要利用一些缓冲机制来处理 keepAliveTime和allowCoreThreadTimeout采用默认通常能满足 以上都是理想值,实际情况下要根据机器性能来决定。如果在未达到最大线程数的情况机器cpu load已经满了,则需要通过升级硬件(呵呵)和优化代码,降低taskcost来处理。     参考: http://blog.csdn.net/zhouhl_cn/article/details/7392607 http://my.oschina.net/u/169390/blog/97415

重点讲解:  其中比较容易让人误解的是:corePoolSize,maximumPoolSize,workQueue之间关系。 

1.当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。  2.当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行  3.当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务  4.当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理  5.当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程  6.当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭 

线程管理机制图示: 

三、Executors提供的线程池配置方案 

1、构造一个固定线程数目的线程池,配置的corePoolSize与maximumPoolSize大小相同,同时使用了一个无界LinkedBlockingQueue存放阻塞任务,因此多余的任务将存在再阻塞队列,不会由RejectedExecutionHandler处理  Java代码  

public static ExecutorService newFixedThreadPool(int nThreads) {           return new ThreadPoolExecutor(nThreads, nThreads,                                         0L, TimeUnit.MILLISECONDS,                                         new LinkedBlockingQueue<Runnable>());       }

  

2、构造一个缓冲功能的线程池,配置corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,keepAliveTime=60s,以及一个无容量的阻塞队列 SynchronousQueue,因此任务提交之后,将会创建新的线程执行;线程空闲超过60s将会销毁  Java代码  

public static ExecutorService newCachedThreadPool() {           return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                         60L, TimeUnit.SECONDS,                                         new SynchronousQueue<Runnable>());       }  

3、构造一个只支持一个线程的线程池,配置corePoolSize=maximumPoolSize=1,无界阻塞队列LinkedBlockingQueue;保证任务由一个线程串行执行  Java代码  

public static ExecutorService newSingleThreadExecutor() {           return new FinalizableDelegatedExecutorService               (new ThreadPoolExecutor(1, 1,                                       0L, TimeUnit.MILLISECONDS,                                       new LinkedBlockingQueue<Runnable>()));       }  

4、构造有定时功能的线程池,配置corePoolSize,无界延迟阻塞队列DelayedWorkQueue;有意思的是:maximumPoolSize=Integer.MAX_VALUE,由于DelayedWorkQueue是无界队列,所以这个值是没有意义的  Java代码  

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {           return new ScheduledThreadPoolExecutor(corePoolSize);       }      public static ScheduledExecutorService newScheduledThreadPool(               int corePoolSize, ThreadFactory threadFactory) {           return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);       }      public ScheduledThreadPoolExecutor(int corePoolSize,                                ThreadFactory threadFactory) {           super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,                 new DelayedWorkQueue(), threadFactory);       }  

四、定制属于自己的非阻塞线程池  Java代码  

import java.util.concurrent.ArrayBlockingQueue;   import java.util.concurrent.ExecutorService;   import java.util.concurrent.RejectedExecutionHandler;   import java.util.concurrent.ThreadFactory;   import java.util.concurrent.ThreadPoolExecutor;   import java.util.concurrent.TimeUnit;   import java.util.concurrent.atomic.AtomicInteger;         public class CustomThreadPoolExecutor {                 private ThreadPoolExecutor pool = null;                     /**       * 线程池初始化方法       *        * corePoolSize 核心线程池大小----10       * maximumPoolSize 最大线程池大小----30       * keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间----30+单位TimeUnit       * TimeUnit keepAliveTime时间单位----TimeUnit.MINUTES       * workQueue 阻塞队列----new ArrayBlockingQueue<Runnable>(10)====10容量的阻塞队列       * threadFactory 新建线程工厂----new CustomThreadFactory()====定制的线程工厂       * rejectedExecutionHandler 当提交任务数超过maxmumPoolSize+workQueue之和时,       *                          即当提交第41个任务时(前面线程都没有执行完,此测试方法中用sleep(100)),       *                                任务会交给RejectedExecutionHandler来处理       */       public void init() {           pool = new ThreadPoolExecutor(                   10,                   30,                   30,                   TimeUnit.MINUTES,                   new ArrayBlockingQueue<Runnable>(10),                   new CustomThreadFactory(),                   new CustomRejectedExecutionHandler());       }                 public void destory() {           if(pool != null) {               pool.shutdownNow();           }       }                     public ExecutorService getCustomThreadPoolExecutor() {           return this.pool;       }              private class CustomThreadFactory implements ThreadFactory {              private AtomicInteger count = new AtomicInteger(0);                      @Override           public Thread newThread(Runnable r) {               Thread t = new Thread(r);               String threadName = CustomThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);               System.out.println(threadName);               t.setName(threadName);               return t;           }       }                     private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {              @Override           public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {               // 记录异常               // 报警处理等               System.out.println("error.............");           }       }                            // 测试构造的线程池       public static void main(String[] args) {           CustomThreadPoolExecutor exec = new CustomThreadPoolExecutor();           // 1.初始化           exec.init();                      ExecutorService pool = exec.getCustomThreadPoolExecutor();           for(int i=1; i<100; i++) {               System.out.println("提交第" + i + "个任务!");               pool.execute(new Runnable() {                   @Override                   public void run() {                       try {                           Thread.sleep(3000);                       } catch (InterruptedException e) {                           e.printStackTrace();                       }                       System.out.println("running=====");                   }               });           }                                            // 2.销毁----此处不能销毁,因为任务没有提交执行完,如果销毁线程池,任务也就无法执行了           // exec.destory();                      try {               Thread.sleep(10000);           } catch (InterruptedException e) {               e.printStackTrace();           }       }   }  

方法中建立一个核心线程数为30个,缓冲队列有10个的线程池。每个线程任务,执行时会先睡眠3秒,保证提交10任务时,线程数目被占用完,再提交30任务时,阻塞队列被占用完,,这样提交第41个任务是,会交给CustomRejectedExecutionHandler 异常处理类来处理。 

提交任务的代码如下:  Java代码  

public void execute(Runnable command) {           if (command == null)               throw new NullPointerException();           /*           * Proceed in 3 steps:           *           * 1. If fewer than corePoolSize threads are running, try to           * start a new thread with the given command as its first           * task.  The call to addWorker atomically checks runState and           * workerCount, and so prevents false alarms that would add           * threads when it shouldn't, by returning false.           *           * 2. If a task can be successfully queued, then we still need           * to double-check whether we should have added a thread           * (because existing ones died since last checking) or that           * the pool shut down since entry into this method. So we           * recheck state and if necessary roll back the enqueuing if           * stopped, or start a new thread if there are none.           *           * 3. If we cannot queue task, then we try to add a new           * thread.  If it fails, we know we are shut down or saturated           * and so reject the task.           */           int c = ctl.get();           if (workerCountOf(c) < corePoolSize) {               if (addWorker(command, true))                   return;               c = ctl.get();           }           if (isRunning(c) && workQueue.offer(command)) {               int recheck = ctl.get();               if (! isRunning(recheck) && remove(command))                   reject(command);               else if (workerCountOf(recheck) == 0)                   addWorker(null, false);           }           else if (!addWorker(command, false))               reject(command);       }  

注意:41以后提交的任务就不能正常处理了,因为,execute中提交到任务队列是用的offer方法,如上面代码,这个方法是非阻塞的,所以就会交给CustomRejectedExecutionHandler 来处理,所以对于大数据量的任务来说,这种线程池,如果不设置队列长度会OOM,设置队列长度,会有任务得不到处理,接下来我们构建一个阻塞的自定义线程池 五、定制属于自己的阻塞线程池  Java代码  

package com.tongbanjie.trade.test.commons;      import java.util.concurrent.ArrayBlockingQueue;   import java.util.concurrent.ExecutorService;   import java.util.concurrent.RejectedExecutionHandler;   import java.util.concurrent.ThreadFactory;   import java.util.concurrent.ThreadPoolExecutor;   import java.util.concurrent.TimeUnit;   import java.util.concurrent.atomic.AtomicInteger;      public class CustomThreadPoolExecutor {                           private ThreadPoolExecutor pool = null;                           /**        * 线程池初始化方法        *         * corePoolSize 核心线程池大小----1        * maximumPoolSize 最大线程池大小----3        * keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间----30+单位TimeUnit        * TimeUnit keepAliveTime时间单位----TimeUnit.MINUTES        * workQueue 阻塞队列----new ArrayBlockingQueue<Runnable>(5)====5容量的阻塞队列        * threadFactory 新建线程工厂----new CustomThreadFactory()====定制的线程工厂        * rejectedExecutionHandler 当提交任务数超过maxmumPoolSize+workQueue之和时,        *                          即当提交第41个任务时(前面线程都没有执行完,此测试方法中用sleep(100)),        *                                任务会交给RejectedExecutionHandler来处理        */         public void init() {             pool = new ThreadPoolExecutor(                     1,                     3,                     30,                     TimeUnit.MINUTES,                     new ArrayBlockingQueue<Runnable>(5),                     new CustomThreadFactory(),                     new CustomRejectedExecutionHandler());         }                       public void destory() {             if(pool != null) {                 pool.shutdownNow();             }         }                           public ExecutorService getCustomThreadPoolExecutor() {             return this.pool;         }                  private class CustomThreadFactory implements ThreadFactory {                  private AtomicInteger count = new AtomicInteger(0);                          @Override             public Thread newThread(Runnable r) {                 Thread t = new Thread(r);                 String threadName = CustomThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);                 System.out.println(threadName);                 t.setName(threadName);                 return t;             }         }                           private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {                  @Override             public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {                 try {                                   // 核心改造点,由blockingqueue的offer改成put阻塞方法                   executor.getQueue().put(r);               } catch (InterruptedException e) {                   e.printStackTrace();               }           }         }                                    // 测试构造的线程池         public static void main(String[] args) {                        CustomThreadPoolExecutor exec = new CustomThreadPoolExecutor();             // 1.初始化             exec.init();                          ExecutorService pool = exec.getCustomThreadPoolExecutor();             for(int i=1; i<100; i++) {                 System.out.println("提交第" + i + "个任务!");                 pool.execute(new Runnable() {                     @Override                     public void run() {                         try {                             System.out.println(">>>task is running=====");                            TimeUnit.SECONDS.sleep(10);                       } catch (InterruptedException e) {                             e.printStackTrace();                         }                     }                 });             }                                       // 2.销毁----此处不能销毁,因为任务没有提交执行完,如果销毁线程池,任务也就无法执行了             // exec.destory();                          try {                 Thread.sleep(10000);             } catch (InterruptedException e) {                 e.printStackTrace();             }         }     }    

解释:当提交任务被拒绝时,进入拒绝机制,我们实现拒绝方法,把任务重新用阻塞提交方法put提交,实现阻塞提交任务功能,防止队列过大,OOM,提交被拒绝方法在下面 

    Java代码  

public void execute(Runnable command) {           if (command == null)               throw new NullPointerException();              int c = ctl.get();           if (workerCountOf(c) < corePoolSize) {               if (addWorker(command, true))                   return;               c = ctl.get();           }           if (isRunning(c) && workQueue.offer(command)) {               int recheck = ctl.get();               if (! isRunning(recheck) && remove(command))                   reject(command);               else if (workerCountOf(recheck) == 0)                   addWorker(null, false);           }           else if (!addWorker(command, false))               // 进入拒绝机制, 我们把runnable任务拿出来,重新用阻塞操作put,来实现提交阻塞功能               reject(command);       }  

总结:  1、用ThreadPoolExecutor自定义线程池,看线程是的用途,如果任务量不大,可以用无界队列,如果任务量非常大,要用有界队列,防止OOM  2、如果任务量很大,还要求每个任务都处理成功,要对提交的任务进行阻塞提交,重写拒绝机制,改为阻塞提交。保证不抛弃一个任务  3、最大线程数一般设为2N+1最好,N是CPU核数  4、核心线程数,看应用,如果是任务,一天跑一次,设置为0,合适,因为跑完就停掉了,如果是常用线程池,看任务量,是保留一个核心还是几个核心线程数  5、如果要获取任务执行结果,用CompletionService,但是注意,获取任务的结果的要重新开一个线程获取,如果在主线程获取,就要等任务都提交后才获取,就会阻塞大量任务结果,队列过大OOM,所以最好异步开个线程获取结果

最新回复(0)