ThreadPoolExecutor - 969251639/study GitHub Wiki
jdk内置线程池ThreadPoolExecutor,功能很强大,是并发开发不得不研究的一个类
它有四种构造方法,最终都会执行到下面的这个构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
共有7个参数
corePoolSize: 线程池启动后,在池中保持的线程的最小数量。需要说明的是线程数量是逐步到达corePoolSize值的。例如corePoolSize被设置为10,而任务数量只有5,则线程池中最多会启动5个线程,而不是一次性地启动10个线程。
maximumPoolSize:线程池中能容纳的最大线程数量,如果超出,则使用RejectedExecutionHandler拒绝策略处理。
keepAliveTime:线程的最大生命周期。这里的生命周期有两个约束条件:一:该参数针对的是超过corePoolSize数量的线程;二:处于非运行状态的线程。举个例子:如果corePoolSize(最小线程数)为10,maxinumPoolSize(最大线程数)为20,而此时线程池中有15个线程在运行,过了一段时间后,其中有3个线程处于等待状态的时间超过keepAliveTime指定的时间,则结束这3个线程,此时线程池中则还有12个线程正在运行。
unit:这是keepAliveTime的时间单位,可以是纳秒,毫秒,秒,分钟等。
workerQueue:任务队列。当线程池中的线程都处于运行状态,而此时任务数量继续增加,则需要一个容器来容纳这些任务,这就是任务队列。这个任务队列是一个阻塞式的单端队列。
newFixedThreadPool和newSingleThreadExector使用的是LinkedBlockingQueue的无界模式
newCachedThreadPool使用的是SynchronousQueue
threadFactory:定义如何启动一个线程,可以设置线程的名称,并且可以确定是否是后台线程等。
handler:拒绝任务处理器。由于超出线程数量和队列容量而对继续增加的任务进行处理的程序。
注:corePoolSize,maximumPoolSize,workerQueue这三个参数紧密相连
(1)如果没有空闲的线程执行该任务且当前运行的线程数少于corePoolSize,则添加新的线程执行该任务。
(2)如果没有空闲的线程执行该任务且当前的线程数等于corePoolSize同时阻塞队列未满,则将任务入队列,而不添加新的线程。
(3)如果没有空闲的线程执行该任务且阻塞队列已满同时池中的线程数小于maximumPoolSize,则创建新的线程执行任务。
(4)如果没有空闲的线程执行该任务且阻塞队列已满同时池中的线程数等于maximumPoolSize,则根据构造函数中的handler指定的策略来拒绝新的任务。
注意,线程池并没有标记哪个线程是核心线程,哪个是非核心线程,线程池只关心核心线程的数量。
接下来看它内部最重要的方法submit的实现
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
这三个方法是在ThreadPoolExecutor的父类AbstractExecutorService中实现
首先都是调用newTaskFor生成Future对象
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
然后调用子类的execute方法,其中RunnableFuture继承了Runnable和Future接口,所以它可以用来执行获取线程执行结果或者直接运行线程
往下看最重要的execute方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
根据上面的注释,execute分三个步骤走
- 如果线程池里面的线程数小于核心线程数corePoolSize,那么创建一个新的线程,并将要执行的task交给它去执行,同时也调用addWorker方法,检查线程的运行状态和线程池数量,如果添加worker成功则返回,否则继续往下执行
- 如果一个任务能入队成功,在添加线程时仍需要进行双重检查(因为前一次检查后该线程死亡了),或者进入到此方法时线程池已经shutdown了,若有必要,当线程执行完时需要回滚入队或者线程池的核心线程数未满,需要创建一个空任务的线程
- 如果无法入队,那么需要增加一个新的线程,如果创建失败,意味着线程池要么已经满了要么已经shutdown了,执行拒绝策略
在分析上面的execute之前需要先了解下线程池成员变量的含义
//线程池的控制状态,其中高3位用来表示运行状态,低29位用来表示worker数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//低29位的偏移量
private static final int COUNT_BITS = Integer.SIZE - 3;//29
//最大的容量(2的29次方-1),1左移29位 - 1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// 运行状态共有5个,至少需要3位来表示(2的三次方 = 8)
//运行中 接受新任务并且处理阻塞队列里的任务
private static final int RUNNING = -1 << COUNT_BITS;
//已关闭 拒绝新任务但是处理阻塞队列里的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
//已停止 拒绝新任务并且抛弃阻塞队列里的任务同时会中断正在处理的任务
private static final int STOP = 1 << COUNT_BITS;
//整理中 所有任务都执行完(包含阻塞队列里面任务)当前线程池活动线程为0,将要调用terminated方法
private static final int TIDYING = 2 << COUNT_BITS;
//已终止 终止状态。terminated方法调用完成以后的状态
private static final int TERMINATED = 3 << COUNT_BITS;
下面的几个方法用来操作上面的状态
//获取线程池运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//获取线程池中worker数量
private static int workerCountOf(int c) { return c & CAPACITY; }
线程池中的5个状态值依此递增,所有下面的几个方法可以快速判断出线程池状态是在那个之前或那个之后
是否在某个状态之前
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
是否在某个状态之后
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
是否运行状态
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
另外也提供3个方法用于cas增加或减少worker数量
//尝试cas+1
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
//尝试cas减1
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
//cas减1,直至成功
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
另外,还有一些比较重要的成员变量
//阻塞队列
private final BlockingQueue<Runnable> workQueue;
//锁
private final ReentrantLock mainLock = new ReentrantLock();
//工作线程
private final HashSet<Worker> workers = new HashSet<Worker>();
//终止条件
private final Condition termination = mainLock.newCondition();
//保存曾经拥有最大线程池容量
private int largestPoolSize;
//已完成的任务数
private long completedTaskCount;
//生产线程的工厂
private volatile ThreadFactory threadFactory;
//拒绝处理器
private volatile RejectedExecutionHandler handler;
//线程等待运行时间
private volatile long keepAliveTime;
//是否允许核心线程超时
private volatile boolean allowCoreThreadTimeOut;
//核心线程数
private volatile int corePoolSize;
//最大线程数
private volatile int maximumPoolSize;
//默认拒绝处理器
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
接下来再回到execute方法,逐行分析
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
调用workerCountOf(c)获取worker数量,如果小于核心线程数则进入if,里面再调用addWorker,如果成功
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorker方法比较长,逐个分析
- addWorker接收两个参数,一个是待执行的线程,一个是是否是运行在核心线程
- 再往下走有个跳点retry,然后进入死循环
- 获取线程池当前状态rs
- 如果线程池状态是关闭之后(即SHUTDOWN或STOP或TIDYING或TERMINATED)
且线程池大于关闭或者待执行任务为null或者任务队列为空(!线程池除于关闭状态且待执行的线程为null且任务队列不为空),也就是线程池里面的线程都已执行完毕在关闭之后,返回false - 又是一个死循环,获取worker数量,如果worker数量大于等于最大容量或者worker数量大于等于核心线程数(当core参数为true,否则比较最大线程池数),则返回false
- 尝试cas增加worker数量,成功则跳出循环到最外围循环
- 如果状态改变,那么需要跳到最外围循环继续检查状态,如果没有改变则一直cas
跳出上面的那个外围循环后表示可以增加worker
- 声明两个变量
workerStarted:worker开始标识
workerAdded:worker被添加标识 - 通过待执行的任务创建一个Worder,内部会通过线程工厂一个线程
- 上锁,意味着一个一个线程的添加
- 获取线程池状态,如果线程池处于运行状态或者线程池处于关闭状态且任务为null,如果刚添加进来的进程还未启动就存活的话抛出 IllegalThreadStateException异常,表示线程非正常运行
- 将新创建的worker添加到worker集合
- 修改拥有的最大容量
- 标记worker已经添加到worker集合中
- 释放锁
- 如果上一步worker添加到集合成功,则运行线程,并标记线程已启动
- 如果线程没有启动则添加到失败的worker集合
- 返回线程是否启动标识
上面在创建Worder的时候会创建线程,也就是一个Worker包装了待执行的线程
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
...
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
...
}
Worker本身实现了Runnable接口,firstTask持有了真正要执行的任务,thread持有从线程工厂创建出来的线程
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
可以看到newThread现场的Runnable就是Worker本身,所以在addWorker方法中执行的t.start方法运行的就是Worker中的run方法
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
- 获取当前线程(即worker线程),将执行任务保存到临时变量,将worker自身的任务引用置为null,以便复用,释放信号量,用于可中断
- 如果task不为null(直接submit任务进来的情况,任务执行完后,核心线程池里面的Worker线程的任务就有可能为null,所以先判断task是否为null),或者从阻塞队列中取任务,如果取不到则等待,取到后进入下面的if逻辑
- 上锁
- 如果线程池时被STOP之后的状态 且 线程没有被中断,则中断线程,即STOP或者之后的状态都会将运行的线程中断掉
- 在执行任务之前调用扩展方法,默认为空
- 运行真正要执行的任务
- 执行任务之后,不管成功失败,都调用执行完之后的扩展方法,默认也为空
- 增加Worker的任务完成数,释放锁
- 最后执行退出函数
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
9.1 如果线程没有执行完,意外的被打断或中断之类的那么将Worker的数量减1
9.2 上锁
9.3 将完成任务总数加1
9.4 将worker从worker容器先中移除
9.5 尝试终止一下
9.6 如果线程池处于STOP或者之前那么如果线程执行时没有被突然打断的那么先判断如果允许核心线程超时,那么将变量置为0,否则置为核心线程数
9.7 如果允许核心线程超时且worker队列不为空,那么至少要保持一个线程来执行worker队列中的任务
9.8 如果worker数量大于等于最小线程存活数那么直接返回(如果min为核心线程数,那么这时核心线程数已满,返回,如果允许核心线程超时且队列不为空,那么线程数至少要1个,否则0个即可)
9.9 添加一个没有执行任务的worker到worker队列中
接下来重新回到execute方法,继续往下走
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
- 如果线程池时运行状态且队列能加任务成功(核心线程数已满,需要加入阻塞队列,也就是execute中的addWorker(command, true)返回false)则进入if分支
- 再次校验线程池状态,如果线程池非运行,则将任务从队列中移除(如果突然线程池被其他线程置为非运行状态,那么刚刚放入队列中的任务有可能被执行,也有可能没有被执行,如果被执行了,则remove方法返回false,交由runWorker方法处理线程的状态,如果没有被执行,则remove方法返回true,那么就执行拒绝策略)
- 如果线程数为0,那么创建一个执行任务为空的worker取队列取任务(如果允许核心线程超时,那么有可能在添加此任务的时候核心线程数都在执行,那么放入队列成功后,核心线程都被清除掉,那么放入一个空worker,这个空的worker会从队列取任务去运行)
接下来是execute方法的最后一段逻辑
else if (!addWorker(command, false))
reject(command);
如果核心线程数已满,阻塞队列也满,那么调用addWorker,且已非核心线程的方法加入worker容器(addWorder会判断最大线程容量的阈值),如果addWorker不成功,那么说明线程池已满,执行拒绝策略
ThreadPoolExecutor的拒绝策略自带4种
- AbortPolicy:默认饱和策略,直接抛出一个RejectedExecutionException异常,让调用者自己处理
- DiscardPolicy:后续的任务都抛弃掉
- DiscardOldestPolicy:会将等待队列里最旧的任务踢走,让新任务得以执行
- CallerRunsPolicy:它既不抛弃新任务,也不抛弃旧任务,而是直接在当前线程运行这个任务,当前线程一般就是主线程,主线程运行任务,有可能会阻塞,推荐少用
进入reject方法看看
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
调用了handler的rejectedExecution方法
handler在ThreadPoolExecutor构造时默认使用AbortPolicy
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
AbortPolicy:
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
//直接抛异常
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
DiscardPolicy:
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
//空实现,什么都不管
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
DiscardOldestPolicy:
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
//将队尾的任务踢掉
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
CallerRunsPolicy:
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
//主线程直接调用run方法运行
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
当然也可以自己实现RejectedExecutionHandler接口来实现自己的拒绝策略
最后来分析它的状态转变
RUNNING:接受新任务并处理排队的任务
SHUTDOWN:不接受新任务,但处理排队的任务
STOP:不接受新任务,不处理排队的任务,并中断正在进行的任务
TIDYING:所有任务都已终止,workerCount为零,转换为状态TIDYING的线程将运行terminate()钩子方法
TERMINATED:terminate()已完成
这些值之间的数字顺序很重要,可以进行有序的比较。runState随着时间的推移单调增加,但不需要击中每个状态。 过渡是:
RUNNING -> SHUTDOWN:在调用shutdown()时,可能会隐含在finalize()中
(RUNNING or SHUTDOWN) -> STOP:在调用shutdownNow()时
SHUTDOWN -> TIDYING:当队列和池都是空的时候
STOP -> TIDYING:当池是空的时候
TIDYING -> TERMINATED:当terminate()钩子方法完成时
当状态达到TERMINATED时,在awaitTermination()中等待的线程将返回。
检测从SHUTDOWN到TIDYING的转换并不像你想的那样直接,因为在SHUTDOWN状态期间非空队列可能变空,反之亦然,但是我们只能在看到它为空时看到workerCount 是0
SHUTDOWN:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); //钩子方法,ScheduledThreadPoolExecutor中使用
} finally {
mainLock.unlock();
}
tryTerminate();
}
- 将状态设置为SHUTDOWN
- 中断空闲的Worker
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)//如果为true,那么只中断一个空闲的worker线程
break;
}
} finally {
mainLock.unlock();
}
}
循环检测所有worker,如果worker的里面的线程没有没被中断且尝试获得到worker的锁,那么中断这个线程(worker在执行任务的时候时lock的,这里用tryLock来尝试获取锁,如果获取不到锁说明worker还在运行)
3. 尝试终止
shutdownNow:立马关闭会强制中断所有worker线程
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
tryTerminate:在前面的processWorkerExit方法,shutdown方法时都会有调用tryTerminate的地方,tryTerminate方法可以用来尝试转成Terminate状态
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
- 如果线程池处于运行状态 或者 至少是TIDYING之后(TIDYING和TERMINATED) 或者 处于关闭状态且阻塞队列不为空(还有任务要执行),直接返回
- 如果worker数量不等于0,那么中断一个空闲的worker线程
- 上锁,将线程池状态设置为TIDYING,如果设置成功,则进入terminated,默认空方法,钩子函数
- 将线程池状态置为TERMINATED后唤醒所有termination条件上等待的线程