线程池-ThreadPoolExecutor
文档简介
ThreadPoolExecutor 是 ExecutorService的一个实现,其功能是:使用几个被池化的线程来执行提交的任务,这些在池中的线程由 ThreadFactory 来创建。
线程池解决了两个问题:
提高了执行大量异步任务的效率。
使用缓存的线程来,执行任务,节省的线程创建的时间。
提供了一种限制和管理资源的方法。
此外,ThreadPoolExecutor 还维护了一些基本统计信息,例如已经完成的任务(task)的个数。
为了使这个线程池可以在不同的场景下使用,这个类提供了许多可以配置的参数和一些回调函数。然而,比较推荐的是使用 Executors 类来创建 ThreadPoolExecutor 对象,这个类提供了一些默认的参数,来实现不同功能的线程池。如果 Executors 类提供的线程池,不能够满足需求,可以按照下面的 ThreadPoolExecutor 的参数配置说明,来配置和微调线程池。
Core and maximum pool size
ThreadPoolExecutor 将使用参数 corePoolSize 和maximumPoolSize 来自动地调整 pool size (在池中的线程数量)。当使用 execute 方法提交一个新的任务的时候: (1) 如果当前运行的线程数量少于 corePoolSize ,则ThreadPoolExecutor将创建一个新线程来运行这个任务,尽管此时可能有其它线程处于空闲状态(idle).(2) 如果当前运行的线程数量是介于 corePoolSize 和 maximumPoolSize 之间,当且仅当任务队列是满的时候(也就是这个任务无法入队列),才会创建一个新线程去运行这个任务。(3) 当前运行的线程数量已经是 maximumPoolSize,且队列也是不满,则入队列。(4) 当前运行的线程数量已经是 maximumPoolSize,且队列也是已经满了,则需要reject这个任务。
当将 corePoolSize 和 maximumPoolSize 设置相同的时候,此时就是一个 fixed-size(固定大小) 的线程池。
当将 maximumPoolSize 设置成 Integer.MAX_VALUE的时候,表示这个线程池可以接受任意数量的并发的任务。
通常情况,使用 ThreadPoolExecutor 的构造函数配置这两个参数。当然也可以使用 setCorePoolSize 和 setMaximumPoolSize 方法动态的改变这两个参数。
On-demand construction
默认情况下池中所有线程都是任务被提交之后按需要创建的。但是,我们可以使用 prestartCoreThread() 和 prestartAllCoreThreads().方法来预先启动 coreThread。因为有一种情况,可能需要这样做。就是,ThreadPoolExecutor在创建的时候需要指定任务的存储队列,如果这个队列中已经有一些任务存在了,就可以使用上面的方法,通过预先启动线程,使得 BlockingQueue 中的任务可以立即被执行,而不是一直等待到有任务被提交时,才有线程可以执行这些任务。
Creating new threads
使用 ThreadFactory 来创建新的线程。如果没有指定这个参数,则默认使用 Executors.DefaultThreadFactory 这个实现类。这个类将所有的线程放置到同一个线程组中,其线程的命名规则是:pool-1-thread-1,pool-1-thread-2,pool-1-thread-3,etc. 其中第一个数字是线程池实例的编号,第二个数字是当前线程池中线程的编号。同时将线程的优先级设置在 NORM_PRIORITY 和 non-daemon 状态。
通过,实现 ThreadFactory 接口,可以对线程池中线程的 name, thread group, priority, daemon status 进行设置。
如果这个实现类的 newThread 方法返回null, 则当前的 executor 将继续执行,但是有可能不会执行任何任务。
Keep-alive times
如果线程池中当前线程的个数超过了 corePoolSize,并且这些超出的线程处于空闲状态的时间已经超过了 keepAliveTime, 则这些线程将被终止。当池变得不够活跃的时候,通过回收这些线程,节省了线程的资源的占用。
这个参数也可以通过 setKeepAliveTime 方法来动态改变。
默认情况下,超时策略仅适用于,超过 corePoolSize 的那些线程。但是如果使用 allowCoreThreadTimeOut(true) 调用,则可以设置,让 core threads 也可以在超时的时候被回收。
Queuing
executor 使用 BlockingQueue 来存储提交的任务。通常有三种策略:(1) Direct handoffs(直接切换): SynchronousQueue. (2) Unbounded queues(无界队列): LinkedBlockingQueue. (3) Bounded queues(有界队列): ArrayBlockingQueue.
Rejected tasks
当 executor 被 shutdown 或者 maximum threads 和 queue capacity 都是有界的,而且,都已经满了的时候。executor 的 execute 方法 将 调用 RejectedExecutionHandler.rejectedExecution 方法,来 reject 一个任务。
预定义的 handler 有:(1) ThreadPoolExecutor.AbortPolicy: 直接抛出 RejectedExecutionException 异常。(2) ThreadPoolExecutor.CallerRunsPolicy: 被 reject 的任务将直接在 调用者 线程中被执行。它提供了一种简单的反馈控制机制,从而减缓了新任务提交的速率。其实就是新提交的任务被当前提交任务的线程执行,使用提交过程被减缓。(3) ThreadPoolExecutor.DiscardPolicy: 将这个不能提交的任务直接丢弃(不执行,不抛异常,do nothing)。(4) ThreadPoolExecutor.DiscardOldestPolicy: 将当前队列中等待最久的线程移除,然后再次提交这个任务。
It is possible to define and use other kinds of RejectedExecutionHandler classes. Doing so requires some care especially when policies are designed to work only under particular capacity or queuing policies.
Hook methods
提供了两个方法 beforeExecute 和 afterExecute 在每一个任务被执行的时候,这两个方法都会被调用。这些方法实现的通常目地可以是:调整任务的执行环境,例如:重新初始化 ThreadLocal, 收集统计信息, 添加任务执行日志等等。
此外,可以 override terminated方法来执行当 executor 被终止的时候,执行一些特殊的资源清理操作等。
Queue maintenance
getQueue 允许其它线程出于高度或者监控目地。如果将这个方法使用于其它目地是强烈不推荐的。
如果需要调用 queue, 则应该使用 executor 提供的 remove 和 purge 方法,而不是直接操作 queue.
Finalization
A pool that is no longer referenced in a program AND has no remaining threads will be shutdown automatically.
当池对象不再被任何线程引用并且池中没有剩余的线程,则这个线程将会自动被关闭。
状态
ctl: 线程池的生命周期控制字段
1 | private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); |
这个变量分为两个部分:
- workerCount: 后 29 个字节。表示当前线程池中有效的线程数。
- runState: 前 3 个字节。表示当前线程池的运行状态。
由初始化的参数可知,当线程池初始化之后的,默认状态是:RUNNING 表示当前线程池可以接受新的任务,并且可以处理queue中的任务。同时0表示线程池中当前的线程个数为0.
关于 ctl 的操作:1
2
3
4
5
6
7 // Packing and unpacking ctl
// 从 ctl 中取出 runState。
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 从 ctl 中取出 workerCount。
private static int workerCountOf(int c) { return c & CAPACITY; }
// 将 runState 和 wc 打包成 ctl 变量。
private static int ctlOf(int rs, int wc) { return rs | wc; }
workQueue: 任务队列
1 | private final BlockingQueue<Runnable> workQueue; |
这个队列中用来持有(存储)任务和分发任务给 工作者线程(worker threads)。这个队列中使用过程中,并不会以 workQueue.poll 方法返回 null ,而认为队列是空的。而是使用 workQueue.isEmpty 方法来判断当前队列是否为空。
workers: 工作者线程
1 | private final HashSet<Worker> workers = new HashSet<Worker>(); |
存储池中的所有线程。Set containing all worker threads in pool.
Worker 内部类
1 | private final class Worker extends AbstractQueuedSynchronizer implements Runnable |
表示executor中内部线程池中的线程。
提交任务的过程
1 | // 提交任务 |
addWorker
1 | private boolean addWorker(Runnable firstTask, boolean core) { |
runWorker
上面的 addWorker 如果添加 worker 成功,则启动 worker 线程。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75// worker 的 run 方法
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 工作者线程的主循环,不断的运行任务,
// 任务的来源有:
// (1) 如果这个线程是第一次运行,则一般会有一个任务和其关联,就是 w.firstTask,
// (2) 当 firstTask 运行完毕之后,就从 BlockingQueue 中获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
// 获取 task 之后,开始执行 task.
try {
// 每一个任务在开始执行前的回调接口
// 子类可以 override 这个方法,来调整 task 的执行环境,进行统计,日志分析
// 调试等工作
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 运行任务.
task.run();
// 如果任务运行过程中出现异常,
// 将导致,worker线程运行 throw 异常,而退出.
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 每一个任务在执行结束之后的回调接口
// 子类可以 override 这个方法,来调整 task 的执行环境,进行统计,日志分析
// 调试等工作,通过第二个参数 thrown == null,来判断任务是否执行成功,或是
// 执行过程中抛出异常了.
afterExecute(task, thrown);
}
} finally {
// 一个任务执行完毕之后
// 设置 task 为 null,
task = null;
// 工作者线程的执行 task 的个数统计.
w.completedTasks++;
w.unlock();
}
}
// 如果线程没有在执行 task 的过程中抛出异常
// 也就是说线程是正常从上面的 main loop 退出的,
// 则 设置 completedAbruptly 为 false.
completedAbruptly = false;
} finally {
// worker 退出了, 调用processWorkerExit调整线程池的状态.
processWorkerExit(w, completedAbruptly);
}
}
在线程退出之后会调用 processWorkerExit 方法,这个方法的第二个字段 completedAbruptly, 表示
线路退出的原因,如果是正常退出为 false, 如果线程异常退出为 ture, 在 processWorkerExit 方法
中将会将 workerCount - 1.
线程的生命周期
RUNNING
可以接受新的任务,同时可以处理队列中的任务.
SHUTDOWN
不接受新的任务,但是仍然会处理队列中的任务.
STOP
不接受新的任务,不处理队列中的任务了,将正在处理任务的线程中断.
TIDYING
所有的任务被 terminated, 并且 workerCount == 0,线程状态就会变成这个状态, terminated 回调函数被调用.
TERMINATED
terminated 回调函数被调用完成之后, 线程池就处于这个状态.
生命周期的转换:
RUNNING -> SHUTDOWN
调用 shutdown 函数的时候, 同时 finalize 函数内部也调用了该函数.
(RUNNING or SHUTDOWN) -> STOP
调用 shutdownNow 函数的时候
SHUTDOWN -> TIDYING
当队列和线程池都为空时.
STOP -> TIDYING
当线程池为空时.
TIDYING -> TERMINATED
When the terminated() hook method has completed
线程池的终止
1 | final void tryTerminate() { |
getTask
在 runWorker 的 main leep 中获取任务1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72// 这个函数有三个出口, 查看 runWorker 函数中使用 getTask ,
// 当 getTask 这个函数返回 null 时,worker 主线程会退出主循环
// worker 线程就会退出。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 响应线程池的生命周期
// Check if queue empty only if necessary.
// 下面的判断条件相当于:
// if( (rs >= SHUTDOWN && workQueue.isEmpty()) || rs >= STOP )
// 就是 线程池状态处于 STOP 状态,
// 或者 线程池状态处于 SHUTDOWN 状态 并且 workQueue 中 没有任务了。
// 在这两种状态下 worker 线程将会退出。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 当前线程要退出了,所以减少 workerCount 的计数。
decrementWorkerCount();
return null;
}
// 是否允许当前线程在当前线程池的状态下可以超时。
boolean timed; // Are workers subject to culling?
for (;;) {
// 线程是否超时判断。从这里可以看出,其实一个线程是否是
// core 线程,还是非 core 线程,是没有固定的,也就是
// 在线程池中的所有的worker线程,是依据当前线程池的状态
// 来判断是超时,如果不可以超时,则认为它是 core 线程
// 到第二次获取任务的时候,有可能变成 可以超时的,所以
// 它变成了 非core 线程了。
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
// 1. 当前线程数量大于 maximumPoolSize
// 2. 如果当前线程上次超时了,并且这次也允许超时
// 这两种情况下,当前线程就应该从线程池中退出了。
// 当前线程要退出了,所以减少 workerCount 的计数。
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
// 如果允许线程超时,使用 poll 来获取任务,否则使用 take 来阻塞获取任务。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 这个设计非常好,这里并不会直接返回r,因为有可能,workQueue有可能超时
// 返回 null, 所以如果直接返回 r 的时候,会使得当前线程直接退出,这会有问题,
// 因为在上面的 poll 过程的 block 过程中有可能,线程池中的其它线程有可能已经
// 退出,如果直接使得当前线程返回 null, 就有可能,导致线程池中的线程数量 小于
// corePoolSize , 显然这不符合线程池对 corePoolSize 的语义。
// 所以,这里使用 timedOut 字段标识,线程上次超了,然后,继续循环,
// 在上面的代码中,会使用timedOut 状态判。然后作对应的处理。
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
Executors 提供的几种线程池
newFixedThreadPool
1 | public static ExecutorService newFixedThreadPool(int nThreads) { |
这类线程池将创建线程数量最多为 nThreads 个线程,由于个这个原因, getTask 方法从队列中获取任务的方法将总是:
workQueue.take();
所以,对于超时的配置,其实是用不到的,所以这里 keepAliveTime 参数的值是 0, 其实根本用不到。
这 nThreads 个线程将一直运行来执行任务。
线程创建执行机制:
- 前 nThreads 个任务的提交,将使当前线程中创建好 nThreads 个线程
- 后续提交的任务,将依时间顺序入队列,
- 然后由这 nThreads 个线程不继的循环消费。
线程超时
这个 nThreads 线程永远不会超时。
newSingleThreadExecutor
1 | public static ExecutorService newSingleThreadExecutor() { |
这个线程池内只有一个线程,当向这个线程池提交第一个任务时,创建这个线程。
提交第二个任务的时候,会直接将该任务,添加到 workerQueue 队列中。线程中的惟一那个
线路,开始从 workerQueue 中按添加顺序不断的执行任务。
同样,由于 corePoolSize == maximumPoolSize == 1, getTask 获取任务的方法是:
workQueue.take();
所以其超时参数没有使用到。
线程创建执行机制:
- 第一个任务的提交会创建这个线程池中的惟一个线程,然后任务被执行,
- 后续提交的任务,将依时间顺序直接入队列
- 线程池中的惟一线程将从队列中不断循环地取出任务,然后执行
线程超时
这一个线程永远不会超时。
newCachedThreadPool
1 | public static ExecutorService newCachedThreadPool() { |
由于 corePoolSize = 0, maximumPoolSize = Integer.MAX_VALUE, 所以,
这个线程中的行为是,添加一个任务就会,创建一个对应的线程,当这个线程就执行
完毕,就等待 60 秒,如果这 60 秒内有其它任务被提交,则这个提交的任务将被这个线程
执行,否则,线程在 60 秒之后被回收。直接新的 任务被提交,然后重复上面的过程。
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
由传入的参数,可知,在这个线程池中提交的任务执行完毕之后将等待 60 秒,然后退出。
线程创建执行机制:
- 如果第二个任务提交的时候,前一个任务还在执行,则这个任务会入队列失败,然后创建一个新的线程来执行
- 如果第二个任务提交的时候,前一个任务已经处于 poll 状态了,则这个任务将交由执行前一个任务的那个线程来执行,而不是创建新的线程
线程超时
这个线路池中的每一个线程都是 60s 的超时时间,过了 60s 这个线程就会被回收。
关于 workerCount 状态的维护
addWorker
在每次调用 addWorker 的时候,如果一个worker(线程)可以被创建,则其在创建之后必然会调用 compareAndIncrementWorkerCount 方法来,增加 workCount ,表示线程池中增加了一个新的线程。
getTask
线程通过轮询的方式来不断的执行任务。每次获取任务之前会判断是否可以线程执行。如果不行,则调用 compareAndDecrementWorkerCount, 表示当前线程将从线程池中被移除。
但是,存在一个问题,在 runWorker 的 main loop 中,用户代码有可能抛出异常,例如:run, beforeExecute, afterExecute 都有可能抛出异常。一旦出现异常,当前线程就会被 JVM 终止。而此时由于没有来得及调用循环中的 getTask 方法,所以,这个线程虽然已经异常终止了,但是在 workerCount 的计数中还占一位,所以必须,在处于 worker 退出的地方,进行判断,如果是异常终止,则调用 decrementWorkerCount 来清除这个异常终止的线程所占用的workerCount位。这也就是下面代码片断的原因:1
2
3
4// 当线程是异常终止的,completedAbruptly 就是 true.
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();