一、线程池初探
所谓线程池,就是将多个线程放在一个池子里面(所谓池化技术),然后需要线程的时候不是创建一个线程,而是从线程池里面获取一个可用的线程,然后执行我们的任务。线程池的关键在于它为我们管理了多个线程,我们不需要关心如何创建线程,我们只需要关系我们的核心业务,然后需要线程来执行任务的时候从线程池中获取线程。任务执行完之后线程不会被销毁,而是会被重新放到池子里面,等待机会去执行任务。
我们为什么需要线程池呢?首先一点是线程池为我们提高了一种简易的多线程编程方案,我们不需要投入太多的精力去管理多个线程,线程池会自动帮我们管理好,它知道什么时候该做什么事情,我们只要在需要的时候去获取就可以了。其次,我们使用线程池很大程度上归咎于创建和销毁线程的代价是非常昂贵的,甚至我们创建和销毁线程的资源要比我们实际执行的任务所花费的时间还要长,这显然是不科学也是不合理的,而且如果没有一个合理的管理者,可能会出现创建了过多的线程的情况,也就是在JVM中存活的线程过多,而存活着的线程也是需要销毁资源的,另外一点,过多的线程可能会造成线程过度切换的尴尬境地。
对线程池有了一个初步的认识之后,我们来看看如何使用线程池。
创建一个线程池
1ExecutorService executorService = Executors.newFixedThreadPool(1);
提交任务
1
2
executorService.submit(() -> System.out.println(
"run"
));
Future<String> stringFuture = executorService.submit(() ->
"run"
);
创建一个调度线程池
1ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
提交一个周期性执行的任务
1
2
scheduledExecutorService
.scheduleAtFixedRate(() -> System.out.println(
"schedule"
),
0
,
1
, TimeUnit.SECONDS);
shutdown
1
2
executorService.shutdownNow();
scheduledExecutorService.shutdownNow();
可以发现使用线程池非常简单,只需要极少的代码就可以创建出我们需要的线程池,然后将我们的任务提交到线程池中去。我们只需要在结束之时记得关闭线程池就可以了。本文的重点并非在于如何使用线程池,而是试图剖析线程池的实现,比如一个调度线程池是怎么实现的?是靠什么实现的?为什么能这样实现等等问题。
二、Java线程池实现架构
Java中与线程池相关的类有下面一些:
ExecutorExecutorServiceScheduledExecutorServiceThreadPoolExecutorScheduledThreadPoolExecutorExecutors
通过上面一节中的使用示例,可以发现Executors类是一个创建线程池的有用的类,事实上,Executors类的角色也就是创建线程池,它是一个工厂类,可以产生不同类型的线程池,而Executor是线程池的鼻祖类,它有两个子类是ExecutorService和ScheduledExecutorService,而ThreadPoolExecutor和ScheduledThreadPoolExecutor则是真正的线程池,我们的任务将被这两个类交由其所管理者的线程池运行,可以发现,ScheduledThreadPoolExecutor是一个集大成者类
ScheduledThreadPoolExecutor的类关系图
ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,ThreadPoolExecutor实现了一般的线程池,没有调度功能,而ScheduledThreadPoolExecutor继承了ThreadPoolExecutor的实现,然后增加了调度功能。
最为原始的Executor只有一个方法execute,它接受一个Runnable类型的参数,意思是使用线程池来执行这个Runnable,可以发现Executor不提供有返回值的任务。ExecutorService继承了Executor,并且极大的增强了Executor的功能,不仅支持有返回值的任务执行,而且还有很多十分有用的方法来为你提供服务,下面展示了ExecutorService提供的方法:
ExecutorService提供的方法
ScheduledExecutorService继承了ExecutorService,并且增加了特有的调度(schedule)功能。关于Executor、ExecutorService和ScheduledExecutorService的关系,可以见下图:
Executor、ExecutorService和ScheduledExecutorService的关系
总结一下,经过我们的调研,可以发现其实对于我们编写多线程代码来说,最为核心的是Executors类,根据我们是需要ExecutorService类型的线程池还是ScheduledExecutorService类型的线程池调用相应的工厂方法就可以了,而ExecutorService的实现表现在ThreadPoolExecutor上,ScheduledExecutorService的实现则表现在ScheduledThreadPoolExecutor上,下文将分别剖析这两者,尝试弄清楚线程池的原理。
三、ThreadPoolExecutor解析
上文中描述了Java中线程池相关的架构,了解了这些内容其实我们就可以使用java的线程池为我们工作了,使用其提供的线程池我们可以很方便的写出高质量的多线程代码,本节将分析ThreadPoolExecutor的实现,来探索线程池的运行原理。
ThreadPoolExecutor的类图
下面是几个比较关键的类成员:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private
final
BlockingQueue<Runnable> workQueue;
// 任务队列,我们的任务会添加到该队列里面,线程将从该队列获取任务来执行
private
final
HashSet<Worker> workers =
new
HashSet<Worker>();
//任务的执行值集合,来消费workQueue里面的任务
private
volatile
ThreadFactory threadFactory;
//线程工厂
private
volatile
RejectedExecutionHandler handler;
//拒绝策略,默认会抛出异异常,还要其他几种拒绝策略如下:
1
、CallerRunsPolicy:在调用者线程里面运行该任务
2
、DiscardPolicy:丢弃任务
3
、DiscardOldestPolicy:丢弃workQueue的头部任务
private
volatile
int
corePoolSize;
//最下保活work数量
private
volatile
int
maximumPoolSize;
//work上限
我们尝试执行submit方法,下面是执行的关键路径,总结起来就是:如果Worker数量还没达到上限则继续创建,否则提交任务到workQueue,然后让worker来调度运行任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
step
1
: <ExecutorService>
Future<?> submit(Runnable task);
step
2
:<AbstractExecutorService>
public
Future<?> submit(Runnable task) {
if
(task ==
null
)
throw
new
NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task,
null
);
execute(ftask);
return
ftask;
}
step
3
:<Executor>
void
execute(Runnable command);
step
4
:<ThreadPoolExecutor>
public
void
execute(Runnable command) {
if
(command ==
null
)
throw
new
NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int
c = ctl.get();
if
(workerCountOf(c) < corePoolSize) {
if
(addWorker(command,
true
))
return
;
c = ctl.get();
}
if
(isRunning(c) && workQueue.offer(command)) {
//提交我们的额任务到workQueue
int
recheck = ctl.get();
if
(! isRunning(recheck) && remove(command))
reject(command);
else
if
(workerCountOf(recheck) ==
0
)
addWorker(
null
,
false
);
}
else
if
(!addWorker(command,
false
))
//使用maximumPoolSize作为边界
reject(command);
//还不行?拒绝提交的任务
}
step
5
:<ThreadPoolExecutor>
private
boolean
addWorker(Runnable firstTask,
boolean
core)
step
6
:<ThreadPoolExecutor>
w =
new
Worker(firstTask);
//包装任务
final
Thread t = w.thread;
//获取线程(包含任务)
workers.add(w);
// 任务被放到works中
t.start();
//执行任务
上面的流程是高度概括的,实际情况远比这复杂得多,但是我们关心的是怎么打通整个流程,所以这样分析问题是没有太大的问题的。观察上面的流程,我们发现其实关键的地方在于Worker,如果弄明白它是如何工作的,那么我们也就大概明白了线程池是怎么工作的了。下面分析一下Worker类。
上面的图片展示了Worker的类关系图,关键在于他实现了Runnable接口,所以问题的关键就在于run方法上。在这之前,我们来看一下Worker类里面的关键成员:
1
2
3
final
Thread thread;
Runnable firstTask;
//我们提交的任务,可能被立刻执行,也可能被放到队列里面
thread是Worker的工作线程,上面的分析我们也发现了在addWorker中会获取worker里面的thread然后start,也就是这个线程的执行,而Worker实现了Runnable接口,所以在构造thread的时候Worker将自己传递给了构造函数,thread.start执行的其实就是Worker的run方法。下面是run方法的内容:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public
void
run() {
runWorker(
this
);
}
final
void
runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask =
null
;
w.unlock();
// allow interrupts
boolean
completedAbruptly =
true
;
try
{
while
(task !=
null
|| (task = getTask()) !=
null
) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if
((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try
{
beforeExecute(wt, task);
Throwable thrown =
null
;
try
{
task.run();
}
catch
(RuntimeException x) {
thrown = x;
throw
x;
}
catch
(Error x) {
thrown = x;
throw
x;
}
catch
(Throwable x) {
thrown = x;
throw
new
Error(x);
}
finally
{
afterExecute(task, thrown);
}
}
finally
{
task =
null
;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly =
false
;
}
finally
{
processWorkerExit(w, completedAbruptly);
}
}
我们来分析一下runWorker这个方法,这就是整个线程池的核心。首先获取到了我们刚提交的任务firstTask,然后会循环从workQueue里面获取任务来执行,获取任务的方法如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private
Runnable getTask() {
boolean
timedOut =
false
;
// Did the last poll() time out?
for
(;;) {
int
c = ctl.get();
int
rs = runStateOf(c);
// Check if queue empty only if necessary.
if
(rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return
null
;
}
int
wc = workerCountOf(c);
// Are workers subject to culling?
boolean
timed = allowCoreThreadTimeOut || wc > corePoolSize;
if
((wc > maximumPoolSize || (timed && timedOut))
&& (wc >
1
|| workQueue.isEmpty())) {
if
(compareAndDecrementWorkerCount(c))
return
null
;
continue
;
}
try
{
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if
(r !=
null
)
return
r;
timedOut =
true
;
}
catch
(InterruptedException retry) {
timedOut =
false
;
}
}
}
其实核心也就一句:
1
2
3
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
我们再回头看一下execute,其实我们上面只走了一条逻辑,在execute的时候,我们的worker的数量还没有到达我们设定的corePoolSize的时候,会走上面我们分析的逻辑,而如果达到了我们设定的阈值之后,execute中会尝试去提交任务,如果提交成功了就结束,否则会拒绝任务的提交。我们上面还提到一个成员:maximumPoolSize,其实线程池的最大的Worker数量应该是maximumPoolSize,但是我们上面的分析是corePoolSize,这是因为我们的private boolean addWorker(Runnable firstTask, boolean core)的参数core的值来控制的,core为true则使用corePoolSize来设定边界,否则使用maximumPoolSize来设定边界。直观的解释一下,当线程池里面的Worker数量还没有到corePoolSize,那么新添加的任务会伴随着产生一个新的worker,如果Worker的数量达到了corePoolSize,那么就将任务存放在阻塞队列中等待Worker来获取执行,如果没有办法再向阻塞队列放任务了,那么这个时候maximumPoolSize就变得有用了,新的任务将会伴随着产生一个新的Worker,如果线程池里面的Worker已经达到了maximumPoolSize,那么接下来提交的任务只能被拒绝策略拒绝了。可以参考下面的描述来理解:
1
2
3
4
5
6
7
8
9
10
11
12
13
* When a
new
task is submitted in method {
@link
#execute(Runnable)},
* and fewer than corePoolSize threads are running, a
new
thread is
* created to handle the request, even
if
other worker threads are
* idle. If there are more than corePoolSize but less than
* maximumPoolSize threads running, a
new
thread will be created only
*
if
the queue is full. By setting corePoolSize and maximumPoolSize
* the same, you create a fixed-size thread pool. By setting
* maximumPoolSize to an essentially unbounded value such as {
@code
* Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary
* number of concurrent tasks. Most typically, core and maximum pool
* sizes are set only upon construction, but they may also be changed
* dynamically using {
@link
#setCorePoolSize} and {
@link
* #setMaximumPoolSize}.
在此需要说明一点,有一个重要的成员:keepAliveTime,当线程池里面的线程数量超过corePoolSize了,那么超出的线程将会在空闲keepAliveTime之后被terminated。可以参考下面的文档:
1
2
3
* If the pool currently has more than corePoolSize threads,
* excess threads will be terminated
if
they have been idle
for
more
* than the keepAliveTime (see {
@link
#getKeepAliveTime(TimeUnit)}).
四、ScheduledThreadPoolExecutor解析
ScheduledThreadPoolExecutor适用于延时执行,或者周期性执行的任务调度,ScheduledThreadPoolExecutor在实现上继承了ThreadPoolExecutor,所以你依然可以将ScheduledThreadPoolExecutor当成ThreadPoolExecutor来使用,但是ScheduledThreadPoolExecutor的功能要强大得多,因为ScheduledThreadPoolExecutor可以根据设定的参数来周期性调度运行,下面的图片展示了四个和周期性相关的方法:
四个Scheduled方法
如果你想延时一段时间之后运行一个Runnable,那么使用第一个方法如果你想延时一段时间然后运行一个Callable,那么使用的第二个方法如果你想要延时一段时间,然后根据设定的参数周期执行Runnable,那么可以选择第三个和第四个方法,第三个方法和第四个方法的区别在于:第三个方法严格按照规划的时间路径来执行,比如周期为2,延时为0,那么执行的序列为0,2,4,6,8....,而第四个方法将基于上次执行时间来规划下次的执行,也就是在上次执行完成之后再次执行。比如上面的执行序列0,2,4,6,8...,如果第2秒没有被调度执行,而在第三秒的时候才被调度,那么下次执行的时间不是4,而是5,以此类推。
下面来看一下这四个方法的一些细节:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public
<V> ScheduledFuture<V> schedule(Callable<V> callable,
long
delay,
TimeUnit unit) {
if
(callable ==
null
|| unit ==
null
)
throw
new
NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new
ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return
t;
}
public
ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long
initialDelay,
long
period,
TimeUnit unit) {
if
(command ==
null
|| unit ==
null
)
throw
new
NullPointerException();
if
(period <=
0
)
throw
new
IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new
ScheduledFutureTask<Void>(command,
null
,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return
t;
}
public
ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long
initialDelay,
long
period,
TimeUnit unit) {
if
(command ==
null
|| unit ==
null
)
throw
new
NullPointerException();
if
(period <=
0
)
throw
new
IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new
ScheduledFutureTask<Void>(command,
null
,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return
t;
}
public
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long
initialDelay,
long
delay,
TimeUnit unit) {
if
(command ==
null
|| unit ==
null
)
throw
new
NullPointerException();
if
(delay <=
0
)
throw
new
IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new
ScheduledFutureTask<Void>(command,
null
,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return
t;
}
通过上面的代码我们可以发现,前两个方法是类似的,后两个方法也是类似的。前两个方法属于一次性调度,所以period都为0,区别在于参数不同,一个是Runnable,而一个是Callable,可笑的是,最后都变为了Callable了,见下面的构造函数:
1
2
3
4
public
FutureTask(Runnable runnable, V result) {
this
.callable = Executors.callable(runnable, result);
this
.state = NEW;
// ensure visibility of callable
}
对于后两个方法,区别仅仅在于period的,scheduleWithFixedDelay对参数进行了操作,将原来的时间变为负数了,而后面在计算下次被调度的时间的时候会根据这个参数的正负值来分别处理,正数代表scheduleAtFixedRate,而负数代表了scheduleWithFixedDelay。
一个需要被我们注意的细节是,以上四个方法最后都会调用一个方法: delayedExecute(t),下面看一下这个方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
private
void
delayedExecute(RunnableScheduledFuture<?> task) {
if
(isShutdown())
reject(task);
else
{
super
.getQueue().add(task);
if
(isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(
false
);
else
ensurePrestart();
}
}
大概的意思就是先判断线程池是否被关闭了,如果被关闭了,则拒绝任务的提交,否则将任务加入到任务队列中去等待被调度执行。最后的ensurePrestart的意思是需要确保线程池已经被启动起来了。下面是这个方法:
1
2
3
4
5
6
7
void
ensurePrestart() {
int
wc = workerCountOf(ctl.get());
if
(wc < corePoolSize)
addWorker(
null
,
true
);
else
if
(wc ==
0
)
addWorker(
null
,
false
);
}
主要是增加了一个没有任务的worker,有什么用呢?我们还记得Worker的逻辑吗?addWorker方法的执行,会触发Worker的run方法的执行,然后runWorker方法就会被执行,而runWorker方法是循环从workQueue中取任务执行的,所以确保线程池被启动起来是重要的,而只需要简单的执行addWorker便会触发线程池的启动流程。对于调度线程池来说,只要执行了addWorker方法,那么线程池就会一直在后台周期性的调度执行任务。
到此,似乎我们还是没有闹明白ScheduledThreadPoolExecutor是如何实现周期性的,上面讲到四个scheduled方法时,我们没有提一个重要的类:ScheduledFutureTask,对,所有神奇的事情将会发生在这个类中,下面来分析一下这个类。
ScheduledFutureTask类图
看上面的类图,貌似这个类非常复杂,还好,我们发现他实现了Runnable接口,那么必然会有一个run方法,而这个run方法必然是整个类的核心,下面来看一下这个run方法的内容:
1
2
3
4
5
6
7
8
9
10
11
12
public
void
run() {
boolean
periodic = isPeriodic();
if
(!canRunInCurrentRunState(periodic))
cancel(
false
);
else
if
(!periodic)
ScheduledFutureTask.
super
.run();
else
if
(ScheduledFutureTask.
super
.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
}
首先,判断是否是周期性的任务,如果不是,则直接执行(一次性),否则执行,然后设置下次执行的时间,然后重新调度,等待下次执行。这里有一个方法需要注意,也就是setNextRunTime,上面我们提到scheduleAtFixedRate和scheduleWithFixedDelay在传递参数时不一样,后者将delay值变为了负数,所以下面的处理正好印证了前文所述。
1
2
3
4
5
6
7
private
void
setNextRunTime() {
long
p = period;
if
(p >
0
)
time += p;
else
time = triggerTime(-p);
}
下面来看一下reExecutePeriodic方法是如何做的,他的目标是将任务再次被调度执行,下面的代码展示了这个功能的实现:
1
2
3
4
5
6
7
8
9
void
reExecutePeriodic(RunnableScheduledFuture<?> task) {
if
(canRunInCurrentRunState(
true
)) {
super
.getQueue().add(task);
if
(!canRunInCurrentRunState(
true
) && remove(task))
task.cancel(
false
);
else
ensurePrestart();
}
}
可以看到,这个方法就是将我们的任务再次放到了workQueue里面,那这个参数是什么?在上面的run方法中我们调用了reExecutePeriodic方法,参数为outerTask,而这个变量是什么?看下面的代码:
1
2
/** The actual task to be re-enqueued by reExecutePeriodic */
RunnableScheduledFuture<V> outerTask =
this
;
这个变量指向了自己,而this的类型是什么?是ScheduledFutureTask,也就是可以被调度的task,这样就实现了循环执行任务了。
上面的分析已经到了循环执行,但是ScheduledThreadPoolExecutor的功能是周期性执行,所以我们接着分析ScheduledThreadPoolExecutor是如何根据我们的参数走走停停的。这个时候,是应该看一下ScheduledThreadPoolExecutor的构造函数了,我们来看一个最简单的构造函数:
1
2
3
4
public
ScheduledThreadPoolExecutor(
int
corePoolSize) {
super
(corePoolSize, Integer.MAX_VALUE,
0
, NANOSECONDS,
new
DelayedWorkQueue());
}
我们知道ScheduledThreadPoolExecutor的父类是ThreadPoolExecutor,所以这里的super其实是ThreadPoolExecutor的构造函数,我们发现其中有一个参数DelayedWorkQueue,看名字貌似是一个延迟队列的样子,进一步跟踪代码,发现了下面的一行代码(构造函数中):
1this.workQueue = workQueue;
所以在ScheduledThreadPoolExecutor中,workQueue是一个DelayedWorkQueue类型的队列,我们暂且认为DelayedWorkQueue是一种具备延迟功能的队列吧,那么,到此我们便可以想明白了,上面的分析我们明白了ScheduledThreadPoolExecutor是如何循环执行任务的,而这里我们明白了ScheduledThreadPoolExecutor使用DelayedWorkQueue来达到延迟的目标,所以组合起来,就可以实现ScheduledThreadPoolExecutor周期性执行的目标。下面我们来看一下DelayedWorkQueue是如何做到延迟的吧,上文中提到一个方法:getTask,这个方法的作用是从workQueue中取出任务来执行,而在ScheduledThreadPoolExecutor里面,getTask方法是从DelayedWorkQueue中取任务的,而取任务无非两个方法:poll或者take,下面我们对DelayedWorkQueue的take方法来分析一下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public
RunnableScheduledFuture<?> take()
throws
InterruptedException {
final
ReentrantLock lock =
this
.lock;
lock.lockInterruptibly();
try
{
for
(;;) {
RunnableScheduledFuture<?> first = queue[
0
];
if
(first ==
null
)
available.await();
else
{
long
delay = first.getDelay(NANOSECONDS);
if
(delay <=
0
)
return
finishPoll(first);
first =
null
;
// don't retain ref while waiting
if
(leader !=
null
)
available.await();
else
{
Thread thisThread = Thread.currentThread();
leader = thisThread;
try
{
available.awaitNanos(delay);
}
finally
{
if
(leader == thisThread)
leader =
null
;
}
}
}
}
}
finally
{
if
(leader ==
null
&& queue[
0
] !=
null
)
available.signal();
lock.unlock();
}
}
在for循环里面,首先从queue中获取第一个任务,然后从任务中取出延迟时间,而后使用available变量来实现延迟效果。这里面需要几个点需要探索一下:
这个queue是什么东西?延迟时间的来龙去脉?available变量的来龙去脉?
对于第一个问题,看下面的代码:
1
2
private
RunnableScheduledFuture<?>[] queue =
new
RunnableScheduledFuture<?>[INITIAL_CAPACITY];
它是一个RunnableScheduledFuture类型的数组
RunnableScheduledFuture类关系
数组里面保存了我们的RunnableScheduledFuture,对queue的操作,主要来看一下增加元素和消费元素的操作。首先,假设使用add方法来增加RunnableScheduledFuture到queue,调用的链路如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public
boolean
add(Runnable e) {
return
offer(e);
}
public
boolean
offer(Runnable x) {
if
(x ==
null
)
throw
new
NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final
ReentrantLock lock =
this
.lock;
lock.lock();
try
{
int
i = size;
if
(i >= queue.length)
grow();
size = i +
1
;
if
(i ==
0
) {
queue[
0
] = e;
setIndex(e,
0
);
}
else
{
siftUp(i, e);
}
if
(queue[
0
] == e) {
leader =
null
;
available.signal();
}
}
finally
{
lock.unlock();
}
return
true
;
}
解释一下,add方法直接转到了offer方法,该方法中,首先判断数组的容量是否足够,如果不够则grow,增长的策略如下:
1int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
每次增长50%,入戏下去。增长完成后,如果这是第一个元素,则放在坐标为0的位置,否则,使用siftUp操作,下面是该方法的内容:
1
2
3
4
5
6
7
8
9
10
11
12
13
private
void
siftUp(
int
k, RunnableScheduledFuture<?> key) {
while
(k >
0
) {
int
parent = (k -
1
) >>>
1
;
RunnableScheduledFuture<?> e = queue[parent];
if
(key.compareTo(e) >=
0
)
break
;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
这个数组实现了堆这种数据结构,使用对象比较将最需要被调度执行的RunnableScheduledFuture放到数组的前面,而这得力于compareTo方法,下面是RunnableScheduledFuture类的compareTo方法的实现,主要是通过延迟时间来做比较。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public
int
compareTo(Delayed other) {
if
(other ==
this
)
// compare zero if same object
return
0
;
if
(other
instanceof
ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long
diff = time - x.time;
if
(diff <
0
)
return
-
1
;
else
if
(diff >
0
)
return
1
;
else
if
(sequenceNumber < x.sequenceNumber)
return
-
1
;
else
return
1
;
}
long
diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return
(diff <
0
) ? -
1
: (diff >
0
) ?
1
:
0
;
}
上面是生产元素,下面来看一下消费数据。在上面我们提到的take方法中,使用了一个方法如下:
1
2
3
4
5
6
7
8
9
private
RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int
s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] =
null
;
if
(s !=
0
)
siftDown(
0
, x);
setIndex(f, -
1
);
return
f;
}
这个方法中调用了一个方法siftDown,这个方法如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private
void
siftDown(
int
k, RunnableScheduledFuture<?> key) {
int
half = size >>>
1
;
while
(k < half) {
int
child = (k <<
1
) +
1
;
RunnableScheduledFuture<?> c = queue[child];
int
right = child +
1
;
if
(right < size && c.compareTo(queue[right]) >
0
)
c = queue[child = right];
if
(key.compareTo(c) <=
0
)
break
;
queue[k] = c;
setIndex(c, k);
k = child;
}
queue[k] = key;
setIndex(key, k);
}
对其的解释就是:
1Replaces first element with last and sifts it down. Call only when holding lock.
总结一下,当我们向queue插入任务的时候,会发生siftUp方法的执行,这个时候会把任务尽量往根部移动,而当我们完成任务调度之后,会发生siftDown方法的执行,与siftUp相反,siftDown方法会将任务尽量移动到queue的末尾。总之,大概的意思就是queue通过compareTo实现了类似于优先级队列的功能。
下面我们来看一下第二个问题:延迟时间的来龙去脉。在上面的take方法里面,首先获取了delay,然后再使用available来做延迟效果,那这个delay从哪里来的呢?通过上面的类图RunnableScheduledFuture的类图我们知道,RunnableScheduledFuture类实现了Delayed接口,而Delayed接口里面的唯一方法是getDelay,我们到RunnableScheduledFuture里面看一下这个方法的具体实现:
1
2
3
public
long
getDelay(TimeUnit unit) {
return
unit.convert(time - now(), NANOSECONDS);
}
time是我们设定的下次执行的时间,所以延迟就是(time - now()),没毛病!
第三个问题:available变量的来龙去脉,至于这个问题,我们看下面的代码:
1
2
3
4
5
/**
* Condition signalled when a newer task becomes available at the
* head of the queue or a new thread may need to become leader.
*/
private
final
Condition available = lock.newCondition();
这是一个条件变量,take方法里面使用这个变量来做延迟效果。Condition可以在多个线程间做同步协调工作,更为具体细致的关于Condition的内容
转载于:https://www.cnblogs.com/llaq/p/9515375.html
相关资源:java线程池demo
转载请注明原文地址: https://mac.8miu.com/read-15161.html