博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
活到老学到老,大牛总结JAVA线程池,纯源码分享指南
阅读量:4131 次
发布时间:2019-05-25

本文共 8563 字,大约阅读时间需要 28 分钟。

线程池的工作原理,以及拒绝策略,大家都很熟悉,下面主要讲一下线程池shutdown的原理,以及一些不常用操作的原理。

shutdown

public void shutdown() {    final ReentrantLock mainLock = this.mainLock;    mainLock.lock();    try {        checkShutdownAccess();        advanceRunState(SHUTDOWN);        interruptIdleWorkers();        onShutdown(); // hook for ScheduledThreadPoolExecutor    } finally {        mainLock.unlock();    }    tryTerminate();}复制代码

启动有序关闭,在该关闭中执行先前提交的任务,但不接受任何新任务。如果已关闭,则调用不会产生任何其他影响。此方法不等待先前提交的任务完成执行。使用awaitTermination可以做到这一点。

advanceRunState

private void advanceRunState(int targetState) {    for (;;) {        int c = ctl.get();        if (runStateAtLeast(c, targetState) ||            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))            break;    }}复制代码

将runState转换为给定状态,或者已经存在的状态比给定状态大时将直接返回。 循环使用CAS设置状态,设置成功返回。

interruptIdleWorkers

private void interruptIdleWorkers(boolean onlyOne) {    final ReentrantLock mainLock = this.mainLock;    mainLock.lock();    try {        for (Worker w : workers) {            Thread t = w.thread;            if (!t.isInterrupted() && w.tryLock()) {                try {                    t.interrupt();                } catch (SecurityException ignore) {                } finally {                    w.unlock();                }            }            if (onlyOne)                break;        }    } finally {        mainLock.unlock();    }}复制代码

因为work每次执行任务的时候都会先lock,完成任务后unlock, 如果tryLock可以成功说明work当前没有在执行任务。使用interrupt中断空闲的work线程。

tryTerminate

final void tryTerminate() {    for (;;) {        int c = ctl.get();        if (isRunning(c) ||            runStateAtLeast(c, TIDYING) ||            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))            return;        if (workerCountOf(c) != 0) { // Eligible to terminate            interruptIdleWorkers(ONLY_ONE);            return;        }        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {                try {                    terminated();                } finally {                    ctl.set(ctlOf(TERMINATED, 0));                    termination.signalAll();                }                return;            }        } finally {            mainLock.unlock();        }        // else retry on failed CAS    }}复制代码

shutdownNow

public List
shutdownNow() { List
tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks;}复制代码

尝试停止所有正在执行的任务,暂停正在等待的任务的处理,并返回正在等待执行的任务的列表。从此方法返回后,这些任务将从任务队列中耗尽(删除)。此方法不等待主动执行的任务终止。除了尽最大努力尝试停止处理正在执行的任务之外,没有任何保证。此实现通过中断取消任务,因此任何无法响应中断的任务都可能永远不会终止。

interruptWorkers

private void interruptWorkers() {    final ReentrantLock mainLock = this.mainLock;    mainLock.lock();    try {        for (Worker w : workers)            w.interruptIfStarted();    } finally {        mainLock.unlock();    }}复制代码

中断所有线程,即使处于活动状态也是如此。

drainQueue

private List
drainQueue() { BlockingQueue
q = workQueue; ArrayList
taskList = new ArrayList
(); q.drainTo(taskList); if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r)) taskList.add(r); } } return taskList;}复制代码

使用drainTo方法将任务队列写入新列表。但是,如果队列是DelayQueue或其他类型的队列,但poll或drainTo可能无法删除某些元素,则将它们逐个删除。

Worker

Worker主要维护运行任务线程的中断控制状态,以及其他次要簿记。此类扩展了AbstractQueuedSynchronizer,以简化获取和释放围绕每个任务执行的锁。我们实现了一个简单的非可重入互斥锁,而不是使用ReentrantLock,因为我们不希望辅助任务在调用诸如setCorePoolSize之类的池控制方法时能够重新获取该锁。另外,为了抑制直到线程真正开始运行任务之前的中断,我们将锁定状态初始化为负值,并在启动时将其清除(在runWorker中)。

runWorker

final void runWorker(Worker w) {    Thread wt = Thread.currentThread();    Runnable task = w.firstTask;    w.firstTask = null;    w.unlock(); // allow interrupts    boolean completedAbruptly = true;    try {        while (task != null || (task = getTask()) != null) {            w.lock();            // If pool is stopping, ensure thread is interrupted;            // if not, ensure thread is not interrupted.  This            // requires a recheck in second case to deal with            // shutdownNow race while clearing interrupt            if ((runStateAtLeast(ctl.get(), STOP) ||                 (Thread.interrupted() &&                  runStateAtLeast(ctl.get(), STOP))) &&                !wt.isInterrupted())                wt.interrupt();            try {                beforeExecute(wt, task);                Throwable thrown = null;                try {                    task.run();                } catch (RuntimeException x) {                    thrown = x; throw x;                } catch (Error x) {                    thrown = x; throw x;                } catch (Throwable x) {                    thrown = x; throw new Error(x);                } finally {                    afterExecute(task, thrown);                }            } finally {                task = null;                w.completedTasks++;                w.unlock();            }        }        completedAbruptly = false;    } finally {        processWorkerExit(w, completedAbruptly);    }}复制代码

Work反复从队列中获取任务并执行它们,同时解决许多问题:

  1. 我们可能从一个初始任务开始,在这种情况下,我们不需要第一个。否则,只要池是 运行时,我们从getTask获得任务。如果返回null,则 Work由于池状态或配置参数更改而退出。其他退出是由于引发异常 外部代码,在这种情况下,completedAbruptly为true,其通常导致processWorkerExit替换此线程。
  2. 在运行任何任务之前,先获取锁以防止在执行任务时其他池中断,然后我们确保除非池正在停止,否则此线程没有它的interrupt set。
  3. 在每次运行任务之前,都要调用beforeExecute,可能会引发异常,在这种情况下,我们导致线程死亡(中断循环,使用completelyAbruptly为true)无需处理 任务。
  4. 假设beforeExecute正常完成,我们运行 任务,收集其抛出的任何异常以发送给afterExecute。 我们分别处理RuntimeException,Error(两者 规范保证我们可以捕获)和任意Throwables。 因为我们无法在Runnable.run中抛出Throwables,所以我们 将它们包装在错误的出路(到线程的 UncaughtExceptionHandler)。 任何抛出的异常也保守地导致线程死亡。
  5. task.run完成后,我们调用afterExecute,这可能也会引发异常,这也会导致线程 死。 根据JLS Sec 14.20,此例外是即使task.run抛出也将有效。
  6. 异常机制的net效果是afterExecute和线程的UncaughtExceptionHandler具有相同的精度 我们可以提供的有关以下方面遇到的任何问题的信息用户代码。

processWorkerExit

private void processWorkerExit(Worker w, boolean completedAbruptly) {    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted        decrementWorkerCount();    final ReentrantLock mainLock = this.mainLock;    mainLock.lock();    try {        completedTaskCount += w.completedTasks;        workers.remove(w);    } finally {        mainLock.unlock();    }    tryTerminate();  // shutdown状态时,每个工作线程完成工作后,终止线程池    int c = ctl.get();    if (runStateLessThan(c, STOP)) {        if (!completedAbruptly) {            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;            if (min == 0 && ! workQueue.isEmpty())                min = 1;            if (workerCountOf(c) >= min)                return; // replacement not needed        }        addWorker(null, false);  // 工作线程执行任务异常退出时,重新启动一个工作线程来完成任务    }}复制代码

线程池的状态

线程池的控制状态ctl是一个atomic integer。表示workCount和runState两个字段。

workerCount指示有效线程数。int的后29位表示有效线程数。

runState,指示是否正在运行,正在关闭等。int的前三位表示线程池的状态。

  1. RUNNING: 接受新任务并处理排队的任务
  2. SHUTDOWN: 不接受新任务,而是处理排队的任务
  3. STOP: 不接受新任务,不处理排队任务以及中断进行中的任务
  4. TIDYING: 所有任务已终止,workerCount为零,线程转换为状态TIDYING将运行Terminated()挂钩方法
  5. TERMINATED: terminated()执行完成

运行状态切换

  • RUNNING -> SHUTDOWN: 在调用shutdown()时,可能隐式在finalize()中
  • (RUNNING or SHUTDOWN) -> STOP: 调用shutdownNow()
  • SHUTDOWN -> TIDYING: 队列和work pool为空
  • STOP -> TIDYING: work pool为空
  • TIDYING -> TERMINATED: terminated()执行完成

状态相关的一些代码

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static int runStateOf(int c)     { return c & ~CAPACITY; }private static int workerCountOf(int c)  { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }private static boolean runStateLessThan(int c, int s) {    return c < s;}private static boolean runStateAtLeast(int c, int s) {    return c >= s;}private static boolean isRunning(int c) {    return c < SHUTDOWN;}private static final int COUNT_BITS = Integer.SIZE - 3;  // 29,11101private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 29位,全是1,值 536870911private static final int RUNNING    = -1 << COUNT_BITS;  // 32位,前三位是1, 值 -536870912private static final int SHUTDOWN   =  0 << COUNT_BITS;  // 值 0private static final int STOP       =  1 << COUNT_BITS;  // 30位,第一位是1,值 536870912private static final int TIDYING    =  2 << COUNT_BITS;  // 31位,第一位是1,值 1073741824private static final int TERMINATED =  3 << COUNT_BITS;  // 31位,前两位是1, 值 1610612736-1  // 11111111111111111111111111111111, 32位0   // 01   // 12   // 103   // 11~CAPACITY // 32位,前三位是1,11100000000000000000000000000000,值-536870912

 

转载地址:http://vyfvi.baihongyu.com/

你可能感兴趣的文章
数据结构之队列、栈
查看>>
数据结构之树
查看>>
数据结构之二叉树
查看>>
二叉树非递归遍历算法思悟
查看>>
红黑树算法思悟
查看>>
从山寨Spring中学习Spring IOC原理-自动装配注解
查看>>
实例区别BeanFactory和FactoryBean
查看>>
Spring后置处理器BeanPostProcessor的应用
查看>>
Spring框架的ImportSelector到底可以干嘛
查看>>
Mysql中下划线问题
查看>>
微信小程序中使用npm过程中提示:npm WARN saveError ENOENT: no such file or directory
查看>>
Xcode 11 报错,提示libstdc++.6 缺失,解决方案
查看>>
idea的安装以及简单使用
查看>>
Windows mysql 安装
查看>>
python循环语句与C语言的区别
查看>>
Vue项目中使用img图片和background背景图的使用方法
查看>>
vue 项目中图片选择路径位置static 或 assets区别
查看>>
vue项目打包后无法运行报错空白页面
查看>>
Vue 解决部署到服务器后或者build之后Element UI图标不显示问题(404错误)
查看>>
element-ui全局自定义主题
查看>>