基本工作原理
下图所示为线程池的实现原理:调用方不断地向线程池中提交任务;线程池中有一组线程,不断地从队列中取任务,这是一个典型的生产者—消费者模型
线程池整体架构
ThreadPoolExecutor
在这里,有两个核心的类:
ThreadPoolExector
和ScheduledThreadPoolExecutor
,后者不仅可以执行某个任务,还可以周期性地执行任务。
向线程池中提交的每个任务,都必须实现Runnable 接口,通过最上面的Executor 接口中的execute(Runnable command)
向线程池提交任务。
然后,在 ExecutorService 中,定义了线程池的关闭接口 shutdown() ,还定义了可以有返回值的任务,也就是Callable核心数据结构
基于线程池的实现原理,下面看一下ThreadPoolExector的核心数据结构。
public class ThreadPoolExecutor extends AbstractExecutorService { //... private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 存放任务的阻塞队列 private final BlockingQueue<Runnable> workQueue; // 对线程池内部各种变量进行互斥访问控制 private final ReentrantLock mainLock = new ReentrantLock(); // 线程集合 private final HashSet<Worker> workers = new HashSet<Worker>(); //... }
每一个线程是一个Worker对象。Worker是ThreadPoolExector的内部类,核心数据结构如下:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // ... final Thread thread; // Worker封装的线程 Runnable firstTask; // Worker接收到的第1个任务 volatile long completedTasks; // Worker执行完毕的任务个数 // ... }
由定义会发现,Worker继承于AQS,也就是说Worker本身就是一把锁。这把锁有什么用处呢?用于线程池的关闭、线程执行任务的过程中。
线程池参数
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
- corePoolSize 核心线程数目 (最多保留的线程数)
- maximumPoolSize 最大线程数目,在corePooSize已满、队列也满的情况下,扩充线程至此值
- keepAliveTime 生存时间 - 针对救急线程
- unit 时间单位 - 针对救急线程
- workQueue 阻塞队列
- threadFactory 线程创建工厂,可以自定义,有默认值
Executors.defaultThreadFactory()
。
- RejectedExecutionHandler corePoolSize已满,队列已满,maxPoolSize 已满,最后的绝策略。
工作方式
- 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
- 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排队,直到有空闲的线程。
- 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程来救急。
- 如果线程到达
maximumPoolSize
仍然有新任务这时会执行拒绝策略。
- 当高峰过去后,超过
corePoolSize
的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由 keepAliveTime 和 unit 来控制。
线程池状态
在JDK 7中,把线程数量(workerCount)和线程池状态(runState)这两个变量打包存储在一个字段里面,即ctl变量。如下图所示,最高的3位存储线程池状态,其余29位存储线程个数。而在JDK 6中,这两个变量是分开存储的。
从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING
这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值
// c 为旧值, ctlOf 返回结果为新值 ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))); // rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们 private static int ctlOf(int rs, int wc) { return rs | wc; }
下面分析状态之间的迁移过程,如图所示:
线程池有两个关闭方法,shutdown()和shutdownNow(),这两个方法会让线程池切换到不同的状态。在队列为空,线程池也为空之后,进入TIDYING 状态;最后执行一个钩子方法terminated(),进入TERMINATED状态,线程池才真正关闭。
这里的状态迁移有一个非常关键的特征:从小到大迁移,-1,0,1,2,3,只会从小的状态值往大的状态值迁移,不会逆向迁移。例如,当线程池的状态在TIDYING=2时,接下来只可能迁移到TERMINATED=3,不可能迁移回STOP=1或者其他状态。
除 terminated()之外,线程池还提供了其他几个钩子方法,这些方法的实现都是空的。如果想实现自己的线程池,可以重写这几个方法:
protected void beforeExecute(Thread t, Runnable r) { } protected void afterExecute(Runnable r, Throwable t) { } protected void terminated() { }
关闭线程池
shutdown
/* *线程池状态变为 SHUTDOWN - 不会接收新任务 - *但已提交任务会执行完 - 此方法不会阻塞调用线程的执行 */ void shutdown();
实现
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 修改线程池状态 advanceRunState(SHUTDOWN); // 仅会打断空闲线程 interruptIdleWorkers(); onShutdown(); // 扩展点 ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等) tryTerminate(); }
shutdownNow
/* 线程池状态变为 STOP - 不会接收新任务 - 会将队列中的任务返回 - 并用 interrupt 的方式中断正在执行的任务 */ List<Runnable> shutdownNow();
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 修改线程池状态 advanceRunState(STOP); // 打断所有线程 interruptWorkers(); // 获取队列中剩余任务 tasks = drainQueue(); } finally { mainLock.unlock(); } // 尝试终结 tryTerminate(); return tasks;
其他打断方法
// 不在 RUNNING 状态的线程池,此方法就返回 true boolean isShutdown(); // 线程池状态是否是 TERMINATED boolean isTerminated(); // 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事 情,可以利用此方法等待 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
关闭过程
关闭线程池的过程为:在调用 shutdown()或者shutdownNow()之后,线程池并不会立即关闭,接下来需要调用 awaitTermination() 来等待线程池关闭。关闭线程池的正确步骤如下:
// executor.shutdownNow(); executor.shutdown(); try { boolean flag = true; do { flag = ! executor.awaitTermination(500, TimeUnit.MILLISECONDS); } while (flag); } catch (InterruptedException e) { // ... }
awaitTermination()
方法的内部实现很简单,如下所示。不断循环判断线程池是否到达了最终状态TERMINATED,如果是,就返回;如果不是,则通过termination条件变量阻塞一段时间,之后继续判断。shutdown()与shutdownNow()的区别
- shutdown()不会清空任务队列,会等所有任务执行完成,shutdownNow()清空任务队列。
- shutdown()只会中断空闲的线程,shutdownNow()会中断所有线程。
下面看一下在上面的代码里中断空闲线程和中断所有线程的区别。shutdown()方法中的
interruptIdleWorkers()
方法的实现:关键区别点在tryLock():一个线程在执行一个任务之前,会先加锁,这意味着通过是否持有锁,可以判断出线程是否处于空闲状态。
tryLock()
如果调用成功,说明线程处于空闲状态,向其发送中断信号;否则不发送。shutdownNow()调用了
interruptWorkers()
方法:在上面的代码中,
shutdown( )
和 shutdownNow( )
都调用了tryTerminate( )
方法,如下所示:final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateLessThan(c, STOP) && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } // 当workQueue为空,wordCount为0时,执行下述代码。 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 将状态切换到到TIDYING状态 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); // 调用钩子函数 } finally { ctl.set(ctlOf(TERMINATED, 0)); // 将状态由TIDYING改为TERMINATED termination.signalAll(); // 通知awaitTermination(...) } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
tryTerminate()
不会强行终止线程池,只是做了一下检测:当workerCount为0,workerQueue为空时,先把状态切换到TIDYING,然后调用钩子方法terminated()
。当钩子方法执行完成时,把状态从TIDYING 改为 TERMINATED,接着调用
termination.sinaglAll()
,通知前面阻塞在awaitTermination的所有调用者线程。所以,TIDYING和TREMINATED的区别是在二者之间执行了一个钩子方法terminated()
,目前是一个空实现。任务的提交
//执行任务 void execute(Runnable command); // 提交任务 task,用返回值 Future 获得任务执行结果 <T> Future<T> submit(Callable<T> task); // 提交 tasks 中所有任务 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; // 提交 tasks 中所有任务,带超时时间 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消 <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间 <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
void execute(Runnable command)
的源码分析public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 如果当前线程数小于corePoolSize,则启动新线程 if (workerCountOf(c) < corePoolSize) { // 添加Worker,并将command设置为Worker线程的第一个任务开始执行。 if (addWorker(command, true)) return; c = ctl.get(); } // 如果当前的线程数大于或等于corePoolSize,则调用workQueue.offer放入队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 如果线程池正在停止,则将command任务从队列移除,并拒绝command任务请求。 if (! isRunning(recheck) && remove(command)) reject(command); // 放入队列中后发现没有线程执行任务,开启新线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 线程数大于maxPoolSize,并且队列已满,调用拒绝策略 else if (!addWorker(command, false)) reject(command); }
addWorker()方法源码
// 该方法用于启动新线程。如果第二个参数为true,则使用corePoolSize作为上限,否则使用 maxPoolSize作为上限。 private boolean addWorker(Runnable firstTask, boolean core) { retry: for (int c = ctl.get();;) { // 如果线程池状态值起码是SHUTDOWN和STOP,或则第一个任务不是null,或者工作队列为空 // 则添加worker失败,返回false if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) return false; for (;;) { // 工作线程数达到上限,要么是corePoolSize要么是maximumPoolSize,启动线程失败 if (workerCountOf(c)>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) return false; // 增加worker数量成功,返回到retry语句 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl // 如果线程池运行状态起码是SHUTDOWN,则重试retry标签语句,CAS if (runStateAtLeast(c, SHUTDOWN)) continue retry; // else CAS failed due to workerCount change; retry inner loop } // worker数量加1成功后,接着运行: boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 新建worker对象 w = new Worker(firstTask); // 获取线程对象 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; // 加锁 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { // 由于线程已经在运行中,无法启动,抛异常 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 将线程对应的worker加入worker集合 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { // 释放锁 mainLock.unlock(); } // 如果添加worker成功,则启动该worker对应的线程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { // 如果启动新线程失败 if (! workerStarted) // workCount - 1 addWorkerFailed(w); } return workerStarted; }
在上面的任务提交过程中,可能会开启一个新的Worker,并把任务本身作为firstTask赋给该Worker。但对于一个Worker来说,不是只执行一个任务,而是源源不断地从队列中取任务执行,这是一个不断循环的过程。
下面来看Woker的run()方法的实现过程。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // 当前Worker对象封装的线程 final Thread thread; // 线程需要运行的第一个任务。可以是null,如果是null,则线程从队列获取任务 Runnable firstTask; // 记录线程执行完成的任务数量,每个线程一个计数器 volatile long completedTasks; /** * 使用给定的第一个任务并利用线程工厂创建Worker实例 * @param firstTask 线程的第一个任务,如果没有,就设置为null,此时线程会从队列 获取任务。 */ Worker(Runnable firstTask) { setState(-1); // 线程处于阻塞状态,调用runWorker的时候中断 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } // 调用ThreadPoolExecutor的runWorker方法执行线程的运行 public void run() { runWorker(this); } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // 中断Worker封装的线程 w.unlock(); boolean completedAbruptly = true; try { // 如果线程初始任务不是null,或者从队列获取的任务不是null,表示该线程应该执行任务。 while (task != null || (task = getTask()) != null) { // 获取线程锁 w.lock(); // 如果线程池停止了,确保线程被中断 // 如果线程池正在运行,确保线程不被中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) // 获取到任务后,再次检查线程池状态,如果发现线程池已经停止,则给自己发中断信号 wt.interrupt(); try { // 任务执行之前的钩子方法,实现为空 beforeExecute(wt, task); try { task.run(); // 任务执行结束后的钩子方法,实现为空 afterExecute(task, null); } catch (Throwable ex) { afterExecute(task, ex); throw ex; } } finally { // 任务执行完成,将task设置为null task = null; // 线程已完成的任务数加1 w.completedTasks++; // 释放线程锁 w.unlock(); } } // 判断线程是否是正常退出 completedAbruptly = false; } finally { // Worker退出 processWorkerExit(w, completedAbruptly); } }
shutdown()与任务执行过程综合分析
把任务的执行过程和上面的线程池的关闭过程结合起来进行分析,当调用 shutdown()的时候,可能出现以下几种场景:
- 当调用shutdown()的时候,所有线程都处于空闲状态。
这意味着任务队列一定是空的。此时,所有线程都会阻塞在 getTask()方法的地方。然后,所
有线程都会收到interruptIdleWorkers()发来的中断信号,getTask()返回null,所有Worker都会退出while循环,之后执行processWorkerExit。
- 当调用shutdown()的时候,所有线程都处于忙碌状态。
此时,队列可能是空的,也可能是非空的。
interruptIdleWorkers()
内部的tryLock调用失败,什么都不会做,所有线程会继续执行自己当前的任务。之后所有线程会执行完队列中的任务,直到队列为空,getTask()才会返回null。之后,就和场景1一样了,退出while循环。- 当调用shutdown()的时候,部分线程忙碌,部分线程空闲
有部分线程空闲,说明队列一定是空的,这些线程肯定阻塞在 getTask()方法的地方。空闲的这些线程会和场景1一样处理,不空闲的线程会和场景2一样处理。
getTask( )
方法的内部细节:private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); // 如果线程池调用了shutdownNow(),返回null // 如果线程池调用了shutdown(),并且任务队列为空,也返回null if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) { // 工作线程数减一 decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 如果队列为空,就会阻塞pool或者take,前者有超时时间,后者没有超时时间 // 一旦中断,此处抛异常,对应上文场景1。 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
shutdownNow() 与任务执行过程综合分析
和上面的 shutdown()类似,只是多了一个环节,即清空任务队列。如果一个线程正在执行某个业务代码,即使向它发送中断信号,也没有用,只能等它把代码执行完成。因此,中断空闲线程和中断所有线程的区别并不是很大,除非线程当前刚好阻塞在某个地方。
当一个Worker最终退出的时候,会执行清理工作:
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 如果线程正常退出,不会执行if的语句,这里一般是非正常退出,需要将worker数量减一 if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; // 将自己的worker从集合移除 workers.remove(w); } finally { mainLock.unlock(); } // 每个线程在结束的时候都会调用该方法,看是否可以停止线程池 tryTerminate(); int c = ctl.get(); // 如果在线程退出前,发现线程池还没有关闭 if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 如果线程池中没有其他线程了,并且任务队列非空 if (min == 0 && ! workQueue.isEmpty()) min = 1; // 如果工作线程数大于min,表示队列中的任务可以由其他线程执行,退出当前线程 if (workerCountOf(c) >= min) return; // replacement not needed } // 如果当前线程退出前发现线程池没有结束,任务队列不是空的,也没有其他线程来执行 // 就再启动一个线程来处理。 addWorker(null, false); } }
拒绝策略
在
execute(Runnable command)
的最后,调用了reject(command)执行拒绝策略,代码如下所示:RejectedExecutionHandler 是一个接口,定义了四种实现,分别对应四种不同的拒绝策略,默认是
AbortPolicy
。package java.util.concurrent; public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
拒绝策略 jdk 提供了 4 种实现,其它著名框架也提供了实现
JDK拒绝策略
- AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略
- CallerRunsPolicy 让调用者运行任务
调用者直接在自己的线程里执行,线程池不处理,比如到医院打点滴,医院没地方了,到你家自己操作吧:
- DiscardPolicy 放弃本次任务
线程池直接丢掉任务
- DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
第三方框架实现
- Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方 便定位问题
- Netty 的实现,是创建一个新线程来执行任务
- ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
- PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略