Java并作的线程池ThreadPoolExecutor

线程池学习

以下有所内容及源码分析还是冲JDK1.8的,请知悉。

自我写博客就真比没有各个了,这说不定与自身之就学方法有关,我好呢看这样特别不好的,但是尚未办法说服自己失去改变,所以啊只能如此想到什么法呀了。


池化技术真正是同样流派在我看来非常牛逼的技术,因为其就了当片资源内实现了资源使的最大化,这吃自家想到了一致帮派科目,那即便是运筹学,当时以上运筹学的时节即便时举行这种接近的题材。


言归正传吧,我接下会展开相同不好线程池方面知识点的读书,也会见记录下来分享给大家。

线程池的内容中来关联到AQS同步器的知识点,如果对AQS同步器知识点感觉有些薄弱,可以错过押本身的直达一致首文章。

1、线程池介绍

当web开发被,服务器需要承受并处理要,所以会见吧一个伸手来分配一个线程来进展处理。如果每次要都新创办一个线程的话语实现起来挺便利,但是有一个题目:

比方出现的呼吁数量大多,但每个线程执行的时间特别缺乏,这样就会见反复的创始与销毁线程,如此一来会大大降低系统的效率。可能出现服务器在也每个请求创建新线程和销毁线程上费的时光与吃的系统资源要于拍卖实际的用户请求的年月以及资源再次多。

所以线程池就出现了。线程池为线程生命周期的支出和资源不足问题提供了解决方案。通过对多单任务重用线程,线程创建的开销为分派到了大半独任务及。

使用线程池的功利:

  • 下降资源消耗。通过还利用曾开立的线程降低线程创建和销毁造成的损耗。
  • 加强响应速度。当任务到时,任务可不需要的等到线程创建就能这实施。
  • 加强线程的可管理性。线程是稀缺资源,如果任界定的创建,不仅会吃系统资源,还会见下跌系统的风平浪静,使用线程池可以拓展合并的分红,调优和监察。

Java中之线程池是因此ThreadPoolExecutor类来实现之. 本文就结JDK
1.8对此类的源码来分析一下是看似中对线程的创办,
管理和后台任务的调度等方面的实践原理。

线程池的优势


既然说交线程池了,而且大多数底大牛也都见面提议我们运用池化技术来管理一些资源,那线程池肯定为是来其的利的,要不然怎么会那么出名并且为大家利用啊?

​ 我们便来探她到底发啊优势?

  • 资源可控性:使用线程池可以避创建大气线程而致使内存的吃

  • 增长响应速度:线程池地创建实际上是异常耗费时间和性质的,由线程池创建好出任务就运行,提升响应速度。

  • 便于管理:池化技术最好突出的一个风味就是足以帮忙我们针对池塘里的资源进行管理。由线程池统一分配和管理。

2、继承关系

我们先是来拘禁一下线程池的类图:

图片 1

线程池继承关系图

Executor接口

public interface Executor {
    /**
     * 在将来的某个时候执行传入的命令,执行命令可以在实现类里通过新创建的线程、线程池、当前线程来完成。
     */
    void execute(Runnable command);
}

ExecutorService接口

public interface ExecutorService extends Executor {

    /**
     * 启动先前提交的任务被执行的有序关闭,但不接受新的任务。 如果已经关闭,则调用没有其他影响。
     */
    void shutdown();

    /**
     * 尝试停止所有正在执行的任务,停止等待任务的处理,并返回正在等待执行的任务的列表。
     * 该方法不能等待之前提交的任务执行完,如果需要等待执行,可以使用{@link #awaitTermination awaitTermination}
     * 从这个方法返回后,这些任务从任务队列中排出(移除)。 除了竭尽全力地停止处理主动执行任务之外,没有任何保证。 
     */
    List<Runnable> shutdownNow();

    /**
     * 线程池有没有被关闭,关闭返回true,否则false
     */
    boolean isShutdown();

    /**
     * 如果所有任务在关闭后都完成了。返回true
     * 提示:如果没有在调用该方法前调用shutdown或者shutdownNow方法,此方法永远不会返回true
     */
    boolean isTerminated();

    /**
     * 在指定时间内阻塞等待任务全部完成,完成了返回true,否则false
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 提交一个有返回值的任务
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     * 提交一个任务来执行,返回一个有返回值的结果,返回值为传入的result
     */
    <T> Future<T> submit(Runnable task, T result);

    /**
     * 提交一个任务来执行,返回一个有返回值的结果,返回值为null
     */
    Future<?> submit(Runnable task);

    /**
     * 执行一批有返回值的任务
     * 返回的结果调用{@link Future#isDone}都是true
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    /**
     * 执行给定的任务,当全部完成或者超时返回一个有状态和结果的Future集合。
     * 返回的结果调用{@link Future#isDone}都是true
     * 返回时,尚未完成的任务将被取消。
     * 如果在进行此操作时修改了给定的集合,则此方法的结果是不确定的。
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 执行给定的任务,返回一个成功完成任务的结果(即,没有抛出异常),
     * 如果有的话。 在正常或异常返回时,尚未完成的任务将被取消。 
     * 如果在进行此操作时修改了给定的集合,则此方法的结果是不确定的。
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    /**
     * 执行给定的任务,返回一个成功完成任务的结果(即,没有抛出异常),
     * 如果有的话。 在正常或异常返回时,尚未完成的任务将被取消。 
     * 如果在进行此操作时修改了给定的集合,则此方法的结果是不确定的。
     * 超时没有成功结果抛出TimeoutException
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

AbstractExecutorService接口

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                          boolean timed, long nanos)
    throws InterruptedException, ExecutionException, TimeoutException {
    // ...
}

线程池的始建


我们设为此线程池来统一分配和管制我们的线程,那首先我们如果开创一个线程池出来,还是来不少大牛已经帮我们刻画好了诸多方的代码的,Executors的厂子方法就是吃咱们提供了创办多种不同线程池的方法。因为此类似才是一个创建对象的厂子,并无关系到充分多之实际实现,所以自己弗见面过度详细地去印证。

​ 老规矩,还是直接上代码吧。

public static ExecutorService newFixedThreadPool(int nThreads) {
      return new ThreadPoolExecutor(nThreads, nThreads,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>());
  }

此间也即举出一个法的例子来进展事后的上书吧,我们得望,Executors只是个厂子而已,方法为就是来实例化不同的对象,实际上实例化出来的第一类即是ThreadPoolExecutor。现在咱们虽优先来简单地针对ThreadPoolExecutor构造函数内之每个参数进行解释一下吧。

  • corePoolSize(核心线程池大小):当提交一个任务及线程池时,线程池会创建一个线程来推行任务,即使其它空闲之着力线程能够实践新职责也会创线程,当任务数超过核心线程数的时节就是非会见重新创。在此地而留心一点,线程池刚创建的当儿,其中并从未创造任何线程,而是等职责来才去创造线程,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法
    ,这样才会优先创建好corePoolSize独线程或者一个线程。

  • maximumPoolSize(线程池最深线程数):线程池允许创建的无限酷线程数,如果队列满了,并且都创造的线程数仅次于最要命线程数,则线程池会再创新的线程执行任务。值得注意的凡,如果运用了无界队列,此参数就从不意义了。

  • keepAliveTime(线程活动保持时间):此参数默认在线程数大于corePoolSize的情状下才见面于作用,
    当线程的空闲时间上keepAliveTime的时节便会见停,直至线程数目小于corePoolSize。不过要调用了allowCoreThreadTimeOut道,则当线程数目小于corePoolSize的时呢会自作用.

  • unit(keelAliveTime的年月单位):keelAliveTime的岁月单位,一共发生7种,在此虽无列举了。

  • workQueue(阻塞队列):阻塞队列,用来囤等执行的天职,这个参数为是非常重要的,在这边大概介绍一下几乎独闭塞队列。

    • ArrayBlockingQueue:这是一个根据数组结构的有界阻塞队列,此行列按照FIFO的标准化对素进行排序。

    • LinkedBlockingQueue:一个因链表结构的围堵队列,此阵按照FIFO排序元素,吞吐量通常如果高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()即下了此队。

    • SynchronousQueue:一个勿存储元素的死队列。每个插入操作必须顶交另外一个线程调用移除操作,否则插入操作一直处在阻塞状态。吞吐量通常如果高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool()哪怕采取了这队列。

    • PriorityBlockingQueue:一个怀有优先级的无阻塞队列。

  • handler(饱和策略);当线程池和班都充斥了,说明线程池已经处于饱和状态了,那么必须采取平等种政策来拍卖还在交过来的初职责。这个饱和策略默认情况下是AbortPolicy,表示无能为力处理新职责时抛来老。共有四种植饱满策略提供,当然我们为可择自己实现饱和策略。

    • AbortPolicy:直接抛并且抛出RejectedExecutionException异常

    • CallerRunsPolicy:只用调用者所在线程来运作任务。

    • DiscardOldestPolicy:丢弃队列里最近之一个任务,并推行当前任务。

    • DiscardPolicy:丢弃任务而不弃来深。

线程池的行流程便因此参考资料里的图介绍一下了,具体我们或通过代码去教授。
图片 2

以地方我们简要的任课了一晃Executors这厂类里的厂方法,并且讲述了瞬间创建线程池的一部分参数和她的用意,当然者的教并无是老深刻,因为想只要弄懂的语是内需持续地花时间错开押去领悟的,而博主自己也要尚未了将明白,不过博主的读书道是先模拟了个大体,再回头来瞧前面的知识点,可能会见越来越好掌握,所以我们就往下说吧。

3、ThreadPoolExecutor分析

想念如果深刻明ThreadPoolExecutor,就如优先亮其中最紧要的几乎个参数:

ThreadPoolExecutor源码分析


在上头我们就算意识了,Executors的厂方法主要就回到了ThreadPoolExecutor靶,至于另外一个以这里少不开口,也就是说,要学习线程池,其实重要的要么得学会分析ThreadPoolExecutor以此目标中的源码,我们连下就会见对ThreadPoolExecutor里的最主要代码进行解析。

3.1、核心变量和方式(状态转换)

// 状态|工作数的一个32bit的值
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
// 0001-1111-1111-1111-1111-1111-1111-1111
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 1110-0000-0000-0000-0000-0000-0000-0000
private static final int RUNNING    = -1 << COUNT_BITS;
// 0000-0000-0000-0000-0000-0000-0000-0000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 0010-0000-0000-0000-0000-0000-0000-0000
private static final int STOP       =  1 << COUNT_BITS;
// 0100-0000-0000-0000-0000-0000-0000-0000
private static final int TIDYING    =  2 << COUNT_BITS;
// 0110-0000-0000-0000-0000-0000-0000-0000
private static final int TERMINATED =  3 << COUNT_BITS;

// ~CAPACITY就是前3位状态位,和c进行&就能得到当前的状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 和c进行&就能得到当前的工作数
private static int workerCountOf(int c)  { return c & CAPACITY; }
// rs就是状态值,wc就是工作数,这两个进行或操作,就能得到ctl的值(32bit的值)
private static int ctlOf(int rs, int wc) { return rs | wc; }

或是过多口目上面的写法都蒙圈了。我骨子里基础呢不顶好,所以我见到此间的下索性写了个器类去测试他们之输出结果,如下:

public class ExecutorTest {
    private final static int COUNT_BITS = Integer.SIZE - 3;
    private final static int RUNNING    = -1 << COUNT_BITS;
    private final static int SHUTDOWN   =  0 << COUNT_BITS;
    private final static int STOP       =  1 << COUNT_BITS;
    private final static int TIDYING    =  2 << COUNT_BITS;
    private final static int TERMINATED =  3 << COUNT_BITS;
    private final static int CAPACITY   = (1 << COUNT_BITS) - 1;

    public static void main(String[] args) {
        System.out.println("状态位===");
        System.out.println(getFormatStr(RUNNING));
        System.out.println(getFormatStr(SHUTDOWN));
        System.out.println(getFormatStr(STOP));
        System.out.println(getFormatStr(TIDYING));
        System.out.println(getFormatStr(TERMINATED));
        System.out.println(getFormatStr(CAPACITY));
    }

    private static String getFormatStr(int n) {
        String integerMaxValueStr = Integer.toBinaryString(n);
        int a = 32;
        StringBuilder sb = new StringBuilder();
        int l = integerMaxValueStr.length();
        int i = 0;
        for (; a > 0; --a) {
            if (--l >= 0) {
                sb.append(integerMaxValueStr.charAt(l));
            } else {
                sb.append("0");
            }
            if (++i % 4 == 0) {
                if (a > 1) {
                    sb.append("-");
                }
                i = 0;
            }
        }
        return sb.reverse().toString();
    }
}

出口结果吧:

状态位===
1110-0000-0000-0000-0000-0000-0000-0000
0000-0000-0000-0000-0000-0000-0000-0000
0010-0000-0000-0000-0000-0000-0000-0000
0100-0000-0000-0000-0000-0000-0000-0000
0110-0000-0000-0000-0000-0000-0000-0000
0001-1111-1111-1111-1111-1111-1111-1111

透过地方的注释以及测试用例可以窥见,源码的撰稿人巧妙的利用一个价代表了2栽意思(前3bit位是状态,后29bit凡是工作数),下面我们来看看线程池最着重的5栽状态:

  1. RUNNING:能接受新交付的任务,并且也能处理阻塞队列中的天职;
  2. SHUTDOWN:关闭状态,不再接受新交付的任务,但却可以继续处理阻塞队列中曾经封存之任务。在线程池处于
    RUNNING 状态时,调用
    shutdown()方法会如线程池进入到该状态。(finalize()
    方法以实践过程被为会见调用shutdown()方法上该状态);
  3. STOP:不克领新职责,也未处理队列中的天职,会半途而废正在处理任务的线程。在线程池处于
    RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow()
    方法会要线程池进入及拖欠状态;
  4. TIDYING:如果持有的天职还早已停了,workerCount (有效线程数)
    为0,线程池进入该状态后会见调用 terminated() 方法上TERMINATED 状态。
  5. TERMINATED:在terminated()
    方法执行完后进该状态,默认terminated()方法吃什么啊从没做。

生图也线程池的状态转换过程:

图片 3

线程池状态转换图

AtomicInteger ctl

ctl凡最主要的主宰状态,是一个复合类型的变量,其中囊括了点儿独概念。

  • workerCount:表示中的线程数目

  • runState:线程池里线程的运行状态


俺们来分析一下以及ctl关于的片源代码吧,直接上代码

     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

     //用来表示线程池数量的位数,很明显是29,Integer.SIZE=32
     private static final int COUNT_BITS = Integer.SIZE - 3;
     //线程池最大数量,2^29 - 1
     private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

     // runState is stored in the high-order bits
     //我们可以看出有5种runState状态,证明至少需要3位来表示runState状态
     //所以高三位就是表示runState了
     private static final int RUNNING    = -1 << COUNT_BITS;
     private static final int SHUTDOWN   =  0 << COUNT_BITS;
     private static final int STOP       =  1 << COUNT_BITS;
     private static final int TIDYING    =  2 << COUNT_BITS;
     private static final int TERMINATED =  3 << COUNT_BITS;

     // Packing and unpacking ctl
     private static int runStateOf(int c)     { return c & ~CAPACITY; }
     private static int workerCountOf(int c)  { return c & CAPACITY; }
     private static int ctlOf(int rs, int wc) { return rs | wc; }

     //用于存放线程任务的阻塞队列
     private final BlockingQueue<Runnable> workQueue;

     //重入锁
     private final ReentrantLock mainLock = new ReentrantLock();

     //线程池当中的线程集合,只有当拥有mainLock锁的时候,才可以进行访问
     private final HashSet<Worker> workers = new HashSet<Worker>();

     //等待条件支持终止
     private final Condition termination = mainLock.newCondition();

     //创建新线程的线程工厂
     private volatile ThreadFactory threadFactory;

     //饱和策略
     private volatile RejectedExecutionHandler handler;
  1. CAPACITY

    以这边我们讲一下以此线程池最深数目之计吧,因为这边涉及到源码以及活动之类的操作,我感到大多数人数还还是无绝会是,因为我同开始看之时节呢是休顶会的。

private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

从今代码我们可见到,是需要1往左移29位,然后又减去1,那个1往左移29位是怎么算的啊?

     1 << COUNT_BITS
      ​
      1的32位2进制是
      00000000 00000000 00000000 00000001
      ​
      左移29位的话就是
      00100000 00000000 00000000 00000000
      ​
      再进行减一的操作
      000 11111 11111111 11111111 11111111
      ​
      也就是说线程池最大数目就是
      000 11111 11111111 11111111 11111111

2.runState

赶巧数之原码、反码、补码都是一致的
于电脑底层,是用补码来表示的

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING    = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
  • RUNNING

    足承受新职责而处理既当死队列的任务
    赛3员不折不扣是1的语,就是RUNNING状态

-1 << COUNT_BITS

这里是-1往左移29位,稍微有点不一样,-1的话需要我们自己算出补码来
          ​
-1的原码
10000000 00000000 00000000 00000001
          ​
-1的反码,负数的反码是将原码除符号位以外全部取反
11111111 11111111 11111111 11111110
          ​
-1的补码,负数的补码就是将反码+1
11111111 11111111 11111111 11111111
          ​
关键了,往左移29位,所以高3位全是1就是RUNNING状态
111 00000 00000000 00000000 00000000
  • SHUTDOWN

    无接受新职责,但是处理就当堵塞队列的天职
    高3位全是0,就是SHUTDOWN状态

0 << COUNT_BITS
          ​
0的表示
00000000 00000000 00000000 00000000
          ​
往左移29位
00000000 00000000 00000000 00000000
  • STOP

    不接受新职责,也无处理阻塞队列里的职责,并且会中断正在处理的任务
    所以高3位是001,就是STOP状态

1 << COUNT_BITS
          ​
1的表示
00000000 00000000 00000000 00000001
          ​
往左移29位
00100000 00000000 00000000 00000000
  • TIDYING

    具备任务都给中断,workerCount是0,线程状态转化为TIDYING并且调用terminated()钩子方法
    所以高3位是010,就是TIDYING状态

2 << COUNT_BITS
          ​
2的32位2进制
00000000 00000000 00000000 00000010
          ​
往左移29位
01000000 00000000 00000000 00000000
  • TERMINATED

    terminated()钩子方法就就
    所以高3位是110,就是TERMINATED状态

3 << COUNT_BITS
          ​
3的32位2进制
00000000 00000000 00000000 00000011
          ​
往左移29位
11000000 00000000 00000000 00000000

3.局部计介绍

  • runStateOf(int c)

实时落runState的点子

private static int runStateOf(int c)     { return c & ~CAPACITY; }

~CAPACITY
~是按位取反的意思
&是按位与的意思
          ​
而CAPACITY是,高位3个0,低29位都是1,所以是
000 11111 11111111 11111111 11111111
          ​
取反的话就是
111 00000 00000000 00000000 00000000
          ​
传进来的c参数与取反的CAPACITY进行按位与操作
1、低位29个0进行按位与,还是29个0
2、高位3个1,既保持c参数的高3位
既高位保持原样,低29位都是0,这也就获得了线程池的运行状态runState
  • workerCountOf(int c)

获取线程池的脚下中线程数目

private static int workerCountOf(int c)  { return c & CAPACITY; }

CAPACITY的32位2进制是
000 11111 11111111 11111111 11111111
          ​
用入参c跟CAPACITY进行按位与操作
1、低29位都是1,所以保留c的低29位,也就是有效线程数
2、高3位都是0,所以c的高3位也是0
          ​
这样获取出来的便是workerCount的值
  • ctlOf(int rs, int wc)

    原子整型变量ctl的初始化方法

//结合这几句代码来看
private static final int RUNNING    = -1 << COUNT_BITS;
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
          ​
private static int ctlOf(int rs, int wc) { return rs | wc; }

RUNNING是
111 00000 00000000 00000000 00000000
          ​
ctlOf是将rs和wc进行按位或的操作
          ​
初始化的时候是将RUNNING和0进行按位或
0的32位2进制是
00000000 00000000 00000000 00000000
          ​
所以初始化的ctl是
111 00000 00000000 00000000 00000000

3.2、构造方法

/**
 * @param corePoolSize 保留在线程池中的线程数,即使它们处于空闲状态,除非设置了{@code allowCoreThreadTimeOut}
 * @param maximumPoolSize 线程池中允许的最大线程数
 * @param keepAliveTime 当线程数大于corePoolSize时,这是多余空闲线程在终止之前等待新任务的最大时间。
 * @param unit {@code keepAliveTime}参数的时间单位
 * @param workQueue 在执行任务之前用于保存任务的队列。 这个队列将只保存{@code execute}方法提交的{@code Runnable}任务。
 * @param threadFactory 用来执行的时候创建线程的线程工厂
 * @param handler 在执行被阻塞时使用的处理程序,因为达到了线程边界和队列容量
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

对参数handler:线程池提供了4栽政策:

  1. AbortPolicy:直接抛来非常,这是默认策略;
  2. CallerRunsPolicy:用调用者所于的线程来施行任务;
  3. DiscardOldestPolicy:丢弃阻塞队列中因极前面的天职,并施行当前任务;
  4. DiscardPolicy:直接丢弃任务;

基本措施源码分析

  1. execute(Runnable command)方法

      public void execute(Runnable command) {
          //需要执行的任务command为空,抛出空指针异常
          if (command == null)  // 1
              throw new NullPointerException();

          /*
          *执行的流程实际上分为三步
          *1、如果运行的线程小于corePoolSize,以用户给定的Runable对象新开一个线程去执行
          *  并且执行addWorker方法会以原子性操作去检查runState和workerCount,以防止当返回false的
          *  时候添加了不应该添加的线程
          *2、 如果任务能够成功添加到队列当中,我们仍需要对添加的线程进行双重检查,有可能添加的线程在前
          *  一次检查时已经死亡,又或者在进入该方法的时候线程池关闭了。所以我们需要复查状态,并有有必
          *  要的话需要在停止时回滚入列操作,或者在没有线程的时候新开一个线程
          *3、如果任务无法入列,那我们需要尝试新增一个线程,如果新建线程失败了,我们就知道线程可能关闭了
          *  或者饱和了,就需要拒绝这个任务
          *
          */

          //获取线程池的控制状态
          int c = ctl.get();  // 2

          //通过workCountOf方法算workerCount值,小于corePoolSize
          if (workerCountOf(c) < corePoolSize) {
              //添加任务到worker集合当中
      if (addWorker(command, true)) 
                  return;  //成功返回
              //失败的话再次获取线程池的控制状态
              c = ctl.get();
          }

          /*
          *判断线程池是否正处于RUNNING状态
          *是的话添加Runnable对象到workQueue队列当中
          */
          if (isRunning(c) && workQueue.offer(command)) {  // 3

              //再次获取线程池的状态
              int recheck = ctl.get();

              //再次检查状态
              //线程池不处于RUNNING状态,将任务从workQueue队列中移除
              if (! isRunning(recheck) && remove(command))
                  //拒绝任务
                  reject(command);
              //workerCount等于0
              else if (workerCountOf(recheck) == 0)  // 4
                  //添加worker
                  addWorker(null, false);
          }
          //加入阻塞队列失败,则尝试以线程池最大线程数新开线程去执行该任务
      else if (!addWorker(command, false))  // 5 
              //执行失败则拒绝任务
              reject(command);
      }

我们来说一下者这个代码的流水线:

1、首先判断任务是否为空,空则抛来空指针异常
2、不呢空则获取线程池控制状态,判断小于corePoolSize,添加到worker集合当中实行,

  • 如成功,则返回
  • 失败的言语又跟着获取线程池控制状态,因为只有状态变了才会砸,所以重复取
    3、判断线程池是否处于运行状态,是的口舌虽添加command到死队列,加入时也会再次得到状态而检测

    状态是不是不处于运行状态,不处在的话语虽用command从绿灯队列移除,并且拒绝任务
    4、如果线程池里没有了线程,则创造新的线程去实施得阻塞队列的天职尽
    5、如果上述都不曾实施成功,则用敞开最酷线程池里的线程来实行任务,失败的言辞虽丢弃

偶更多的仿吗不如一个流程图来的明亮,所以还是画画了只execute的流程图给大家有利了解。
图片 4

2.addWorker(Runnable firstTask, boolean core)

      private boolean addWorker(Runnable firstTask, boolean core) {
          //外部循环标记
          retry:
          //外层死循环
          for (;;) {
              //获取线程池控制状态
              int c = ctl.get();
              //获取runState
              int rs = runStateOf(c);
      ​
              // Check if queue empty only if necessary.

              /**
              *1.如果线程池runState至少已经是SHUTDOWN
              *2\. 有一个是false则addWorker失败,看false的情况
              * - runState==SHUTDOWN,即状态已经大于SHUTDOWN了
              * - firstTask为null,即传进来的任务为空,结合上面就是runState是SHUTDOWN,但是
              *  firstTask不为空,代表线程池已经关闭了还在传任务进来
              * - 队列为空,既然任务已经为空,队列为空,就不需要往线程池添加任务了
              */
              if (rs >= SHUTDOWN &&  //runState大于等于SHUTDOWN,初始位RUNNING
                  ! (rs == SHUTDOWN &&  //runState等于SHUTDOWN
                     firstTask == null &&  //firstTask为null
                     ! workQueue.isEmpty()))  //workQueue队列不为空
                  return false;
      ​
              //内层死循环
              for (;;) {
                  //获取线程池的workerCount数量
                  int wc = workerCountOf(c);
                  //如果workerCount超出最大值或者大于corePoolSize/maximumPoolSize
                  //返回false
                  if (wc >= CAPACITY ||
                      wc >= (core ? corePoolSize : maximumPoolSize))
                      return false;
                  //通过CAS操作,使workerCount数量+1,成功则跳出循环,回到retry标记
                  if (compareAndIncrementWorkerCount(c))
                      break retry;

                  //CAS操作失败,再次获取线程池的控制状态
                  c = ctl.get();  // Re-read ctl
                  //如果当前runState不等于刚开始获取的runState,则跳出内层循环,继续外层循环
                  if (runStateOf(c) != rs)
                      continue retry;
                  // else CAS failed due to workerCount change; retry inner loop
                  //CAS由于更改workerCount而失败,继续内层循环
              }
          }
      ​
          //通过以上循环,能执行到这是workerCount成功+1了

          //worker开始标记
          boolean workerStarted = false;
          //worker添加标记
          boolean workerAdded = false;
          //初始化worker为null
          Worker w = null;
          try {
              //初始化一个当前Runnable对象的worker对象
              w = new Worker(firstTask);
              //获取该worker对应的线程
              final Thread t = w.thread;
              //如果线程不为null
              if (t != null) {
                  //初始线程池的锁
                  final ReentrantLock mainLock = this.mainLock;
                  //获取锁
                  mainLock.lock();
                  try {
                      // Recheck while holding lock.
                      // Back out on ThreadFactory failure or if
                      // shut down before lock acquired.
                      //获取锁后再次检查,获取线程池runState
                      int rs = runStateOf(ctl.get());
      ​
                      //当runState小于SHUTDOWN或者runState等于SHUTDOWN并且firstTask为null
                      if (rs < SHUTDOWN ||
                          (rs == SHUTDOWN && firstTask == null)) {

                          //线程已存活
                          if (t.isAlive()) // precheck that t is startable
                              //线程未启动就存活,抛出IllegalThreadStateException异常
                              throw new IllegalThreadStateException();

                          //将worker对象添加到workers集合当中
                          workers.add(w);
                          //获取workers集合的大小
                          int s = workers.size();
                          //如果大小超过largestPoolSize
                          if (s > largestPoolSize)
                              //重新设置largestPoolSize
                              largestPoolSize = s;
                          //标记worker已经被添加
                          workerAdded = true;
                      }
                  } finally {
                      //释放锁
                      mainLock.unlock();
                  }
                  //如果worker添加成功
                  if (workerAdded) {
                      //启动线程
                      t.start();
                      //标记worker已经启动
                      workerStarted = true;
                  }
              }
          } finally {
              //如果worker没有启动成功
              if (! workerStarted)
                  //workerCount-1的操作
                  addWorkerFailed(w);
          }
          //返回worker是否启动的标记
          return workerStarted;
      }

咱俩也略说一下此代码的流水线吧,还当真是格外难之,博主写的下还停下了不少不成,想砸键盘的游说:

1、获取线程池的主宰状态,进行判定,不吻合则回false,符合则生一致步
2、死循环,判断workerCount是否过上限,或者高于corePoolSize/maximumPoolSize,没有底语虽然针对workerCount+1操作,
3、如果未符合上述判断或+1操作失败,再次获取线程池的主宰状态,获取runState与正开头得到的runState相比,不均等则跳出内层循环继续外层循环,否则继续内层循环
4、+1操作成后,使用重入锁ReentrantLock来管为workers当中加加worker实例,添加成功就启动该实例。

接下省流程图来明一下上面代码的一个实践流程
图片 5

3.addWorkerFailed(Worker w)

addWorker方法添加worker失败,并且没有水到渠成启动任务之时段,就见面调用此措施,将任务由workers中移除,并且workerCount做-1操作。

      private void addWorkerFailed(Worker w) {
          //重入锁
          final ReentrantLock mainLock = this.mainLock;
          //获取锁
          mainLock.lock();
          try {
              //如果worker不为null
              if (w != null)
                  //workers移除worker
                  workers.remove(w);
              //通过CAS操作,workerCount-1
              decrementWorkerCount();
              tryTerminate();
          } finally {
              //释放锁
              mainLock.unlock();
          }
      }

4.tryTerminate()

当对线程池执行了怪成功逻辑的操作时,都见面需要实行tryTerminate尝试终止线程池

      final void tryTerminate() {
          //死循环
          for (;;) {
              //获取线程池控制状态
              int c = ctl.get();

              /*
              *线程池处于RUNNING状态
              *线程池状态最小大于TIDYING
              *线程池==SHUTDOWN并且workQUeue不为空
              *直接return,不能终止
              */
              if (isRunning(c) ||
                  runStateAtLeast(c, TIDYING) ||
                  (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                  return;

              //如果workerCount不为0
              if (workerCountOf(c) != 0) { // Eligible to terminate
                  interruptIdleWorkers(ONLY_ONE);
                  return;
              }
      ​
              //获取线程池的锁
              final ReentrantLock mainLock = this.mainLock;
              //获取锁
              mainLock.lock();
              try {
                  //通过CAS操作,设置线程池状态为TIDYING
                  if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                      try {
                          terminated();
                      } finally {
                          //设置线程池的状态为TERMINATED
                          ctl.set(ctlOf(TERMINATED, 0));
                          //发送释放信号给在termination条件上等待的线程
                          termination.signalAll();
                      }
                      return;
                  }
              } finally {
                  //释放锁
                  mainLock.unlock();
              }
              // else retry on failed CAS
          }
      }

5.runWorker(Worker w)

该办法的来意就是去实施任务

final void runWorker(Worker w) {
      //获取当前线程
      Thread wt = Thread.currentThread();
      //获取worker里的任务
      Runnable task = w.firstTask;
      //将worker实例的任务赋值为null
      w.firstTask = null;

      /*
      *unlock方法会调用AQS的release方法
      *release方法会调用具体实现类也就是Worker的tryRelease方法
      *也就是将AQS状态置为0,允许中断
      */
      w.unlock(); // allow interrupts
      //是否突然完成
      boolean completedAbruptly = true;
      try {
          //worker实例的task不为空,或者通过getTask获取的不为空
          while (task != null || (task = getTask()) != null) {
              //获取锁
              w.lock();
              // If pool is stopping, ensure thread is interrupted;
              // if not, ensure thread is not interrupted.  This
              // requires a recheck in second case to deal with
              // shutdownNow race while clearing interrupt
              /*
              *获取线程池的控制状态,至少要大于STOP状态
              *如果状态不对,检查当前线程是否中断并清除中断状态,并且再次检查线程池状态是否大于STOP
              *如果上述满足,检查该对象是否处于中断状态,不清除中断标记
              */
              if ((runStateAtLeast(ctl.get(), STOP) ||
                   (Thread.interrupted() &&
                    runStateAtLeast(ctl.get(), STOP))) &&
                  !wt.isInterrupted())
                  //中断改对象
                  wt.interrupt();
              try {
                  //执行前的方法,由子类具体实现
                  beforeExecute(wt, task);
                  Throwable thrown = null;
                  try {
                      //执行任务
                      task.run();
                  } catch (RuntimeException x) {
                      thrown = x; throw x;
                  } catch (Error x) {
                      thrown = x; throw x;
                  } catch (Throwable x) {
                      thrown = x; throw new Error(x);
                  } finally {
                      //执行完后调用的方法,也是由子类具体实现
                      afterExecute(task, thrown);
                  }
              } finally {//执行完后
                  //task设置为null
                  task = null;
                  //已完成任务数+1
                  w.completedTasks++;
                  //释放锁
                  w.unlock();
              }
          }
          completedAbruptly = false;
      } finally {
          //处理并退出当前worker
          processWorkerExit(w, completedAbruptly);
      }
  }

连通下我们为此文字来证明一下实行任务这点子的求实逻辑和流程。

  1. 首先在措施一致进来,就推行了w.unlock(),这是以将AQS的状态改也0,因为只有getState() >=
    0的时段,线程才足以让搁浅;
  2. 判断firstTask是否为空,为空则通过getTask()获取任务,不为空接着往生实施
  3. 判断是否符合中断状态,符合的言语设置中断标记
  4. 执行beforeExecute(),task.run(),afterExecute()方法
  5. 别一个来非常都见面导致任务尽之住;进入processWorkerExit来退出任务
  6. 健康履之话会接着回到步骤2

蹭一副简单的流程图:
图片 6

6.getTask()

当点的runWorker方法当中我们可见见,当firstTask为空的时候,会通过该办法来就获取任务去履行,那我们即便看看获取任务之方法到底是什么的?

      private Runnable getTask() {
          //标志是否获取任务超时
          boolean timedOut = false; // Did the last poll() time out?
      ​
          //死循环
          for (;;) {
              //获取线程池的控制状态
              int c = ctl.get();
              //获取线程池的runState
              int rs = runStateOf(c);
      ​
              // Check if queue empty only if necessary.
              /*
              *判断线程池的状态,出现以下两种情况
              *1、runState大于等于SHUTDOWN状态
              *2、runState大于等于STOP或者阻塞队列为空
              *将会通过CAS操作,进行workerCount-1并返回null
              */
              if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                  decrementWorkerCount();
                  return null;
              }
      ​
              //获取线程池的workerCount
              int wc = workerCountOf(c);
      ​
              // Are workers subject to culling?

              /*
              *allowCoreThreadTimeOut:是否允许core Thread超时,默认false
              *workerCount是否大于核心核心线程池
              */
              boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
      ​
              /*
              *1、wc大于maximumPoolSize或者已超时
              *2、队列不为空时保证至少有一个任务
              */
              if ((wc > maximumPoolSize || (timed && timedOut))
                  && (wc > 1 || workQueue.isEmpty())) {
                  /*
                  *通过CAS操作,workerCount-1
                  *能进行-1操作,证明wc大于maximumPoolSize或者已经超时
                  */
                  if (compareAndDecrementWorkerCount(c))
                      //-1操作成功,返回null
                      return null;
                  //-1操作失败,继续循环
                  continue;
              }
      ​
              try {
                  /*
                  *wc大于核心线程池
                  *执行poll方法
                  *小于核心线程池
                  *执行take方法
                  */
                  Runnable r = timed ?
                      workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                  workQueue.take();
                  //判断任务不为空返回任务
                  if (r != null)
                      return r;
                  //获取一段时间没有获取到,获取超时
                  timedOut = true;
              } catch (InterruptedException retry) {
                  timedOut = false;
              }
          }
      }

或文字说明一下面的代码逻辑与流程:

  1. 获取线程池控制状态及runState,判断线程池是否已经倒闭或在关闭,是的言辞则workerCount-1操作返回null
  2. 赢得workerCount判断是否过核心线程池
  3. 认清workerCount是否超出最老线程池数目或已过,是的话workerCount-1,-1遂则回null,不成功则赶回步骤1再次继续
  4. 判定workerCount是否超出核心线程池,大于则据此poll方法从队列获取任务,否则用take方法从队列获取任务
  5. 认清任务是否为空,不为空则返回获取的任务,否则回步骤1再度继续

搭下去还时有发生雷同副流程图:
图片 7

7.processWorkerExit

一目了然的,在执行任务中,会失去得任务进行实施,那既然是实践任务,肯定就是见面生实行了要出现异常中断执行的时刻,那这早晚势必啊会来相对应的操作,至于具体操作是哪些的,我们要一直去押源码最实在。

     private void processWorkerExit(Worker w, boolean completedAbruptly) {
          /*
          *completedAbruptly:在runWorker出现,代表是否突然完成的意思
          *也就是在执行任务过程当中出现异常,就会突然完成,传true
          *
          *如果是突然完成,需要通过CAS操作,workerCount-1
          *不是突然完成,则不需要-1,因为getTask方法当中已经-1
          *
          *下面的代码注释貌似与代码意思相反了
          */
          if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
              decrementWorkerCount();
      ​
          //生成重入锁
          final ReentrantLock mainLock = this.mainLock;
          //获取锁
          mainLock.lock();
          try {
              //线程池统计的完成任务数completedTaskCount加上worker当中完成的任务数
              completedTaskCount += w.completedTasks;
              //从HashSet<Worker>中移除
              workers.remove(w);
          } finally {
              //释放锁
              mainLock.unlock();
          }
      ​
          //因为上述操作是释放任务或线程,所以会判断线程池状态,尝试终止线程池
          tryTerminate();
      ​
          //获取线程池的控制状态
          int c = ctl.get();

          //判断runState是否小鱼STOP,即是RUNNING或者SHUTDOWN
          //如果是RUNNING或者SHUTDOWN,代表没有成功终止线程池
          if (runStateLessThan(c, STOP)) {
              /*
              *是否突然完成
              *如若不是,代表已经没有任务可获取完成,因为getTask当中是while循环
              */
              if (!completedAbruptly) {
                  /*
                  *allowCoreThreadTimeOut:是否允许core thread超时,默认false
                  *min-默认是corePoolSize
                  */
                  int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                  //允许core thread超时并且队列不为空
                  //min为0,即允许core thread超时,这样就不需要维护核心核心线程池了
                  //如果workQueue不为空,则至少保持一个线程存活
                  if (min == 0 && ! workQueue.isEmpty())
                      min = 1;
                  //如果workerCount大于min,则表示满足所需,可以直接返回
                  if (workerCountOf(c) >= min)
                      return; // replacement not needed
              }
              //如果是突然完成,添加一个空任务的worker线程--这里我也不太理解
              addWorker(null, false);
          }
      }
  1. 率先判断线程是否突然止住,如果是出人意料停,通过CAS,workerCount-1
  2. 统计线程池就任务数,并拿worker从workers当中移除
  3. 判定线程池状态,尝试终止线程池
  4. 线程池没有得逞已
    • 判断是否突然就任务,不是则进行下一样步,是虽然开展第三步
    • 设若允许核心线程超时,队列不呢空,则最少确保一个线程存活
    • 长一个缺损任务之worker线程

3.3、核心措施

execute方法

线程池最基本之法莫过于execute了,execute()方法用来交付任务,下面我们本着这方式看看该促成原理:

/**
 * 在未来的某个时刻执行给定的任务。这个任务用一个新线程执行,或者用一个线程池中已经存在的线程执行
 * 如果任务无法被提交执行,要么是因为这个Executor已经被shutdown关闭,要么是已经达到其容量上限,任务会被当前的RejectedExecutionHandler处理
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * 执行分以下3步:
     *
     * 1. 如果运行的线程少于corePoolSize,尝试开启一个新线程去运行command,command作为这个线程的第一个任务
     *
     * 2. 如果线程入队成功,然后还是要进行double-check的,因为线程池在入队之后状态是可能会发生变化的
     *
     * 3. 如果无法将任务入队列(可能队列满了),需要新开一个线程
     * 如果失败了,说明线程池shutdown 或者 饱和了,所以我们拒绝任务。
     */
    int c = ctl.get();

    /**
     * 1、如果当前线程数少于corePoolSize,开启一个线程执行命令
     *(可能是由于addWorker()操作已经包含对线程池状态的判断,如此处没加,而入workQueue前加了)
     */
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;

        /**
         * 没有成功addWorker(),再次获取c(凡是需要再次用ctl做判断时,都会再次调用ctl.get())
         * 失败的原因可能是:
         * 1、线程池已经shutdown,shutdown的线程池不再接收新任务
         * 2、workerCountOf(c) < corePoolSize 判断后,由于并发,别的线程先创建了worker线程,导致workerCount>=corePoolSize
         */
        c = ctl.get();
    }

    /**
     * 2、如果线程池RUNNING状态,且入队列成功
     */
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();//再次校验位

        //如果再次校验过程中,线程池不是RUNNING状态,并且remove(command)--workQueue.remove()成功,拒绝当前command
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            // 新建一个worker线程,没有指定firstTask,因为命令已经放入queue里了
            addWorker(null, false);
    }
    /**
     * 3、如果线程池不是running状态 或者 无法入队列
     *   尝试开启新线程,扩容至maxPoolSize,如果addWork(command, false)失败了,拒绝当前command
     */
    else if (!addWorker(command, false))
        reject(command);
}

以执行execute()方法时只要状态一直是RUNNING时,的实行过程如下:

  1. 设workerCount <
    corePoolSize,则创造并启动一个线程来执行新交付的职责;
  2. 使workerCount >=
    corePoolSize,且线程池内的堵截队列未满,则将任务添加到该阻塞队列中;
  3. 如果workerCount >= corePoolSize && workerCount <
    maximumPoolSize,且线程池内之梗塞队列已满,则开创并启动一个线程来施行新交付的任务;
  4. 假设workerCount >= maximumPoolSize,并且线程池内的短路队列已满,
    则因拒绝策略来拍卖该任务, 默认的处理方式是一直抛大。

addWorker方法

addWorker方法的重点办事是在线程池中创造一个初的线程并施行,firstTask参数
用于指定新增的线程执行的第一独任务。core为true表示在新增线程时会判断当前活动线程数是否少corePoolSize,false表示新增线程前急需看清时活动线程数是否少maximumPoolSize,代码如下:

/**
 * 检查是否可以针对当前的池状态和给定的界限(核心或最大值)添加新的工作者。相应地调整工人数量,并且如果可能的话,创建并开始新的工作者,运行firstTask作为其第一个任务。
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        /**
         * 只有当下面两种情况会继续执行,其他直接返回false(添加失败)
         * 1、rs == RUNNING
         * 2、rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty() (执行了shutdown方法,但是阻塞队列还有任务没有执行)
         */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 当wc超过最大限制 || 如果是核心线程,超过了核心数,否则超过了最大线程数,直接返回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                // count累加成功,直接跳出两层for循环,执行下面的逻辑
                break retry;

            /**
             * 能执行到这里,都是因为多线程竞争,只有两种情况
             * 1、workCount发生变化,compareAndIncrementWorkerCount失败,这种情况不需要重新获取ctl,继续for循环即可。
             * 2、runState发生变化,可能执行了shutdown或者shutdownNow,这种情况重新走retry,取得最新的ctl并判断状态。
             */
            c = ctl.get();  // 重新读取ctl,可能状态发生了变化
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 获取锁后重新检测runState,因为有可能shutdown了
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())
                        // 线程不能是活跃状态
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;    //记录最大线程数
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            // 失败回退,从 wokers 移除 w, 线程数减一,尝试结束线程池(调用tryTerminate 方法)
            addWorkerFailed(w);
    }
    return workerStarted;
}

顾一下此的t.start()这个讲话,启动时见面调用Worker类中的run方法,Worker本身实现了Runnable接口,所以一个Worker类型的对象呢是一个线程。

Worker类

线程池中的诸一个线程被封装成一个Worker对象,ThreadPool维护的实际上就算是一致组Worker对象,看一下Worker的概念:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread;
    Runnable firstTask;
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        // 还没有执行任务时,这时就不应该被中断,设置状态为-1
        setState(-1);
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        // 调用runWorker方法执行
        runWorker(this);
    }

    // Lock methods
    //
    // 0代表没有锁定状态
    // 1代表锁定状态

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

Worker继承了AQS,使用AQS来落实独占锁的效能。为什么未行使ReentrantLock来促成呢?可以看出tryAcquire方法,它是休同意重入的,而ReentrantLock是允许重入的:

  1. lock方法而得到了独占锁,表示手上线程正在尽任务中;
  2. 要正在实行任务,则未应中断线程;
  3. 苟该线程现在无是独占锁的状态,也尽管是悠闲之状态,说明其从未当拍卖任务,这时可以本着该线程进行中断;
  4. 线程池在履行shutdown方法要tryTerminate方法时见面调用interruptIdleWorkers方法来刹车空闲的线程,interruptIdleWorkers方法会用tryLock方法来判定线程池中之线程是否是悠闲状态;
  5. 用设置为不可重入,是因我们无期待任务在调用像setCorePoolSize这样的线程池控制方法时再度赢得锁。如果运用ReentrantLock,它是可重入的,这样只要在任务中调用了如setCorePoolSize这类似线程池控制的法子,会搁浅正在运行的线程。

故此,Worker继承自AQS,用于判断线程是否空闲以及是否可以吃中断。

runWorker方法

于Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的代码如下:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // 允许中断
    boolean completedAbruptly = true;
    try {
        // 如果task为空,则通过getTask来获取任务
        while (task != null || (task = getTask()) != null) {
            w.lock();   // 开始运行,不允许中断

            /**
             * 确保只有在线程STOP时,才会被设置中断标示,否则清除中断标示
             * 1、如果线程池状态>=STOP,且当前线程没有设置中断状态,wt.interrupt()
             * 2、如果一开始判断线程池状态<STOP,但Thread.interrupted()为true,即线程已经被中断,又清除了中断标示,再次判断线程池状态是否>=STOP
             *  是,再次设置中断标示,wt.interrupt()
             *  否,不做操作,清除中断标示后进行后续步骤
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 用户自己实现
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 真正执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 用户自己实现
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // worker已经完成的任务数 + 1
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

/**
 * getTask方法用来从阻塞队列中取任务
 * 以下情况会返回null(被回收)
 * 1、超过了maximumPoolSize设置的线程数量(因为调用了setMaximumPoolSize())
 * 2、线程池被stop
 * 3、线程池被shutdown,并且workQueue空了
 * 4、线程等待任务超时
 * 返回null表示这个worker要结束了,这种情况下workerCount-1
 */
private Runnable getTask() {
    boolean timedOut = false; // 上一次poll()是否超时

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        /**
         * 满足以下几点,wc - 1,返回null
         * 1、rs >= STOP
         * 2、rs == SHUTDOWN && workQueue.isEmpty()
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 队列获取值是否要阻塞等待
        // allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
        // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
        // 对于超过核心线程数量的这些线程,需要进行超时控制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        /**
         * 满足以下几点,wc - 1,返回null
         * 1、wc > maximumPoolSize
         * 2、1 < wc <= maximumPoolSize && timed && timedOut
         * 3、wc <= 1 && workQueue.isEmpty() && timed && timedOut
         */
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 获取Runnable
            Runnable r = timed ?
                // 超时会被回收
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                // 阻塞等待,默认设置最后最多会有corePoolSize的线程一起阻塞。
                // 如果设置allowCoreThreadTimeOut=true的话,最后所有线程都会被回收。
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

/**
 * @param completedAbruptly true:worker执行的时候异常了
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    /**
     * 1、worker数量-1
     * 如果是突然终止,说明是task执行时异常情况导致,即run()方法执行时发生了异常,那么正在工作的worker线程数量需要-1
     * 如果不是突然终止,说明是worker线程没有task可执行了,不用-1,因为已经在getTask()方法中-1了
     */
    if (completedAbruptly)
        decrementWorkerCount();

    /**
     * 2、从Workers Set中移除worker
     */
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 统计完成的任务数
        completedTaskCount += w.completedTasks;
        // 从workers中移除,也就表示着从线程池中移除了一个工作线程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    /**
     * 3、在对线程池有负效益的操作时,都需要“尝试终止”线程池
     * 主要是判断线程池是否满足终止的状态
     * 如果状态满足,但还有线程池还有线程,尝试对其发出中断响应,使其能进入退出流程
     * 没有线程了,更新状态为tidying->terminated
     */
    tryTerminate();

    /**
     * 4、是否需要增加worker线程
     * 线程池状态是running 或 shutdown
     * 如果当前线程是突然终止的,addWorker()
     * 如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker()
     * 故如果调用线程池shutdown(),直到workQueue为空前,线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程
     */
    int c = ctl.get();
    /**
     * 以下情况会增加一个worker addWorker(null, false);
     * 1、completedAbruptly == true
     * 2、completedAbruptly == false && allowCoreThreadTimeOut == true && wc < 1
     * 3、completedAbruptly == false && allowCoreThreadTimeOut == false && wc < corePoolSize
     */
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

getTask重要之地方是亚只if判断,目的是控制线程池的管用线程数量。由臻文中的解析可以了解,在执行execute方法时,如果手上线程池的线程数量超越了corePoolSize且低于maximumPoolSize,并且workQueue已满时,则可增加工作线程,但这时如超时没有得到任务,也就是是timedOut为true的状况,说明workQueue已经也空了,也不怕证明了脚下线程池中无需那么多线程来执行任务了,可以拿多于corePoolSize数量之线程销毁掉,保持线程数量在corePoolSize即可。

processWorkerExit执行完毕以后,工作线程被灭绝,以上就是全体办事线程的生命周期,从execute方法开始,Worker使用ThreadFactory创建新的办事线程,runWorker通过getTask获取任务,然后实施任务,如果getTask返回null,进入processWorkerExit方法,整个线程结束

下是从execute到线程销毁之满贯流程图:

图片 8

execute执行图

Worker内部类


我们于方已算那个详细地摆了线程池执行任务execute的推行流程与一些细节,在地方往往地起了一个字,那便是worker实例,那么这worker究竟是呀也?里面都含了有些哟消息,以及worker这个职责究竟是怎么实施之吧?

​ 我们不怕于是局部来介绍一下咔嚓,还是直接上源码:

咱们得观看Worker内部近乎继承AQS同步器并且实现了Runnable接口,所以Worker很显就是是一个可实行任务而同时可控制中断、起及锁效果的好像。

  private final class Worker
          extends AbstractQueuedSynchronizer
          implements Runnable
      {
          /**
           * This class will never be serialized, but we provide a
           * serialVersionUID to suppress a javac warning.
           */
          private static final long serialVersionUID = 6138294804551838833L;
  ​
          /** 工作线程,如果工厂失败则为空. */
          final Thread thread;
          /** 初始化任务,有可能为空 */
          Runnable firstTask;
          /** 已完成的任务计数 */
          volatile long completedTasks;
  ​
          /**
           * 创建并初始化第一个任务,使用线程工厂来创建线程
           * 初始化有3步
           *1、设置AQS的同步状态为-1,表示该对象需要被唤醒
           *2、初始化第一个任务
           *3、调用ThreadFactory来使自身创建一个线程,并赋值给worker的成员变量thread
           */
          Worker(Runnable firstTask) {
              setState(-1); // inhibit interrupts until runWorker
              this.firstTask = firstTask;
              this.thread = getThreadFactory().newThread(this);
          }
  ​
    //重写Runnable的run方法
          /** Delegates main run loop to outer runWorker  */
          public void run() {
              //调用ThreadPoolExecutor的runWorker方法
              runWorker(this);
          }
  ​
          // Lock methods
          //
          // The value 0 represents the unlocked state.
          // The value 1 represents the locked state.
    //代表是否独占锁,0-非独占  1-独占
          protected boolean isHeldExclusively() {
              return getState() != 0;
          }

    //重写AQS的tryAcquire方法尝试获取锁
          protected boolean tryAcquire(int unused) {
           //尝试将AQS的同步状态从0改为1
              if (compareAndSetState(0, 1)) {
               //如果改变成,则将当前独占模式的线程设置为当前线程并返回true
                  setExclusiveOwnerThread(Thread.currentThread());
                  return true;
              }
              //否则返回false
              return false;
          }
  ​
    //重写AQS的tryRelease尝试释放锁
          protected boolean tryRelease(int unused) {
           //设置当前独占模式的线程为null
              setExclusiveOwnerThread(null);
              //设置AQS同步状态为0
              setState(0);
              //返回true
              return true;
          }
  ​
    //获取锁
          public void lock()        { acquire(1); }
          //尝试获取锁
          public boolean tryLock()  { return tryAcquire(1); }
          //释放锁
          public void unlock()      { release(1); }
          //是否被独占
          public boolean isLocked() { return isHeldExclusively(); }
  ​
          void interruptIfStarted() {
              Thread t;
              if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                  try {
                      t.interrupt();
                  } catch (SecurityException ignore) {
                  }
              }
          }
  }

3.4、其他外部调用方法

下面的主意还是用户可以协调进行调用的:

/**
 * 状态改为SHUTDOWN
 * 启动先前提交的任务被执行的有序关闭,但不接受新的任务。 如果已经关闭,则调用没有其他影响。
 * 该方法不能等待之前提交的任务执行完,如果需要等待执行,可以使用{@link #awaitTermination awaitTermination}
 */
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

/**
 * 状态改为SHUTDOWN
 * 尝试停止所有正在执行的任务,停止等待任务的处理,并返回正在等待执行的任务的列表。 
 * 该方法不能等待之前提交的任务执行完,如果需要等待执行,可以使用{@link #awaitTermination awaitTermination}
 * 从这个方法返回后,这些任务从任务队列中排出(移除)。 除了竭尽全力地停止处理主动执行任务之外,没有任何保证。 
 * 这个实现通过{@link Thread#interrupt}来取消任务,所以任何不能响应中断的任务都不会终止。
 */
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

// 执行任务前的hook
protected void beforeExecute(Thread t, Runnable r) { }

// 执行任务后的hook
protected void afterExecute(Runnable r, Throwable t) { }

/**
 * 什么都不做,交给子类实现,注意实现的时候使用super.terminated();
 */
protected void terminated() { }

/**
 * 判断状态 >= SHUTDOWN
 */
public boolean isShutdown() {
    return ! isRunning(ctl.get());
}

/**
 * 判断 SHUTDOWN <= 状态 < TERMINATED
 */
public boolean isTerminating() {
    int c = ctl.get();
    return ! isRunning(c) && runStateLessThan(c, TERMINATED);
}

/**
 * 判断状态 == TERMINATED
 */
public boolean isTerminated() {
    return runStateAtLeast(ctl.get(), TERMINATED);
}

/**
 * 在指定的超时时间范围内等待状态变为TERMINATED
 */
public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            if (nanos <= 0)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}

public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    int delta = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;
    /**
     * 1、当前workCount > 传入的corePoolSize,中断空闲worker
     * 2、传入的corePoolSize比之前的要大,选出差值和queue的大小做比较,比较小的作为要增加的线程数,调用addWorker,如果中途遇到workQueue为空,就不增加了。
     */
    if (workerCountOf(ctl.get()) > corePoolSize)
        interruptIdleWorkers();
    else if (delta > 0) {
        int k = Math.min(delta, workQueue.size());
        while (k-- > 0 && addWorker(null, true)) {
            if (workQueue.isEmpty())
                break;
        }
    }
}

/**
 * 提前准备一个core的线程
 */
public boolean prestartCoreThread() {
    return workerCountOf(ctl.get()) < corePoolSize &&
        addWorker(null, true);
}

/**
 * 提前准备所有的core线程
 */
public int prestartAllCoreThreads() {
    int n = 0;
    while (addWorker(null, true))
        ++n;
    return n;
}

// 设置coreThreadTimeOut的值
public void allowCoreThreadTimeOut(boolean value) {
    if (value && keepAliveTime <= 0)
        throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
    if (value != allowCoreThreadTimeOut) {
        allowCoreThreadTimeOut = value;
        if (value)
            interruptIdleWorkers();
    }
}

// 设置maximumPoolSize
public void setMaximumPoolSize(int maximumPoolSize) {
    if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
        throw new IllegalArgumentException();
    this.maximumPoolSize = maximumPoolSize;
    if (workerCountOf(ctl.get()) > maximumPoolSize)
        interruptIdleWorkers();
}

// 从队列里面移除任务
public boolean remove(Runnable task) {
    boolean removed = workQueue.remove(task);
    tryTerminate(); // In case SHUTDOWN and now empty
    return removed;
}

/**
 * 清除队列里所有呗cancel的Future类型的任务,此方法可用作存储回收操作
 * 该方法可能存在其他线程的干扰,导致清除失败
 */
public void purge() {
    final BlockingQueue<Runnable> q = workQueue;
    try {
        Iterator<Runnable> it = q.iterator();
        while (it.hasNext()) {
            Runnable r = it.next();
            if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
                it.remove();
        }
    } catch (ConcurrentModificationException fallThrough) {
        // 如果在遍历期间遇到干扰,请采取慢速路径。进行遍历复制并调用remove取消条目。慢路径更可能是O(N * N)。
        for (Object r : q.toArray())
            if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
                q.remove(r);
    }

    tryTerminate(); // In case SHUTDOWN and now empty
}

/**
 * 返回线程池大小
 */
public int getPoolSize() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // runState == TIDYING 或者 runState == TERMINATED 返回0
        // 否则返回workers的大小
        return runStateAtLeast(ctl.get(), TIDYING) ? 0
            : workers.size();
    } finally {
        mainLock.unlock();
    }
}

/**
 * 获取活跃线程数:根据isLocked来判断是不是在执行任务
 */
public int getActiveCount() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        int n = 0;
        for (Worker w : workers)
            if (w.isLocked())
                ++n;
        return n;
    } finally {
        mainLock.unlock();
    }
}

/**
 * 返回最大线程池的大小
 */
public int getLargestPoolSize() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        return largestPoolSize;
    } finally {
        mainLock.unlock();
    }
}

/**
 * 返回任务总数(包括已经完成的和未完成的)
 */
public long getTaskCount() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        long n = completedTaskCount;
        for (Worker w : workers) {
            n += w.completedTasks;
            if (w.isLocked())
                ++n;
        }
        return n + workQueue.size();
    } finally {
        mainLock.unlock();
    }
}

/**
 * 返回已完成任务总数
 */
public long getCompletedTaskCount() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        long n = completedTaskCount;
        for (Worker w : workers)
            n += w.completedTasks;
        return n;
    } finally {
        mainLock.unlock();
    }
}

小结

写这线程池就真的是未易于了,历时半独星期日,中途有众多之地方不懂得,而且《Java并作编程的点子》的就仍开中对线程池的介绍其实并无算是多,所以自己拘留起呢殊痛苦之,还常会扣押了是方式就是无晓怎么而调用这个和调用这个法子是出何用意。而且于马上上之进程中,有当怀疑自己之学道对怪,因为也有人跟自己说勿需同句子词去押去分析源码,只需要明白流程虽得了,但是后来要么考虑按照好的读书路线走,多读源码总是有补的,在此间我也为程序猿一些建议,有协调的求学方式的早晚,按照好的道坚定走下去。

3.5、内部方法和空方法

下的艺术都是用户自己调用不了之不二法门,这里也做一下征:

/**
 * 替换状态
 * 如果现在的ctl状态 >= targetState,什么都不做
 * 如果现在的ctl状态 < targetState,尝试替换状态
 */
private void advanceRunState(int targetState) {
    for (;;) {
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) ||
            // 前3位替换,后29位保持ctl原来的数目
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

/**
 * 尝试终止,只有当以下几种情况才把状态改为TERMINATED
 *      1、SHUTDOWN状态 && queue是空的 && wc == 0
 *      2、STOP状态 && wc == 0
 * workCount如果不是0,这时候就中断其中一个idle的worker来传播关闭信号
 * 该方法必须在执行任何可能会终止的操作之后调用此方法 - 在关闭期间减少工作人员数量或从队列中删除任务。
 * ScheduledThreadPoolExecutor里面也用到了,所以这里修饰符没用private
 */
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // c是运行时的状态
        if (isRunning(c) ||
            // c的状态值 >= TIDYING
            runStateAtLeast(c, TIDYING) ||
            // c的状态是SHUTDOWN && 队列不是空
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;

        // worker数不是0
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 设置ctl的状态为TIDYING,为中间过渡状态
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 终止方法,空方法什么都不做,子类去实现
                    terminated();
                } finally {
                    // 设置ctl的状态为TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
    }
}

/**
 * 中断worker的空闲线程
 * @param onlyOne 是否仅仅中断worker中的第一个
 */
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // 尝试获取锁,这里只有当线程没有运行的时候才能够tryLock成功
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    // 设置worker线程的中断变量
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            // true,只中断队列的第一个就退出
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

/**
 * 中断所有worker的线程
 */
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

/**
 * 中断所有worker的空闲线程
 */
private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

/**
 * 根据拒绝策略拒绝执行命令
 */
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

/**
 * 移除队列中的Runnable到一个新list中,使用的是阻塞队列的drainTo方法
 * 但是如果队列是DelayQueue或者其他能够让poll或者drainTo失败移除元素的队列的话,遍历队列并删除它
 */
private List<Runnable> drainQueue() {
    BlockingQueue<Runnable> q = workQueue;
    ArrayList<Runnable> taskList = new ArrayList<Runnable>();
    q.drainTo(taskList);
    if (!q.isEmpty()) {
        for (Runnable r : q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    return taskList;
}

/**
 * 预留方法,ScheduledThreadPoolExecutor重写了此方法
 */
void onShutdown() {
}

// ScheduledThreadPoolExecutor进行调用,判断是不是running或shutdown状态
final boolean isRunningOrShutdown(boolean shutdownOK) {
    int rs = runStateOf(ctl.get());
    return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}

// ScheduledThreadPoolExecutor进行调用,确认都提前准备好了
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

参考资料

方腾飞:《Java并作编程的办法》

如需转载,请务必注明出处,毕竟一块块搬砖也非是爱之事体。

4、线程池的督察

透过线程池提供的参数进行督查。线程池里产生一部分性质在监控线程池的时候可以采取

  1. getTaskCount:线程池已经施行的和不实行之天职总数;
  2. getCompletedTaskCount:线程池已到位的职责数,该值小于等于taskCount;
  3. getLargestPoolSize:线程池曾经创造了的绝酷线程数量。通过之数目好知道线程池是否满了,也尽管是达到了maximumPoolSize;
  4. getPoolSize:线程池当前的线程数量;
  5. getActiveCount:当前线程池中在尽任务的线程数量。

经过这些方法,可以对线程池进行督察,在ThreadPoolExecutor类中提供了几个缺损方法,如beforeExecute方法,afterExecute方法与terminated方法,可以扩大这些方式以执行前或者履后增加部分初的操作,例如统计线程池的履任务之工夫相当,可以持续自ThreadPoolExecutor来拓展扩展。

5、合理之配置线程池

如惦记合理的配置线程池,就不能不首先分析任务特点,可以打以下几独角度来拓展解析:

  1. 任务之习性:CPU密集型任务,IO密集型任务以及混合型任务。
  2. 职责之预级:高,中及小。
  3. 职责的尽时:长,中及缺失。
  4. 任务的凭:是否指其他系统资源,如数据库连接。

职责性质不一之天职可用不同层面之线程池分开处理。CPU密集型任务布置尽可能少的线程数量,如安排Ncpu+1单线程的线程池。IO密集型任务则是因为用等待IO操作,线程并无是一直在实行任务,则安排尽可能多的线程,如2*Ncpu。混合型的任务,如果得以拆分,则以那个拆分成一个CPU密集型任务与一个IO密集型任务,只要这有限独任务履行之辰去不是最最死,那么分解后实行的吞吐率要超越串行执行之吞吐率,如果当时片独任务执行时间相差太大,则并未必要展开诠释。我们得透过Runtime.getRuntime().availableProcessors()方法取得当前设备的CPU个数。

事先级不等之任务可行使优先级列PriorityBlockingQueue来处理。它可以让优先级赛之天职先得实施,需要注意的凡如直接闹先级赛之天职交到行列里,那么先级低之任务可能永远不能够实行。

履行时间各异的任务可以付出不同范畴之线程池来处理,或者为堪运用优先级列,让实施时不够的任务先实行。

借助数据库连接池的任务,因为线程提交SQL后待等数据库返回结果,如果等待的岁月越长CPU空闲时纵一发长,那么线程数应该安装更加充分,这样才能够重复好之采取CPU。

提议使用有界队列,有格队列会添系统的平稳以及预警能力,可以根据需要设大一点,比如几千。有同一潮我们组用的后台任务线程池的班和线程池全载了,不断的抛出抛弃任务的怪,通过排查发现是数据库出现了问题,导致执行SQL变得不行缓慢,因为后台任务线程池里的天职都是用为数据库查询与插数据的,所以导致线程池里的劳作线程全部死住,任务积压在线程池里。如果立刻我们设置成无界队列,线程池的阵就会见尤其多,有或会见支撑满内存,导致整体系不可用,而不就是后台任务出现问题。当然我们的系具有的职责是用的单独的服务器部署之,而我们用不同层面的线程池跑不同类型的职责,但是出现这么问题时也会潜移默化到外职责。

自己参考了:怎么合理地估算线程池大小?
这篇稿子里之使程序评估线程池大小。

6、结论

正文比较详细的剖析了线程池的做事流程,总体来说出如下几单内容:

  1. 剖析了线程的始建,任务的交付,状态的换与线程池的闭馆;
  2. 这边通过execute方法来展开线程池的行事流程,execute方法通过corePoolSize,maximumPoolSize以及阻塞队列的分寸来判断决定传入的天职应叫当下实施,还是应当长到死队列中,还是该驳回任务。
  3. 介绍了线程池关闭时的过程,也剖析了shutdown方法以及getTask方法是竞态条件;
  4. 每当获取任务时,要通过线程池的状态来判定该收工作线程还是闭塞线程等待新的任务,也说了怎么关闭线程池时假如中断工作线程以及为何各一个worker都待lock。

以为线程池提交任务时,除了execute方法,还有一个submit方法,submit方法会返回一个Future对象用于获取返回值,有关Future和Callable请自行了解一下系的文章,这里虽非介绍了。

7、扩展

诚如开发中core线程数量是蛮不便确定的,可以参照上面提到的安客观之估算线程池的轻重,但是一般还是开发者自己通过压测后取的多少,之后到真正的线程环境说明,得出一个成立之core数字。假设是5,但是以以防万一某些瞬时大流量(我们呢无法预知到底流量会发生多可怜),通常会又装一个比core线程数要格外之max线程,假设是10。那么当这种瞬时流量真的有了,如果欲服务器能够及早的增长处理速度,当然是需要让MAX线程尽快启动起来,帮着拍卖任务。这时候我们就算好团结扩展线程池。

8、参考

闲聊并发(三)Java线程池的解析以及利用
深刻明Java线程池:ThreadPoolExecutor
Java线程池ThreadPoolExecutor以与剖析(二) –
execute()原理

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图