coding……
但行好事 莫问前程

Java编程拾遗『线程池』

在之前的文章中,我们已经讲了很多Java线程的使用以及Java并发编程的原理,本篇文章,我们来重点看一下Java并发编程中一个比较常用的工具——线程池的使用以及源码实现,这也是Java面试的基本问题。

在使用线程池之前,我们可以尝试考虑这样一个问题,如果没有线程池,我们是怎样实现多线程编程的以及实现方式有哪些问题?首先,如果没有线程池,我们一般会直接通过new Thread()构造出多个线程,然后分别start开启,一般一个处理请求,对应一个线程。之前的文章我们也了解到,线程创建和回收是有消耗的,另外线程数目过多,也会增加系统资源的竞争,只要有竞争,那么不可避免的就会产生上下文切换,这个成本也是比较高的。如果不限制创建线程的数目,运行效率有可能会急剧下降,甚至会导致系统故障(比如OOM)。

解决方案也很直观,限制创建线程的数目,控制并发数量,如果有很多个任务处理,那么就去排队等待。而这种思想其实就是线程池的实现思路,那么使用线程池的好处就比较明显了:

  • 重用线程,避免线程重复创建的开销
  • 在任务过多时,通过排队避免创建过多线程,减少系统资源消耗和竞争,确保任务有序完成

Java并发包中线程池的实现类是ThreadPoolExecutor,它继承自AbstractExecutorService。

  • Executor接口:只定义了一个线程池提交任务的execute方法
  • ExecutorService接口:扩展了Executor接口,添加了操控线程池生命周期的方法,如shutDown(),shutDownNow()等。扩展了可异步执行任务返回值Future的方法,如submit()
  • AbstractExecutorService抽象类:实现了ExecutorService接口,并实现了部分ExecutorService接口的基础方法,作用跟之前将容器类时的Abstract***作用类似,方便实现自定义线程池(继承AbstractExecutorService抽象类,而不用直接实现ExecutorService接口)
  • ThreadPoolExecutor类:继承了AbstractExecutorService抽象类,并实现了ExecutorService中定义的AbstractExecutorService抽象类中未实现的方法

除了上述类图中展示的几个线程池的类之外,使用线程池,我们有可能还要关注如下几个类(接口):

  • FutureTask:Java中实现线程异步调用的类,在之前的文章线程的使用中讲过,可以用来包装Callable对象实例,异步获取线程执行结果
  • Callable:Java中实现线程的三种之一,很简单,之前的文章线程的使用中也介绍过
  • Executors:工具类,类中的方法都是静态方法,用于生成 ThreadPoolExecutor的实例的一些方法,比如newFixedThreadPool、newCachedThreadPool等
  • BlockingQueue:使用线程池,当核心线程数满了之后提交的任务就需要排队,这个用于排队的队列就是阻塞队列BlockingQueue,线程池中经常使用BlockingQueue的各种实现类,如ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue等,不同的队列可以实现不同特性的线程池,这个下面再详细介绍。

1. 理解线程池

1.1 构造函数

ThreadPoolExecutor中有多个构造方法,都需要一些参数,主要构造方法有:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

对比上述四个构造函数,每个构造函数都有的参数包括corePoolSize、maximumPoolSize、keepAliveTime、unit及workQueue,前三个构造函数的实现都依赖了第四个构造函数,对于没有的参数threadFactory和handler,前三个构造函数会给定默认值。

参数corePoolSize, maximumPoolSize, keepAliveTime, unit用于控制线程池中线程的个数,workQueue表示任务队列,threadFactory用于对创建的线程进行一些配置,handler表示任务拒绝策略。

  • corePoolSize:核心线程个数
  • maximumPoolSize:最大线程个数
  • keepAliveTime和unit:空闲线程存活时间

corePoolSize表示线程池中的核心线程个数,不过,这并不是说,一开始就创建这么多线程,刚创建一个线程池后,实际上并不会创建任何线程。

一般情况下,有新任务到来的时候,如果当前线程个数小于corePoolSize,就会创建一个新线程来执行该任务。需要说明的是,即使其他线程现在也是空闲的,也会创建新线程

不过,如果线程个数大于等于corePoolSize,那就不会立即创建新线程了,它会先尝试排队,需要强调的是,它是”尝试”排队,而不是”阻塞等待”入队,如果队列满了或其他原因不能立即入队,它就不会排队,而是检查线程个数是否达到了maximumPoolSize,如果没有,就会继续创建线程,直到线程数达到maximumPoolSize

keepAliveTime的目的是为了释放多余的线程资源,它表示,当线程池中的线程个数大于corePoolSize时,额外空闲线程的存活时间,也就是说,一个非核心线程,在空闲等待新任务时,会有一个最长等待时间,即keepAliveTime,如果到了时间还是没有新任务,就会被终止。如果该值为0,表示所有线程都不会超时终止。

这几个参数除了可以在构造方法中进行指定外,还可以通过getter/setter方法进行查看和修改:

public void setCorePoolSize(int corePoolSize)
public int getCorePoolSize()
public int getMaximumPoolSize()
public void setMaximumPoolSize(int maximumPoolSize)
public long getKeepAliveTime(TimeUnit unit)
public void setKeepAliveTime(long time, TimeUnit unit)

除了这些静态参数,ThreadPoolExecutor还可以查看关于线程和任务数的一些动态数字:

//返回当前线程个数
public int getPoolSize()
//返回线程池曾经达到过的最大线程个数
public int getLargestPoolSize()
//返回线程池自创建以来所有已完成的任务数
public long getCompletedTaskCount()
//返回所有任务数,包括所有已完成的加上所有排队待执行的
public long getTaskCount()

线程个数小于等于corePoolSize时,我们称这些线程为核心线程,默认情况下:

  • 核心线程不会预先创建,只有当有任务时才会创建
  • 核心线程不会因为空闲而被终止,keepAliveTime参数不适用于它

不过,ThreadPoolExecutor有如下方法,可以改变这个默认行为:

//预先创建所有的核心线程
public int prestartAllCoreThreads()
//创建一个核心线程,如果所有核心线程都已创建,返回false
public boolean prestartCoreThread()
//如果参数为true,则keepAliveTime参数也适用于核心线程
public void allowCoreThreadTimeOut(boolean value)

大致来讲,线程池的工作流程可以如下图所示:

1.2 队列

ThreadPoolExecutor要求的队列类型是阻塞队列BlockingQueue,我们在之前的文章介绍过多种BlockingQueue,它们都可以用作线程池的队列,比如:

  • LinkedBlockingQueue:基于链表的阻塞队列,可以指定最大长度,但默认是无界的。
  • ArrayBlockingQueue:基于数组的有界阻塞队列
  • SynchronousQueue:没有实际存储空间的同步阻塞队列

如果用的是无界队列,需要强调的是,线程个数最多只能达到corePoolSize,到达corePoolSize后,新的任务总会排队,参数maximumPoolSize也就没有意义了

对于SynchronousQueue,它没有实际存储元素的空间,当尝试排队时,只有正好有空闲线程在等待接受任务时,才会入队成功,否则,总是会创建新线程,直到达到maximumPoolSize。

1.3 任务拒绝策略

如果队列有界,且maximumPoolSize有限,则当队列排满,线程个数也达到了maximumPoolSize,这时,新任务来了,如何处理呢?此时,会触发线程池的任务拒绝策略。

默认情况下,提交任务的方法如execute/submit/invokeAll等会抛出异常,类型为RejectedExecutionException。

不过,拒绝策略是可以自定义的,ThreadPoolExecutor实现了四种处理方式:

  • ThreadPoolExecutor.AbortPolicy:这就是默认的方式,抛出异常
  • ThreadPoolExecutor.DiscardPolicy:静默处理,忽略新任务,不抛异常,也不执行
  • ThreadPoolExecutor.DiscardOldestPolicy:将等待时间最长的任务扔掉,然后自己排队
  • ThreadPoolExecutor.CallerRunsPolicy:在任务提交者线程中执行任务,而不是交给线程池中的线程执行

它们都是ThreadPoolExecutor的public静态内部类,都实现了RejectedExecutionHandler接口,这个接口的定义为:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

当线程池不能接受任务时,调用其拒绝策略的rejectedExecution方法。

拒绝策略可以在构造方法中进行指定,也可以通过如下方法进行指定:

public void setRejectedExecutionHandler(RejectedExecutionHandler handler)

默认的RejectedExecutionHandler是一个AbortPolicy实例,如下所示:

private static final RejectedExecutionHandler defaultHandler =     new AbortPolicy();

而AbortPolicy的rejectedExecution实现就是抛出异常,如下所示:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
}

需要强调下,拒绝策略只有在队列有界,且maximumPoolSize有限的情况下才会触发

如果队列无界,服务不了的任务总是会排队,但这不见得是期望的,因为请求处理队列可能会消耗非常大的内存,甚至引发内存不够的异常。如果队列有界但maximumPoolSize无限,可能会创建过多的线程,占满CPU和内存,使得任何任务都难以完成。

1.4 ThreadFactory

线程池可以接受一个参数,ThreadFactory,它是一个接口,定义为:

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

这个接口根据Runnable创建一个Thread,ThreadPoolExecutor的默认实现是Executors类中的静态内部类DefaultThreadFactory,主要就是创建一个线程,给线程设置一个名称,设置daemon属性为false,设置线程优先级为标准默认优先级,线程名称的格式为: pool-<线程池编号>-thread-<线程编号>。

如果需要自定义一些线程的属性,比如名称,可以实现自定义的ThreadFactory。

1.5 Executors

Executors提供了一些静态工厂方法,可以方便的创建一些预配置的线程池,如下:

public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newCachedThreadPool() 

上述三种线程池是最常用的三种线程池,下面来分别讲一下其实现及适用场景。

1.5.1 newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

只有一个线程,使用无界队列LinkedBlockingQueue,线程创建后不会超时终止,该线程顺序执行所有任务。该线程池适用于需要确保所有任务被顺序执行的场合

1.5.2 newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

使用固定数目的n个线程,使用无界队列LinkedBlockingQueue,线程创建后不会超时终止。和newSingleThreadExecutor一样,由于是无界队列,如果排队任务过多,可能会消耗非常大的内存

1.5.3 newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

它的corePoolSize为0,maximumPoolSize为Integer.MAX_VALUE,keepAliveTime是60秒,队列为SynchronousQueue。当新任务到来时,如果正好有空闲线程在等待任务,则其中一个空闲线程接受该任务,否则就总是创建一个新线程,创建的总线程个数不受限制,对任一空闲线程,如果60秒内没有新任务,就终止

1.5.4 使用场景

在系统负载很高的情况下,newFixedThreadPool可以通过队列对新任务排队,保证有足够的资源处理实际的任务,而newCachedThreadPool会为每个任务创建一个线程,导致创建过多的线程竞争CPU和内存资源,使得任何实际任务都难以完成,这时,newFixedThreadPool更为适用。

如果系统负载不太高,单个任务的执行时间也比较短,newCachedThreadPool的效率可能更高,因为任务可以不经排队,直接交给某一个空闲线程。

在系统负载可能极高的情况下,两者都不是好的选择,newFixedThreadPool的问题是队列过长,而newCachedThreadPool的问题是线程过多,这时,应根据具体情况自定义ThreadPoolExecutor,传递合适的参数。

2. 线程池使用示例

下面我们通过一个简单的示例,来展示一下Java中线程池的使用:

public class ThreadPoolTest {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(5));

        for (int i = 0; i < 15; i++) {
            MyTask myTask = new MyTask(i);
            executor.execute(myTask);
            System.out.println("线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:" +
                    executor.getQueue().size() + ",已执行完的任务数目:" + executor.getCompletedTaskCount());
        }
        executor.shutdown();
    }

    static class MyTask implements Runnable {
        private int taskNum;

        MyTask(int num) {
            this.taskNum = num;
        }

        @Override
        public void run() {
            System.out.println("正在执行task " + taskNum);
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task " + taskNum + "执行完毕");
        }
    }
}

自定义一个线程池,线程池corePoolSize为5,maximumPoolSize为10,任务队列采用有界阻塞队列ArrayBlockingQueue,size为5。所以按照之前的分析,前5个提交的任务会立即创建线程执行任务,之后提交的任务(6~10)会进入队列等待,当队列满了之后,由于线程数目5小于maximumPoolSize,所以之后提交的任务会创建新线程执行。

运行结果:

当线程池中线程的数目大于5时,便将任务放入任务缓存队列里面,当任务缓存队列满了之后,便创建新的线程,符合。上面程序中,将for循环中提交的任务改为超过15个,就会抛出任务拒绝异常。

3. 线程池的实现原理

线程池的核心就是上面构造函数讲解的那几个相关的参数,具体含义在上面已经介绍过了,这里不重复介绍了。这里我们来看一下除了上述属性之外的其它属性。

3.1 状态成员变量ctl

Java线程池中,采用一个 32 位的整数来存放线程池的状态和当前池中的线程数,其中高 3 位用于存放线程池状态,低 29 位表示线程数。

// 线程池成员变量ctl
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// COUNT_BITS设置为29(32-3),意味着前三位用于存放线程状态,后29位用于存放线程数
private static final int COUNT_BITS = Integer.SIZE - 3;

// 000 11111111111111111111111111111
// 这里得到的是 29 个 1,也就是说线程池的最大线程数是 2^29-1=536860911
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 线程池的状态存放在高3位中
// 运算结果为 111跟29个0:111 00000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;
// 000 00000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 001 00000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;
// 010 00000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;
// 011 00000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;

// 将整数c的低29位修改为0,就得到了线程池的状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }

// 将整数c的高3为修改为0,就得到了线程池中的线程数
private static int workerCountOf(int c)  { return c & CAPACITY; }

// 通过线程池状态rs和线程数构造ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }

/*
 * Bit field accessors that don't require unpacking ctl.
 * These depend on the bit layout and on workerCount being never negative.
 */
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

// 判断线程池是否出于RUNNING状态(小于SHUTDOWN)
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

上面就是线程池中对状态成员变量的几个位操作,这几个操作会在源码中一直出现,最好理解并熟记这几个方法及静态成员变量的含义,对我们更好地阅读源码很有帮助。

另外,从上述代码中,可以看出,线程池一共有状态5种状态,分别是:

  • RUNNING:正常的状态:接受新的任务,处理等待队列中的任务
  • SHUTDOWN:不接受新的任务提交,但是会继续处理等待队列中的任务
  • STOP:不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程
  • TIDYING:所有的任务都销毁了,workCount为0。线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()
  • TERMINATED:terminated() 方法结束后,线程池的状态就会变成这个

RUNNING 定义为 -1,SHUTDOWN 定义为 0,其他的都比 0 大,所以等于 0 的时候不能提交任务,大于 0 的话,连正在执行的任务也需要中断

线程池这几种状态的转换如下:

  • RUNNING -> SHUTDOWN:当调用了shutdown()后,会发生这个状态转换
  • (RUNNING or SHUTDOWN) -> STOP:当调用shutdownNow()后,会发生这个状态转换(shutDown()和shutDownNow()的区别)
  • SHUTDOWN -> TIDYING:当任务队列和线程池都清空后,会由 SHUTDOWN转换为TIDYING
  • STOP -> TIDYING:当任务队列清空后,发生这个转换
  • TIDYING -> TERMINATED:当terminated()方法结束后

3.2 工作线程Worker

线程池内部真正执行任务的线程叫Worker,是ThreadPoolExecutor的内部类。Worker类继承了AQS,同时实现了Runnable接口。

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    private static final long serialVersionUID = 6138294804551838833L;

    // 真正的线程,执行任务
    final Thread thread;
    
    // firstTask是在创建线程的时候指定的,如果firstTask非null,firstTask就是线程起来之后要执行的第一个任务
    // 如果firstTask为null,线程起来后,自己到任务队列中取任务(getTask方法)执行
    Runnable firstTask;
    
    // 用于存放此线程完全的任务数,注意了,这里用了 volatile,保证可见性
    volatile long completedTasks;

    // 构造方法,传入firstTask
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 调用ThreadFactory来创建一个新的线程
        this.thread = getThreadFactory().newThread(this);
    }

    // 这里调用了外部类的runWorker方法
    public void run() {
        runWorker(this);
    }

    //AQS操作,以独占锁,获取这个线程的执行权
}

Worker启动之后就可以执行任务或从任务队列取任务执行。

3.3 提交任务

介绍了ThreadPoolExecutor的基本成员后,来看一下线程池的核心——提交任务的实现及工作线程是如何工作的。这里我们通过execute方法作为示例:

public void execute(Runnable command) {
    //提交的任务command不允许为null
    if (command == null)
        throw new NullPointerException();
  
    //获取ctl成员变量的值
    int c = ctl.get();
  
    //如果当前线程数少于核心线程数,那么直接添加一个Worker来执行任务,
    //创建一个新的Worker线程,并把当前任务command作为这个线程的第一个任务(firstTask)
    if (workerCountOf(c) < corePoolSize) {
        //addWorker返回true,添加任务成功,表示线程池已经接受了这个任务,这个方法就可以返回了
        //addWorker返回false,表示线程池不允许提交任务
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    //到这里,说明要么当前线程数大于等于核心线程数,要么上面addWorker失败了
    //如果线程池处于RUNNING状态,把这个任务添加到任务队列workQueue中
    if (isRunning(c) && workQueue.offer(command)) {
        /* 这里面说的是,如果任务进入了workQueue,我们是否需要开启新的线程
         * 因为线程数在[0, corePoolSize)是无条件开启新的线程
         * 如果线程数已经大于等于corePoolSize,那么将任务添加到队列中,然后进到这里
         */
        int recheck = ctl.get();
        // 如果线程池已不处于 RUNNING 状态,那么移除已经入队的这个任务,并且执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果线程池还是RUNNING状态,并且线程数为0,那么开启新的线程
        // 这块代码的真正意图是:避免任务提交到队列中了,但是线程都关闭了
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果workQueue队列满了,那么进入到这个分支
    // 以maximumPoolSize为界创建新的Worker,
    // 如果失败,说明当前线程数已经达到maximumPoolSize,执行拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}

这段代码也比较清晰,跟我们认识的线程池的工作流程一致,下面来看一下addWorker是怎么工作的,比如:

  • addWorker会创建工作线程,那工作线程是如何启动的
  • 工作线程是如何从队列中去任务执行的
  • worker线程空闲超时回收是如何实现的

带着上面这些问题,继续来看代码:

/**
*第一个参数是准备提交给这个线程执行的任务,可以为null
*第二个参数为true代表使用核心线程数corePoolSize作为创建线程的界线,也就说创建这个线程的时候,
*    如果线程池中的线程总数已经达到corePoolSize,那么不能响应这次创建线程的请求
*    如果是false,代表使用最大线程数maximumPoolSize作为界线
*/
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        //如果线程池已关闭,并满足以下条件之一,那么不创建新的worker:
        //1. 线程池状态大于SHUTDOWN,也就是STOP, TIDYING, 或TERMINATED
        //2. firstTask != null
        //3. workQueue.isEmpty()
        //简单分析下:
        //当线程池处于SHUTDOWN的时候,不允许提交任务,但是已有的任务继续执行
        //所以如果线程池处于SHUTDOWN,但是firstTask为null,且workQueue非空,那么是允许创建Worker的
        //当状态大于SHUTDOWN时,不允许提交任务(一旦大于SHUTDOWN,addWorker方法直接返false),且中断正在执行的任务
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //如果成功,那么就是所有创建线程前的条件校验都满足了,准备创建线程执行任务了
            //这里失败的话,说明有其他线程也在尝试往线程池中创建线程
            if (compareAndIncrementWorkerCount(c))
                break retry;
            //由于有并发,重新再读取一下 ctl
            c = ctl.get();
            //正常如果是CAS失败的话,进到下一个里层的for循环就可以了
            //可是如果是因为其他线程的操作,导致线程池的状态发生了变更,如有其他线程关闭了这个线程池
            //那么需要回到外层的for循环
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    /* 
     * 到这里,我们认为在当前这个时刻,可以开始创建线程来执行任务了,
     * 因为该校验的都校验了,至于以后会发生什么,那是以后的事,当前是满足条件的
     */
  
    //Worker是否已经启动
    boolean workerStarted = false;
    //是否已将这个Worker添加到workers这个HashSet中
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        //把firstTask传给worker的构造方法
        w = new Worker(firstTask);
        //取worker中的线程对象,就是Worker的构造方法调用ThreadFactory创建的新线程
        final Thread t = w.thread;
        if (t != null) {
            //这个是整个类的全局锁,关闭一个线程池需要这个锁,保证有线程持有锁的期间,线程池不会被关闭
            mainLock.lock();
            try {
                int c = ctl.get();
                int rs = runStateOf(c);
                //小于SHUTTDOWN那就是RUNNING,这个自不必说,是最正常的情况
                //如果等于SHUTDOWN,前面说了,不接受新的任务,但是会继续执行等待队列中的任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    //worker里面的thread不能是已经启动的
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    //加到 workers 这个 HashSet 中
                    workers.add(w);
                    int s = workers.size();
                    //largestPoolSize 用于记录 workers 中的个数的最大值
                    //因为 workers 是不断增加减少的,通过这个值可以知道线程池的大小曾经达到的最大值
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            //添加成功的话,启动这个线程
            if (workerAdded) {
                //启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //如果线程没有启动,需要做一些清理工作,如前面workCount加了1,将其减掉
        if (! workerStarted)
            addWorkerFailed(w);
    }
    //返回线程是否启动成功
    return workerStarted;
}

这里我们来看第一个问题,addWorker是如何添加工作线程以及工作线程是如何启动的?

addWorker方法中调用了Worker类的构造函数创建Worker对象,Worker构造函数中通过ThreadFactory创造了工作线程,并切赋值给Worker的成员变量thread(就是通过该thread执行任务并拉取队列中的任务执行的)。如果工作线程添加成功并且成功添加到workers的HashSet中,则将Worker对象的成员变量thread调用start方法启动线程(之后就能通过Worker的成员变量thread执行提交的任务了)

接下来看一下Worker对象中的thread启动之后是如何执行任务及拉取任务的。我们知道Thread对象的start方法,其实是执行的Runnable的run方法,所以Worker的成员变量thread对象调用start方法之后,其实执行的是如下方法:

/** Delegates main run loop to outer runWorker  */
public void run() {
    runWorker(this);
}

来看一下runWorker方法的实现:

/**
* 此方法由worker线程启动后调用,这里用一个while循环来不断地从等待队列中获取任务并执行
* worker在初始化的时候,如果指定了firstTask,那么第一个任务也就可以不需要从队列中获取
*/
final void runWorker(Worker w) {
    // 
    Thread wt = Thread.currentThread();
    //该线程的第一个任务(如果有的话)
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); //allow interrupts
    boolean completedAbruptly = true;
    try {
        //循环调用getTask获取任务
        while (task != null || (task = getTask()) != null) {
            w.lock();          
            //如果线程池状态大于等于STOP,那么意味着该线程也要中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //这是一个钩子方法,留给需要的子类实现
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //到这里终于可以执行任务了
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    //这里不允许抛出Throwable,所以转换为 Error
                    thrown = x; throw new Error(x);
                } finally {
                    //也是一个钩子方法,将task和异常作为参数,留给需要的子类实现
                    afterExecute(task, thrown);
                }
            } finally {
                // 置空task,准备getTask获取下一个任务
                task = null;
                //累加完成的任务数
                w.completedTasks++;
                //释放掉worker的独占锁
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 如果到这里,需要执行线程关闭:
        // 1. 说明 getTask返回null,也就是说,这个worker的使命结束了,执行关闭
        // 2. 任务执行过程中发生了异常
        // 第一种情况,已经在代码处理了将workCount减1,这个在getTask方法中再介绍
        // 第二种情况,workCount没有进行处理,所以需要在processWorkerExit中处理
        processWorkerExit(w, completedAbruptly);
    }
}

可以看到,当Worker实例对象的thread成员调用start方法启动后,首次执行Worker初始化时指定的fistTask,之后就在循环获取队列中的任务执行,这也是上面第二个问题的答案,工作线程是如何执行等待队列中的任务的。

这里我们单独讲一下runWorker方法中的两个钩子方法beforeExecute、afterExecute:

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }

如果我们想在任务执行前后做些类似于监控的动作,就可以可以覆盖ThreadPoolExecutor上述两个方法,加入我们的监控逻辑,这样就可以在任务执行前后实施监控任务了。

最后我们来看一下getTask方法是如何从阻塞队列中获取任务的:

// 此方法有三种可能:
// 1. 阻塞直到获取到任务返回。我们知道,默认corePoolSize之内的线程是不会被回收的,它们会一直等待任务
// 2. 超时退出。keepAliveTime起作用的时候,也就是如果这么多时间内都没有任务,返回null
// 3. 如果发生了以下条件,此方法必须返回null:
//    - 池中有大于maximumPoolSize个workers存在(通过调用setMaximumPoolSize进行设置)
//    - 线程池处于SHUTDOWN,而且workQueue是空的,这种不再接受新的任务
//    - 线程池处于STOP,不仅不接受新的线程,连workQueue中的线程也不再执行
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
  
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // 两种可能
        // 1. rs == SHUTDOWN && workQueue.isEmpty()
        // 2. rs >= STOP
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // CAS操作,减少工作线程数
            decrementWorkerCount();
            return null;
        }
        boolean timed;      // Are workers subject to culling?
        for (;;) {
            int wc = workerCountOf(c);
            //允许核心线程数内的线程回收,或当前线程数超过了核心线程数,那么有可能发生超时关闭
            timed = allowCoreThreadTimeOut || wc > corePoolSize;
            
            //下面这个if,如果为true并执行break,表示线程不需要被回收
            /**
            * 1.原则上线程池数量不可能大于maximumPoolSize,但可能会出现并发时操作了setMaximumPoolSize方法,如果此时将最大线程数量调少了
            * 很可能会出现当前工作线程大于最大线程的情况,这时就需要线程超时回收,以维持线程池最大线程小于maximumPoolSize
            * 2. timed && timedOut 如果为true,表示当前操作需要进行超时控制,这里的timedOut为true,说明该线程已经从workQueue.poll()方法超时了
            * 以上两点满足其一,下面的if都不成立,不会执行break,会执行下面第二个if CAS减少工作线程,就可以触发线程回收了
            */
            if (wc <= maximumPoolSize && ! (timedOut && timed))
                break;
            //下面这个if CAS减少工作线程数返回true,表示线程可以被回收了,getTask方法返回null
            //runWorker方法中就会回收工作线程
            if (compareAndDecrementWorkerCount(c))
                return null;
            c = ctl.get();  // Re-read ctl
            // compareAndDecrementWorkerCount(c) 失败,线程池中的线程数发生了改变
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
        // wc <= maximumPoolSize 同时没有超时
        try {
            //如果timed为true(超时就回收线程),阻塞超时获取任务,否则阻塞获取任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            // 如果此worker发生了中断,采取的方案是重试
            // 如果开发者通过setMaximumPoolSize方法将maximumPoolSize调小了,导致其小于当前的workers数量,
            // 那么意味着超出的部分线程要被关闭。重新进入for循环,自然会有部分线程会返回null
            timedOut = false;
        }
    }
}

通过上面getTask方法的分析,我们可以解答第三个问题,worker线程空闲超时回收是如何实现的?

线程池中可维持corePoolSize数量的常驻核心线程,超过corePoolSize的线程会在空闲超时时间后被回收。corePoolSize范围内的线程从workQueue队列中获取任务时,会阻塞式地获取任务(take方法),如果没有获取任务,那么就会一直阻塞下去,而超过corePoolSize范围内的线程从workQueue队列中获取任务时,会阻塞超时获取任务(pool方法),如果超时还没获取到任务,getTask方法会返回null,表示当前Worker的使命已经完成了,需要回收Worker的线程。

4. 线程池使用的注意事项

4.1 线程池创建方式

上面讲到工具类Executors类提供了很多线程池创建的方法,比如newSingleThreadExecutor、newFixedThreadPool、newCachedThreadPool等,我们可以通过Executors类提供的各种方法创建特定线程池。但是阿里巴巴开发规范中有下面一条关于线程池的规范:

原因也很好解释,FixedThreadPool和SingleThreadPool构造函数中,阻塞队列使用的是LinkedBlockingQueue,并且没有指定阻塞队列长度,那么阻塞队列默认是无界的,也就是讲任务总能提交成功,任务可以无限提交,那么大量任务提交后,可能会导致OOM。CachedThreadPool和ScheduledThreadPool构造函数中,maxPoolSize设置为Integer.MAX_VALUE,那么如果阻塞队列满了之后,如果提交任务就可以无限制创建工作线程,也会导致OOM

我觉得,Executors提供的线程池构造方法,并不是一定就不能使用,只不过我们使用前要考虑清楚任务的提交频度及提交量,做好安全相关的考虑,如果不会出现OOM,那么也是可以使用的。如果要完全规避OOM这种问题,我们就可以通过自定义线程池来实现,也就是调用ThreadPoolExecutor的构造函数来自己创建线程池,比如我们创建一个阻塞队列有界的固定数量线程池:

private static ExecutorService executor = new ThreadPoolExecutor(10, 10,
        60L, TimeUnit.SECONDS,
        new ArrayBlockingQueue(100));

但是这种方式也不是完全没有问题,因为如果阻塞队列满了之后再提交的任务,讲被拒绝执行,如果使用使用Executors提供的newFixedThreadPool构造,是可以保证任务一定被执行的(当然前提是没有发生OOM)。

4.2 线程池死锁

提交给线程池的任务,我们需要特别注意一种情况,就是任务之间有依赖,这种情况可能会出现死锁。比如任务A,在它的执行过程中,它给提交了一个任务B,但需要等待任务B结束。

如果任务A是提交给了一个单线程线程池,就会出现死锁,A在等待B的结果,而B在队列中等待被调度。

如果是提交给了一个限定线程个数的线程池,也有可能出现死锁,看个简单的例子:

public class ThreadPoolDeadLockDemo {
    private static final int THREAD_NUM = 5;
    static ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUM);

    static class TaskA implements Runnable {
        @Override
        public void run() {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Future<?> future = executor.submit(new TaskB());
            try {
                future.get();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("finished task A");
        }
    }

    static class TaskB implements Runnable {
        @Override
        public void run() {
            System.out.println("finished task B");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 5; i++) {
            executor.execute(new TaskA());
        }
        Thread.sleep(2000);
        executor.shutdown();
    }
}

使用newFixedThreadPool创建了一个5个线程的线程池,main程序提交了5个TaskA,TaskA会提交一个TaskB,然后等待TaskB结束,而TaskB由于线程已被占满只能排队等待,这样,程序就会死锁。

死锁问题也是可以解决的,比如替换newFixedThreadPool为newCachedThreadPool,让创建线程不再受限,这个问题就没有了。

另一个解决方法,是使用SynchronousQueue,它可以避免死锁,怎么做到的呢?对于普通队列,入队只是把任务放到了队列中,而对于SynchronousQueue来说,入队成功就意味着已有线程接受处理,如果入队失败,可以创建更多线程直到maximumPoolSize,如果达到了maximumPoolSize,会触发拒绝机制,不管怎么样,都不会死锁。我们将创建executor的代码替换为:

static ExecutorService executor = new ThreadPoolExecutor(
        THREAD_NUM, THREAD_NUM, 0, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>());

只是更改队列类型,运行同样的程序,程序不会死锁,不过TaskA的submit调用会抛出异常RejectedExecutionException,因为入队会失败,而线程个数也达到了最大值。

参考链接:

1. Java API

2. 《Java编程的逻辑》

3. 深度解读 java 线程池设计思想及源码实现

4. 从源码的角度解析线程池运行原理

赞(2) 打赏
Zhuoli's Blog » Java编程拾遗『线程池』
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址