Java Thread Pools: Principles and Source Code Analysis

A brief summary, from the source code's point of view, of how the ThreadPoolExecutor thread pool is implemented.

Executor

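The interface in the java.util.concurrent package that thread pools are built on.

Executor is an interface for executing Runnable tasks and has a single method, execute. It decouples task submission from task execution, and is normally used in place of explicitly creating threads.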

public interface Executor {
    void execute(Runnable command);
}
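To make the decoupling concrete, here is a minimal sketch of two Executor implementations: one runs the task in the caller's thread, the other starts a new thread per task. The class name ExecutorDemo and the helper threadNameUnder are hypothetical names chosen for illustration; only Executor, Thread and CompletableFuture come from the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical demo class, not part of the JDK.
class ExecutorDemo {
    // Runs each task synchronously in the thread that calls execute().
    static final Executor DIRECT = Runnable::run;

    // Starts a brand-new thread for every submitted task.
    static final Executor THREAD_PER_TASK = command -> new Thread(command).start();

    // Submits a task that reports which thread ran it. The submitting code
    // is identical for both executors; only the execution strategy differs.
    static String threadNameUnder(Executor executor) {
        CompletableFuture<String> name = new CompletableFuture<>();
        executor.execute(() -> name.complete(Thread.currentThread().getName()));
        return name.join(); // waits until the task has run
    }

    public static void main(String[] args) {
        System.out.println("direct:          " + threadNameUnder(DIRECT));
        System.out.println("thread-per-task: " + threadNameUnder(THREAD_PER_TASK));
    }
}
```

The caller only ever sees execute(Runnable); swapping the executor changes where the task runs without touching the submitting code.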

ExecutorService

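ExecutorService extends Executor and adds methods for managing the executor's lifecycle, along with methods that return a Future for tracking an asynchronous task. For example, shutdown makes it reject new tasks, while submit, which is built on Executor's execute method, accepts a Runnable or Callable and returns a Future. Through the Future you can cancel the task or block until its result is available.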
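A short sketch of these two behaviours, assuming a small fixed pool; the class and method names (SubmitDemo, submitAndGet, rejectsAfterShutdown) are made up for this example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical demo class, not part of the JDK.
class SubmitDemo {
    // Submits a Callable, waits for its result, then shuts the pool down.
    static int submitAndGet() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<Integer> future = pool.submit(() -> 6 * 7);
            return future.get(); // blocks until the task has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    // Returns true if a task submitted after shutdown() is rejected.
    static boolean rejectsAfterShutdown() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.shutdown();
        try {
            pool.submit(() -> { });
            return false;
        } catch (RejectedExecutionException expected) {
            return true;
        }
    }
}
```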

Executors

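Executors mainly provides a set of factory methods for creating thread pools; every pool it returns implements the ExecutorService interface. The three most common are:

  • CachedThreadPool: a cacheable pool that reuses idle threads and creates a new thread only when none is available for an incoming task;
  • FixedThreadPool: a pool with a fixed number of threads;
  • SingleThreadExecutor: equivalent to a FixedThreadPool of size 1.

All three are backed by a ThreadPoolExecutor.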

ThreadPoolExecutor

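ThreadPoolExecutor is the core of this article: the three pools from the previous section are really just specially configured ThreadPoolExecutors. The three static factory methods in Executors each construct a ThreadPoolExecutor (newSingleThreadExecutor additionally wraps it in a delegating class) and differ only in the arguments they pass. So ThreadPoolExecutor is where we should focus.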

//Executors.java
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
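The factory code above can be probed at runtime. The sketch below assumes a JDK whose Executors class matches the source quoted here: newFixedThreadPool hands back the ThreadPoolExecutor itself, while newSingleThreadExecutor hides it behind a wrapper. FactoryDemo and its methods are hypothetical names, and the results depend on JDK implementation details rather than on any documented contract.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class, not part of the JDK.
class FactoryDemo {
    // True if the fixed pool can be used directly as a ThreadPoolExecutor.
    static boolean fixedIsThreadPoolExecutor() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            return pool instanceof ThreadPoolExecutor;
        } finally {
            pool.shutdown();
        }
    }

    // True if the single-thread executor is exposed as a ThreadPoolExecutor
    // (assumption: it is not, because the factory wraps it).
    static boolean singleIsThreadPoolExecutor() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return pool instanceof ThreadPoolExecutor;
        } finally {
            pool.shutdown();
        }
    }

    // Reads back the core size that newFixedThreadPool(n) configured.
    static int fixedCoreSize(int n) {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(n);
        try {
            return pool.getCorePoolSize();
        } finally {
            pool.shutdown();
        }
    }
}
```

The wrapper around the single-thread pool means callers cannot cast it and reconfigure it, for example to raise its pool size.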

The ThreadPoolExecutor constructor looks like this:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

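Each parameter is described in the official Javadoc:

  • corePoolSize: the number of threads to keep in the pool, even if they are idle
  • maximumPoolSize: the maximum number of threads allowed in the pool
  • keepAliveTime: when the pool has more than corePoolSize threads, the maximum time the excess idle threads wait for new tasks before terminating
  • unit: the time unit of keepAliveTime
  • workQueue: the blocking queue that holds tasks before they are executed
  • threadFactory: the factory the executor uses to create new threads
  • handler: the rejection handler, invoked when a new task cannot be accepted because both the thread bound and the queue capacity have been reached (or because the pool has been shut down).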
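As an illustration of the parameters listed above, the following sketch builds a pool by hand instead of going through Executors. The concrete values (2, 4, 30 seconds, a queue of 10) are arbitrary choices for the example, and ConstructorDemo is a hypothetical class name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class ConstructorDemo {
    // Builds a bounded pool, naming every constructor argument.
    static ThreadPoolExecutor newBoundedPool() {
        return new ThreadPoolExecutor(
                2,                                    // corePoolSize
                4,                                    // maximumPoolSize
                30L, TimeUnit.SECONDS,                // keepAliveTime and its unit
                new ArrayBlockingQueue<>(10),         // workQueue (bounded)
                Executors.defaultThreadFactory(),     // threadFactory
                new ThreadPoolExecutor.AbortPolicy()  // handler
        );
    }

    // Returns true if maximumPoolSize < corePoolSize is rejected,
    // as the argument check in the constructor suggests.
    static boolean rejectsInvalidSizes() {
        try {
            new ThreadPoolExecutor(4, 2, 0L, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(1));
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }
}
```

Note that constructing the pool does not start any threads; workers are created lazily as tasks arrive.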

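We usually use a thread pool like this:

ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(... ...);

The first line simply returns a ThreadPoolExecutor instance and the second calls its execute method, so the key is what happens inside that method.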

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

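First, what is ctl? It is an AtomicInteger that holds the pool's control state, packing two conceptual fields into a single int:

  • workerCount: the effective number of threads, i.e. the number of workers that have been permitted to start and not yet permitted to stop; it may transiently differ from the number of threads actually alive.
  • runState: whether the pool is running, shutting down, and so on.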

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
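The packing helpers above rely on a few constants that are not quoted in this article. The sketch below reproduces them as they appear in the JDK 8 source to the best of our knowledge (3 high bits for the run state, 29 low bits for the worker count); treat the exact values as an assumption and check your own JDK. CtlPacking is a hypothetical standalone class.

```java
// Hypothetical standalone copy of the bit layout, for experimentation only.
class CtlPacking {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29 bits hold the worker count
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // mask with the low 29 bits set

    // Run states occupy the high 3 bits and are ordered numerically.
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; } // keep high bits
    static int workerCountOf(int c)  { return c & CAPACITY; }  // keep low bits
    static int ctlOf(int rs, int wc) { return rs | wc; }       // combine both

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(workerCountOf(c));         // worker count stored in c
        System.out.println(runStateOf(c) == RUNNING); // run state stored in c
    }
}
```

Because the states are numerically ordered, checks such as rs >= SHUTDOWN in addWorker reduce to a plain integer comparison, and a single compare-and-set on ctl updates state and count together.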

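So step one reads ctl and checks whether the worker count is below corePoolSize; if it is, execute tries to create a new core thread and hands it the current task as its first task.

Step two tries to add the task to the blocking queue. After a successful enqueue the state is rechecked: if the pool is no longer running, the task is removed and the rejection policy is invoked; if the pool is still running but the worker count has dropped to zero, a new worker with no initial task is started so that the queued task gets picked up.

Step three applies when the task cannot be queued: execute tries to create a new non-core thread, and if that fails too it invokes the rejection policy.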
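The three steps can be observed from the outside with a deliberately tiny pool. This is a sketch under stated assumptions: one core thread, at most two threads, a queue of capacity one, and tasks that block on a latch so that nothing finishes while we take measurements. ExecuteFlowDemo and observe are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class ExecuteFlowDemo {
    // Returns { pool size after task 1, queue size after task 2,
    //           pool size after task 3, 1 if task 4 was rejected else 0 }.
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] seen = new int[4];
        try {
            pool.execute(blocker);            // step 1: starts the core thread
            seen[0] = pool.getPoolSize();
            pool.execute(blocker);            // step 2: core is busy, task is queued
            seen[1] = pool.getQueue().size();
            pool.execute(blocker);            // step 3: queue full, non-core thread starts
            seen[2] = pool.getPoolSize();
            try {
                pool.execute(blocker);        // queue full and pool at maximum
            } catch (RejectedExecutionException e) {
                seen[3] = 1;                  // default AbortPolicy threw
            }
        } finally {
            release.countDown();              // let the blocked tasks finish
            pool.shutdown();
        }
        return seen;
    }
}
```

Note the order: the queue is tried before any non-core thread is created, which is why an unbounded queue (as in newFixedThreadPool) never grows the pool past corePoolSize.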

addWorker

As described above, addWorker is the method that creates a thread and has it run the task.

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check the pool state; return immediately if it has been shut down
        // (unless it is SHUTDOWN and queued tasks still need a worker).
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        // Spin with CAS to increment the worker count
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // Add the worker to the pool
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
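The inner retry loop of addWorker is a bounded compare-and-set increment. Stripped of the run-state handling, the idea can be sketched as follows; BoundedCounter is a hypothetical class that only mimics the pattern and is not how the JDK structures its code.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of a CAS retry loop with an upper bound.
class BoundedCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final int limit;

    BoundedCounter(int limit) {
        this.limit = limit;
    }

    // Tries to reserve one slot; returns false once the limit is reached.
    boolean tryIncrement() {
        for (;;) {
            int current = count.get();
            if (current >= limit)
                return false;                    // bound reached, give up
            if (count.compareAndSet(current, current + 1))
                return true;                     // CAS succeeded, slot reserved
            // CAS lost a race with another thread: re-read and retry
        }
    }

    int get() {
        return count.get();
    }
}
```

As in addWorker, the count is reserved first and the expensive work (creating and starting the thread) happens only after the reservation succeeds; addWorkerFailed plays the role of giving the slot back.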

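The try block is where the thread is actually created. ThreadPoolExecutor wraps each task-executing thread in a Worker object; looking at Worker's definition, we find that it implements Runnable and extends AbstractQueuedSynchronizer (AQS).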

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{... ...}

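So the worker (and its thread) is created first, then added to the pool, and finally the thread is started. But what exactly is workers, the pool itself?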

/**
 * Set containing all worker threads in pool. Accessed only when
 * holding mainLock.
 */
private final HashSet<Worker> workers = new HashSet<Worker>();

As we can see, workers is a HashSet: the pool's underlying storage is simply a HashSet of Worker objects, guarded by mainLock.

runWorker

Once a worker thread starts, it invokes the Worker's run method, which simply delegates to runWorker.

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

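getTask() fetches the next task from the queue.

Note the two methods called before and after each task runs, beforeExecute and afterExecute. Both are empty protected methods, which lets us subclass the pool and customize what happens around each task.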
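A minimal sketch of such a subclass, assuming all we want is to count how many tasks started and finished. CountingPool and runTasks are hypothetical names; the overridden hook signatures are those of ThreadPoolExecutor.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass that counts tasks through the two hooks.
class CountingPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger finished = new AtomicInteger();

    CountingPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();   // runs in the worker thread, before the task
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        finished.incrementAndGet();  // runs in the worker thread, after the task
    }

    // Runs n no-op tasks on a two-thread pool and waits for it to terminate.
    static CountingPool runTasks(int n) {
        CountingPool pool = new CountingPool(2);
        for (int i = 0; i < n; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown(); // already queued tasks still run
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool;
    }
}
```

Both hooks run on the worker thread, so any shared state they touch needs to be thread-safe; atomic counters are used here for that reason.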

reject

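When the queue is full and the number of threads in the pool has reached the maximum, any further task triggers the pool's rejection policy. The same happens when a task is submitted after the pool has been shut down.

ThreadPoolExecutor ships with several rejection policy implementations.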

  1. AbortPolicy

public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

The default policy: it simply throws a RejectedExecutionException.

  2. DiscardPolicy

public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

Silently discards the task and does nothing else.

  3. DiscardOldestPolicy

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

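As the name suggests, this policy discards the oldest task, i.e. the one at the head of the queue, and then retries execute with the new task.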

  4. CallerRunsPolicy

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

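The task is run directly in the thread that called execute, which is not necessarily the main thread. If the pool has already been shut down, the task is dropped.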
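To see a policy in action, the sketch below saturates a one-thread pool with a one-slot queue and lets CallerRunsPolicy handle the overflow. The sizes and the names CallerRunsDemo and threadThatRanRejectedTask are assumptions made for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the JDK.
class CallerRunsDemo {
    // Returns the name of the thread that ended up running the rejected task.
    static String threadThatRanRejectedTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the only worker busy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocker); // occupies the single worker
            pool.execute(blocker); // fills the single queue slot
            // Pool and queue are both full, so this task is rejected and
            // CallerRunsPolicy runs it synchronously in the calling thread.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn.get();
    }
}
```

Because the submitting thread is busy running the rejected task, it cannot submit more work in the meantime, which gives this policy a simple back-pressure effect.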

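Summary

  • ThreadPoolExecutor first uses core threads to handle tasks
  • Once the worker count reaches corePoolSize, new tasks go into the blocking queue
  • When the blocking queue is full, non-core threads are created to handle tasks
  • When the queue is full and the thread count has reached maximumPoolSize, the rejection policy is invoked
  • The workers themselves are stored in a HashSet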