线程池 ThreadPoolExecutor 类的源码解析

mac2022-06-30  19

线程池 ThreadPoolExecutor 类的源码解析:

1:数据结构的分析:

private final BlockingQueue<Runnable> workQueue;  //用于存储未被线程池处理的任务

private final ReentrantLock mainLock = new ReentrantLock(); //维护一个lock来保证线程安全

private final HashSet<Worker> workers = new HashSet<Worker>();

private final Condition termination = mainLock.newCondition(); //通过Condition来进行线程之间的通信

private volatile ThreadFactory threadFactory;  //维护一个线程工厂,用于生成线程

private volatile RejectedExecutionHandler handler; //拒绝任务的句柄对象

private volatile int corePoolSize;  //核心池大小

private volatile int maximumPoolSize;  //最大池的大小

2:构造方法:将用户自定义的参数赋值给成员变量,并且使用默认的线程工厂,默认的任务拒绝

public ThreadPoolExecutor(int corePoolSize,

                              int maximumPoolSize,

                              long keepAliveTime,

                              TimeUnit unit,

                              BlockingQueue<Runnable> workQueue) {

        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,

             Executors.defaultThreadFactory(), defaultHandler);

    }

3:下面看看  execute 执行方法的原理:

public void execute(Runnable command) {

        if (command == null)   //判空操作

            throw new NullPointerException();

        int c = ctl.get();

        if (workerCountOf(c) < corePoolSize) {

            if (addWorker(command, true))

                return;

            c = ctl.get();

        }

        if (isRunning(c) && workQueue.offer(command)) {

            int recheck = ctl.get();

            if (! isRunning(recheck) && remove(command))

                reject(command);

            else if (workerCountOf(recheck) == 0)

                addWorker(null, false);

        }

        else if (!addWorker(command, false))

            reject(command);

    }

下面主要分析下addWorker这个方法:

 

 

//以2种场景为例进行分析:

//1第一次添加任务到线程池中  2 第6次添加  corePoolsize=5

private boolean addWorker(Runnable firstTask, boolean core) { // firstTask就是Runnable任务 core=true

        retry:

        for (;;) {  //相当于while循环

            int c = ctl.get(); //初始换后状态位Running  线程池中任务数为0

            int rs = runStateOf(c); //获取线程池的状态 这里是RUNNING

 

            // Check if queue empty only if necessary.

            if (rs >= SHUTDOWN &&

                ! (rs == SHUTDOWN &&

                   firstTask == null &&

                   ! workQueue.isEmpty()))

                return false;

 

            for (;;) { //相当于while循环

                int wc = workerCountOf(c); //获取worker数量 wc=0

//判断线程池中线程是否超出了限制,若超出了则返回false

                if (wc >= CAPACITY || 

                    wc >= (core ? corePoolSize : maximumPoolSize))

                    return false;

//CAS 将c的值+1  操作失败退出循环

                if (compareAndIncrementWorkerCount(c))

                    break retry;

                c = ctl.get();  // Re-read ctl

                if (runStateOf(c) != rs)

                    continue retry;

                // else CAS failed due to workerCount change; retry inner loop

            }

        }

 

        boolean workerStarted = false;

        boolean workerAdded = false;

        Worker w = null;

        try {

//将传进来的Runnable 任务构建成Worker对象

            w = new Worker(firstTask);

//获取worker对应的线程

            final Thread t = w.thread;

            if (t != null) {

//获取锁

                final ReentrantLock mainLock = this.mainLock;

                mainLock.lock();

                try {

                    // Recheck while holding lock.

                    // Back out on ThreadFactory failure or if

                    // shut down before lock acquired.

                    int rs = runStateOf(ctl.get());//获取线程池状态 这里是RUNNING

//判断线程池状态

                    if (rs < SHUTDOWN ||

                        (rs == SHUTDOWN && firstTask == null)) {

                        if (t.isAlive()) // precheck that t is startable

                            throw new IllegalThreadStateException();

//将工作任务添加到workers集合中

                        workers.add(w);

                        int s = workers.size();

                        if (s > largestPoolSize)  //初始化时largestPoolSize=0

                            largestPoolSize = s; //赋值  largestPoolSize=1

                        workerAdded = true;  //这里true表明添加成功的标识,后面执行该线程

                    }

                } finally {

                    mainLock.unlock();

                }

                if (workerAdded) {

                    t.start();  //线程开始执行

                    workerStarted = true;

                }

            }

        } finally {

            if (! workerStarted)

                addWorkerFailed(w);

        }

        return workerStarted;

}

 

下面看看构建 Worker对象的逻辑:

Worker(Runnable firstTask) {

            setState(-1); // 设置AQS state=-1

            this.firstTask = firstTask; //任务赋值给firstTask全局变量

            this.thread = getThreadFactory().newThread(this); //从线程工程创建新的线程

        }

 

 

 

 

当第6次添加的时候

// command Runnable 任务

public void execute(Runnable command) {

        if (command == null)

            throw new NullPointerException();

        int c = ctl.get();  //获取线程池的状态 这里是Running

        if (workerCountOf(c) < corePoolSize) {  //这里c的任务数是5 corePoolSize=5

            if (addWorker(command, true))

                return;

            c = ctl.get();

        }

// 判断线程池的状态是不是Ruuning  将任务添加到队列中

        if (isRunning(c) && workQueue.offer(command)) {//进入这个逻辑

            int recheck = ctl.get(); //再次获取ctl对象

            if (! isRunning(recheck) && remove(command)) //线程池状态不是Running 或者任务被移除则局拒绝任务

                reject(command);

            else if (workerCountOf(recheck) == 0)

                addWorker(null, false); //添加空任务到worker中

        }

        else if (!addWorker(command, false))

            reject(command);

    }

到这里线程池的分析已经结束了;

转载于:https://www.cnblogs.com/beppezhang/p/11214702.html

相关资源:JAVA上百实例源码以及开源项目
最新回复(0)