ThreadPoolExecutor原理

基本工作原理

下图所示为线程池的实现原理:调用方不断地向线程池中提交任务;线程池中有一组线程,不断地从队列中取任务,这是一个典型的生产者—消费者模型
notion image

线程池整体架构

ThreadPoolExecutor
notion image
notion image
在这里,有两个核心的类: ThreadPoolExector ScheduledThreadPoolExecutor ,后者不仅可以执行某个任务,还可以周期性地执行任务。 向线程池中提交的每个任务,都必须实现Runnable 接口,通过最上面的Executor 接口中的execute(Runnable command) 向线程池提交任务。 然后,在 ExecutorService 中,定义了线程池的关闭接口 shutdown() ,还定义了可以有返回值的任务,也就是Callable

核心数据结构

基于线程池的实现原理,下面看一下ThreadPoolExector的核心数据结构。
public class ThreadPoolExecutor extends AbstractExecutorService { //... private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 存放任务的阻塞队列 private final BlockingQueue<Runnable> workQueue; // 对线程池内部各种变量进行互斥访问控制 private final ReentrantLock mainLock = new ReentrantLock(); // 线程集合 private final HashSet<Worker> workers = new HashSet<Worker>(); //... }
每一个线程是一个Worker对象。Worker是ThreadPoolExector的内部类,核心数据结构如下:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // ... final Thread thread; // Worker封装的线程 Runnable firstTask; // Worker接收到的第1个任务 volatile long completedTasks; // Worker执行完毕的任务个数 // ... }
由定义会发现,Worker继承于AQS,也就是说Worker本身就是一把锁。这把锁有什么用处呢?用于线程池的关闭、线程执行任务的过程中。

线程池参数

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
  1. corePoolSize 核心线程数目 (最多保留的线程数)
  1. maximumPoolSize 最大线程数目,在corePooSize已满、队列也满的情况下,扩充线程至此值
  1. keepAliveTime 生存时间 - 针对救急线程
  1. unit 时间单位 - 针对救急线程
  1. workQueue 阻塞队列
  1. threadFactory 线程创建工厂,可以自定义,有默认值 Executors.defaultThreadFactory()
  1. RejectedExecutionHandler corePoolSize已满,队列已满,maxPoolSize 已满,最后的绝策略。
工作方式
notion image
  1. 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
  1. 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排队,直到有空闲的线程。
  1. 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程来救急。
  1. 如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。
  1. 当高峰过去后,超过corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由 keepAliveTime 和 unit 来控制。

线程池状态

在JDK 7中,把线程数量(workerCount)和线程池状态(runState)这两个变量打包存储在一个字段里面,即ctl变量。如下图所示,最高的3位存储线程池状态,其余29位存储线程个数。而在JDK 6中,这两个变量是分开存储的。
notion image
notion image
notion image
从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING 这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值
// c 为旧值, ctlOf 返回结果为新值 ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))); // rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们 private static int ctlOf(int rs, int wc) { return rs | wc; }
下面分析状态之间的迁移过程,如图所示:
notion image
线程池有两个关闭方法,shutdown()和shutdownNow(),这两个方法会让线程池切换到不同的状态。在队列为空,线程池也为空之后,进入TIDYING 状态;最后执行一个钩子方法terminated(),进入TERMINATED状态,线程池才真正关闭。
这里的状态迁移有一个非常关键的特征:从小到大迁移,-1,0,1,2,3,只会从小的状态值往大的状态值迁移,不会逆向迁移。例如,当线程池的状态在TIDYING=2时,接下来只可能迁移到TERMINATED=3,不可能迁移回STOP=1或者其他状态。
除 terminated()之外,线程池还提供了其他几个钩子方法,这些方法的实现都是空的。如果想实现自己的线程池,可以重写这几个方法:
protected void beforeExecute(Thread t, Runnable r) { } protected void afterExecute(Runnable r, Throwable t) { } protected void terminated() { }

关闭线程池

shutdown
/* *线程池状态变为 SHUTDOWN - 不会接收新任务 - *但已提交任务会执行完 - 此方法不会阻塞调用线程的执行 */ void shutdown();
实现
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 修改线程池状态 advanceRunState(SHUTDOWN); // 仅会打断空闲线程 interruptIdleWorkers(); onShutdown(); // 扩展点 ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等) tryTerminate(); }
shutdownNow
/* 线程池状态变为 STOP - 不会接收新任务 - 会将队列中的任务返回 - 并用 interrupt 的方式中断正在执行的任务 */ List<Runnable> shutdownNow();
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 修改线程池状态 advanceRunState(STOP); // 打断所有线程 interruptWorkers(); // 获取队列中剩余任务 tasks = drainQueue(); } finally { mainLock.unlock(); } // 尝试终结 tryTerminate(); return tasks;
其他打断方法
// 不在 RUNNING 状态的线程池,此方法就返回 true boolean isShutdown(); // 线程池状态是否是 TERMINATED boolean isTerminated(); // 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事 情,可以利用此方法等待 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

关闭过程

关闭线程池的过程为:在调用 shutdown()或者shutdownNow()之后,线程池并不会立即关闭,接下来需要调用 awaitTermination() 来等待线程池关闭。关闭线程池的正确步骤如下:
// executor.shutdownNow(); executor.shutdown(); try { boolean flag = true; do { flag = ! executor.awaitTermination(500, TimeUnit.MILLISECONDS); } while (flag); } catch (InterruptedException e) { // ... }
awaitTermination()方法的内部实现很简单,如下所示。不断循环判断线程池是否到达了最终状态TERMINATED,如果是,就返回;如果不是,则通过termination条件变量阻塞一段时间,之后继续判断。
notion image

shutdown()与shutdownNow()的区别

  1. shutdown()不会清空任务队列,会等所有任务执行完成,shutdownNow()清空任务队列。
  1. shutdown()只会中断空闲的线程,shutdownNow()会中断所有线程。
notion image
notion image
下面看一下在上面的代码里中断空闲线程和中断所有线程的区别。shutdown()方法中的interruptIdleWorkers()方法的实现:
notion image
notion image
关键区别点在tryLock():一个线程在执行一个任务之前,会先加锁,这意味着通过是否持有锁,可以判断出线程是否处于空闲状态。tryLock()如果调用成功,说明线程处于空闲状态,向其发送中断信号;否则不发送。
notion image
shutdownNow()调用了interruptWorkers()方法:
notion image
在上面的代码中,shutdown( ) shutdownNow( )都调用了tryTerminate( )方法,如下所示:
final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateLessThan(c, STOP) && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } // 当workQueue为空,wordCount为0时,执行下述代码。 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 将状态切换到到TIDYING状态 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); // 调用钩子函数 } finally { ctl.set(ctlOf(TERMINATED, 0)); // 将状态由TIDYING改为TERMINATED termination.signalAll(); // 通知awaitTermination(...) } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
tryTerminate()不会强行终止线程池,只是做了一下检测:当workerCount为0,workerQueue为空时,先把状态切换到TIDYING,然后调用钩子方法terminated()
当钩子方法执行完成时,把状态从TIDYING 改为 TERMINATED,接着调用termination.sinaglAll(),通知前面阻塞在awaitTermination的所有调用者线程。所以,TIDYING和TREMINATED的区别是在二者之间执行了一个钩子方法terminated(),目前是一个空实现。

任务的提交

//执行任务 void execute(Runnable command); // 提交任务 task,用返回值 Future 获得任务执行结果 <T> Future<T> submit(Callable<T> task); // 提交 tasks 中所有任务 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; // 提交 tasks 中所有任务,带超时时间 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消 <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间 <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
void execute(Runnable command)的源码分析
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 如果当前线程数小于corePoolSize,则启动新线程 if (workerCountOf(c) < corePoolSize) { // 添加Worker,并将command设置为Worker线程的第一个任务开始执行。 if (addWorker(command, true)) return; c = ctl.get(); } // 如果当前的线程数大于或等于corePoolSize,则调用workQueue.offer放入队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 如果线程池正在停止,则将command任务从队列移除,并拒绝command任务请求。 if (! isRunning(recheck) && remove(command)) reject(command); // 放入队列中后发现没有线程执行任务,开启新线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 线程数大于maxPoolSize,并且队列已满,调用拒绝策略 else if (!addWorker(command, false)) reject(command); }
addWorker()方法源码
// 该方法用于启动新线程。如果第二个参数为true,则使用corePoolSize作为上限,否则使用 maxPoolSize作为上限。 private boolean addWorker(Runnable firstTask, boolean core) { retry: for (int c = ctl.get();;) { // 如果线程池状态值起码是SHUTDOWN和STOP,或则第一个任务不是null,或者工作队列为空 // 则添加worker失败,返回false if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) return false; for (;;) { // 工作线程数达到上限,要么是corePoolSize要么是maximumPoolSize,启动线程失败 if (workerCountOf(c)>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) return false; // 增加worker数量成功,返回到retry语句 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl // 如果线程池运行状态起码是SHUTDOWN,则重试retry标签语句,CAS if (runStateAtLeast(c, SHUTDOWN)) continue retry; // else CAS failed due to workerCount change; retry inner loop } // worker数量加1成功后,接着运行: boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 新建worker对象 w = new Worker(firstTask); // 获取线程对象 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; // 加锁 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { // 由于线程已经在运行中,无法启动,抛异常 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 将线程对应的worker加入worker集合 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { // 释放锁 mainLock.unlock(); } // 如果添加worker成功,则启动该worker对应的线程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { // 如果启动新线程失败 if (! workerStarted) // workCount - 1 addWorkerFailed(w); } return workerStarted; }
在上面的任务提交过程中,可能会开启一个新的Worker,并把任务本身作为firstTask赋给该Worker。但对于一个Worker来说,不是只执行一个任务,而是源源不断地从队列中取任务执行,这是一个不断循环的过程。
下面来看Woker的run()方法的实现过程。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // 当前Worker对象封装的线程 final Thread thread; // 线程需要运行的第一个任务。可以是null,如果是null,则线程从队列获取任务 Runnable firstTask; // 记录线程执行完成的任务数量,每个线程一个计数器 volatile long completedTasks; /** * 使用给定的第一个任务并利用线程工厂创建Worker实例 * @param firstTask 线程的第一个任务,如果没有,就设置为null,此时线程会从队列 获取任务。 */ Worker(Runnable firstTask) { setState(-1); // 线程处于阻塞状态,调用runWorker的时候中断 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } // 调用ThreadPoolExecutor的runWorker方法执行线程的运行 public void run() { runWorker(this); } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // 中断Worker封装的线程 w.unlock(); boolean completedAbruptly = true; try { // 如果线程初始任务不是null,或者从队列获取的任务不是null,表示该线程应该执行任务。 while (task != null || (task = getTask()) != null) { // 获取线程锁 w.lock(); // 如果线程池停止了,确保线程被中断 // 如果线程池正在运行,确保线程不被中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) // 获取到任务后,再次检查线程池状态,如果发现线程池已经停止,则给自己发中断信号 wt.interrupt(); try { // 任务执行之前的钩子方法,实现为空 beforeExecute(wt, task); try { task.run(); // 任务执行结束后的钩子方法,实现为空 afterExecute(task, null); } catch (Throwable ex) { afterExecute(task, ex); throw ex; } } finally { // 任务执行完成,将task设置为null task = null; // 线程已完成的任务数加1 w.completedTasks++; // 释放线程锁 w.unlock(); } } // 判断线程是否是正常退出 completedAbruptly = false; } finally { // Worker退出 processWorkerExit(w, completedAbruptly); } }

shutdown()与任务执行过程综合分析

把任务的执行过程和上面的线程池的关闭过程结合起来进行分析,当调用 shutdown()的时候,可能出现以下几种场景:
  1. 当调用shutdown()的时候,所有线程都处于空闲状态。
    1. 这意味着任务队列一定是空的。此时,所有线程都会阻塞在 getTask()方法的地方。然后,所 有线程都会收到interruptIdleWorkers()发来的中断信号,getTask()返回null,所有Worker都会退出while循环,之后执行processWorkerExit。
  1. 当调用shutdown()的时候,所有线程都处于忙碌状态。
    1. 此时,队列可能是空的,也可能是非空的。interruptIdleWorkers()内部的tryLock调用失败,什么都不会做,所有线程会继续执行自己当前的任务。之后所有线程会执行完队列中的任务,直到队列为空,getTask()才会返回null。之后,就和场景1一样了,退出while循环。
  1. 当调用shutdown()的时候,部分线程忙碌,部分线程空闲
    1. 有部分线程空闲,说明队列一定是空的,这些线程肯定阻塞在 getTask()方法的地方。空闲的这些线程会和场景1一样处理,不空闲的线程会和场景2一样处理。
 
getTask( )方法的内部细节:
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); // 如果线程池调用了shutdownNow(),返回null // 如果线程池调用了shutdown(),并且任务队列为空,也返回null if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) { // 工作线程数减一 decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 如果队列为空,就会阻塞pool或者take,前者有超时时间,后者没有超时时间 // 一旦中断,此处抛异常,对应上文场景1。 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }

shutdownNow() 与任务执行过程综合分析

和上面的 shutdown()类似,只是多了一个环节,即清空任务队列。如果一个线程正在执行某个业务代码,即使向它发送中断信号,也没有用,只能等它把代码执行完成。因此,中断空闲线程和中断所有线程的区别并不是很大,除非线程当前刚好阻塞在某个地方。
当一个Worker最终退出的时候,会执行清理工作:
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 如果线程正常退出,不会执行if的语句,这里一般是非正常退出,需要将worker数量减一 if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; // 将自己的worker从集合移除 workers.remove(w); } finally { mainLock.unlock(); } // 每个线程在结束的时候都会调用该方法,看是否可以停止线程池 tryTerminate(); int c = ctl.get(); // 如果在线程退出前,发现线程池还没有关闭 if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 如果线程池中没有其他线程了,并且任务队列非空 if (min == 0 && ! workQueue.isEmpty()) min = 1; // 如果工作线程数大于min,表示队列中的任务可以由其他线程执行,退出当前线程 if (workerCountOf(c) >= min) return; // replacement not needed } // 如果当前线程退出前发现线程池没有结束,任务队列不是空的,也没有其他线程来执行 // 就再启动一个线程来处理。 addWorker(null, false); } }

拒绝策略

execute(Runnable command)的最后,调用了reject(command)执行拒绝策略,代码如下所示:
notion image
notion image
RejectedExecutionHandler 是一个接口,定义了四种实现,分别对应四种不同的拒绝策略,默认是AbortPolicy
package java.util.concurrent; public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
 
拒绝策略 jdk 提供了 4 种实现,其它著名框架也提供了实现
JDK拒绝策略
notion image
  1. AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略
    1. notion image
  1. CallerRunsPolicy 让调用者运行任务
    1. 调用者直接在自己的线程里执行,线程池不处理,比如到医院打点滴,医院没地方了,到你家自己操作吧:
      notion image
  1. DiscardPolicy 放弃本次任务
    1. 线程池直接丢掉任务
      notion image
  1. DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
    1. notion image
 
第三方框架实现
  1. Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方 便定位问题
  1. Netty 的实现,是创建一个新线程来执行任务
  1. ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
  1. PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略