醋醋百科网

Good Luck To You!

Java线程池真的那么难懂吗?看完这篇你就全明白了!

线程池到底是什么神仙操作

在开发中,为了提升效率,我们需要将一些业务采用多线程的方式去执行,但是,如果每次异步操作或者多线程操作都需要新创建一个线程,使用完毕后,线程再被销毁,这样的话,对系统造成一些额外的开销。在处理过程中到底由多线程处理了多少个任务,以及每个线程的开销无法统计和管理,所以咱们需要一个线程池机制来管理这些内容。线程池的概念和连接池类似,都是在一个Java的集合中存储大量的线程对象,每次需要执行异步操作或者多线程操作时,不需要重新创建线程,直接从集合中拿到线程对象直接执行方法就可以了。JDK提供了多种创建线程池的方式,实际中常使用ThreadPoolExecutor来自定义构建线程池。以下内容涉及ThreadPoolExecutor的源码分析,可能有点长,还请耐心看完。

ThreadPoolExecutor应用实战

创建线程池可不是随便new一下就完事了!看一下ThreadPoolExecutor提供的七个核心参数

public ThreadPoolExecutor(
    int corePoolSize, // 核心工作线程(当前任务执行结束后,不会被销毁)
    int maximumPoolSize, // 最大工作线程(代表当前线程池中,一共可以有多少个工作线程)
    long keepAliveTime,  // 非核心工作线程在阻塞队列位置等待的时间
    TimeUnit unit,      // 非核心工作线程在阻塞队列位置等待时间的单位
    BlockingQueue<Runnable> workQueue, // 任务在没有核心工作线程处理时,任务先扔到阻塞队列中
    ThreadFactory threadFactory,  // 构建线程的线程工作,可以设置thread的一些信息,比如线程名前缀
    RejectedExecutionHandler handler) {  // 当线程池无法处理投递过来的任务时,执行当前的拒绝策略
}

其中的拒绝策略,当线程池无法处理任务时,jdk提供了以下几种策略:

  • AbortPolicy 直接抛异常
  • CallerRunsPolicy 将任务交给调用者处理
  • DiscardPolicy 什么也不做(即任务被丢弃)
  • DiscardOldestPolicy 将队列中最早的任务丢弃掉,将当前任务再次尝试交给线程池处理
  • 自定义策略实现RejectedExecutionHandler

ThreadPoolExecutor源码解析

ThreadPoolExecutor核心属性

// 核心属性,高3位表示线程池的状态,低29位表示工作线程的个数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 声明了一个常量:COUNT_BITS = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 00000000 00000000 00000000 00000001
// 1左移29位
// 0010_0000_0000_0000_0000_0000_0000_0000
// 1左移29位后减一,低29位全为1,
// 0001_1111_1111_1111_1111_1111_1111_1111
// CAPACITY就是当前工作线程能记录的工作线程的最大个数
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// 线程池的状态表示
// 111...0000:代表RUNNING状态,RUNNING可以处理任务,并且处理阻塞队列中的任务
private static final int RUNNING    = -1 << COUNT_BITS;
// 000...000:代表SHUTDOWN状态,不会接收新任务,正在处理的任务正常进行,阻塞队列的任务也会做完
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 001...000:代表STOP状态,不会接收新任务,正在处理任务的线程会被中断,阻塞队列的任务一个不管
private static final int STOP       =  1 << COUNT_BITS;
// 010...000: 代表TIDYING状态,这个状态是SHUTDOWN或者STOP转换过来的,代表当前线程池马上关闭,就是过渡状态
private static final int TIDYING    =  2 << COUNT_BITS;
// 011...000: 代表TERMINATED状态,这个状态是TIDYING状态转换过来的,转换过来只需要执行一个terminated方法
private static final int TERMINATED =  3 << COUNT_BITS;
// Packing and unpacking ctl
// 基于&运算的特点,保证只会拿到ctl高三位的值
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 基于&运算的特点,保证只会拿到ctl低29位的值
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

线程池的状态的特点及转换方式

ThreadPoolExecutor的execute方法


public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // 获取核心属性ctl
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        // 工作的线程数小于核心线程数, 添加核心工作线程
        if (addWorker(command, true))
            return;
        // 说明线程池状态或者是工作线程个数发生了变化,导致添加失败,重新获取一次ctl
        c = ctl.get();
    }
    //添加核心工作线程失败,走这里的逻辑
    // 判断线程池状态是否是RUNNING,如果是,正常基于阻塞队列的offer方法,将任务添加到阻塞队列
    if (isRunning(c) && workQueue.offer(command)) {
        //如果任务在扔到阻塞队列之前,线程池状态突然改变了,这里重新获取一下状态二次校验
        int recheck = ctl.get();
        // 如果线程池的状态不是RUNNING,将任务从阻塞队列移除
        if (! isRunning(recheck) && remove(command))
            // 执行拒绝策略
            reject(command);
        else if (workerCountOf(recheck) == 0)
            // 这里阻塞队列中已经将任务加进去了
            // 如果工作线程为0个,需要添加一个非核心工作线程去处理阻塞队列中的任务
            addWorker(null, false);
    }
    // 任务添加到阻塞队列失败,添加非核心线程
    else if (!addWorker(command, false))
        // 添加非核心线程失败,执行拒绝策略
        reject(command);
}

提交任务至线程池的整个逻辑总结如下:

addWorker(Runnable firstTask, boolean core)源码解析:

addWorker方法主要分为两大块:

  • 第一块,校验线程池状态及工作线程个数
  • 第二块,添加工作线程并启动工作线程

先看校验线程池状态及工作线程个数

private boolean addWorker(Runnable firstTask, boolean core) {
    // retry是给外层for循环添加一个标记,是为了方便在内层for循坏跳出外层for循环
    retry:
    // 外层for循环校验线程池状态
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        // 如果线程池状态不是RUNNING,此时再去判断队列是否为空
        if (rs >= SHUTDOWN &&
               //如果这三个条件都满足,就代表是要添加非核心工作线程去处理阻塞队列任务
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        // 内层for循环校验工作线程个数
        for (;;) {
            int wc = workerCountOf(c);
            // 如果工作线程个数大于最大值了,不可以添加了,返回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 针对ctl进行 + 1,采用CAS的方式
            if (compareAndIncrementWorkerCount(c))
                // CAS成功后,直接退出外层循环,代表可以执行添加工作线程操作了
                break retry;
            c = ctl.get();  // Re-read ctl
            //重新判断获取到的ctl中,表示的线程池状态跟之前的是否有区别
            // 如果状态不一样,说明有变化,重新的去判断线程池状态
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // 此处省略添加工作线程及启动工作线程的代码
}

再看添加工作线程及启动工作线程

private boolean addWorker(Runnable firstTask, boolean core) {
    // 此处省略线程池状态及工作线程个数的校验代码
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        //判断Thread是否不为null,在new Worker时,内部会通过给予的ThreadFactory去构建Thread交给Worker
        // 一般如果为null,代表ThreadFactory有问题
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            //加锁,保证使用workers成员变量以及对largestPoolSize赋值时,保证线程安全
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                // 如果线程池状态是RUNNING
                //或者线程池状态为SHUTDOWN,并且firstTask为null,添加非核心工作处理阻塞队列任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // worker没添加成功之前不能自己启动线程,如果启动了,抛出异常
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize在记录最大线程个数的记录
                    // 如果当前工作线程个数,大于最大线程个数的记录,就赋值
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // 添加worker成功
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 如果添加worker成功则启动worker的线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //做补偿操作,如果工作线程启动失败,将这个添加失败的工作线程处理掉
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

工作线程启动失败,需要补偿操作

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            // 如果w不为null,从workers将worker移除
            workers.remove(w);
        //同时对ctl进行 - 1,代表去掉了一个工作线程个数
        decrementWorkerCount();
        // 因为工作线程启动失败,判断一下状态的问题,是不是可以走TIDYING状态最终到TERMINATED状态了
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

ThreadPoolExector的Worker

Worker对象主要包含了两个内容

  • 工作线程及线程要执行的任务
  • 工作线程可能会被中断,控制中断

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /** Thread this worker is running in.  Null if factory fails. */
    // 线程工厂构建的线程
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    // 线程要执行的任务
    Runnable firstTask;
    /** Per-thread task counter */
    // 记录当前工作线程处理完的任务数
    volatile long completedTasks;
    Worker(Runnable firstTask) {
        // 将State设置为-1,代表当前不允许中断线程
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    //当thread执行start方法时,调用的是Worker的run方法
    public void run() {
        // 调用ThreadPoolExector的runWorker
        runWorker(this);
    }
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
    void interruptIfStarted() {
        Thread t;
        // 只有Worker中的state >= 0的时候,可以中断工作线程
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

ThreadPoolExector的runWorker方法

runWorker就是让工作线程拿到任务去执行,并且在内部也处理了在工作线程正常结束和异常结束时的处理方案,并且设计了任务执行前和执行后的钩子函数,钩子函数默认无实现,如果需要在线程池执行任务前后做一些额外的处理,可以重写钩子函数

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    //将Worker中的state置位0,代表当前线程可以中断的
    w.unlock(); // allow interrupts
    // 判断工作线程是否是异常结束,默认就是异常结束
    boolean completedAbruptly = true;
    try {
        // 如果第一个任务为null,去阻塞队列中获取任务
        while (task != null || (task = getTask()) != null) {
            //执行了Worker的lock方法,当前在lock时,shutdown操作不能中断当前线程,因为当前线程正在处理任务
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // 这里其实就是做了一个事情,如果线程池状态 >= STOP,确保线程中断了
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 前置钩子
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 后置钩子
                    afterExecute(task, thrown);
                }
            } finally {
                //任务执行完,丢掉任务
                task = null;
                // 当前工作线程处理的任务数+1
                w.completedTasks++;
                //执行unlock方法,此时shutdown方法才可以中断当前线程
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

工作线程退出时,执行这个方法


// 工作线程退出时,执行这个方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount(); // 如果任务异常结束,扣掉一个工作线程
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //前工作线程处理的任务个数累加到线程池处理任务的个数属性中
        completedTaskCount += w.completedTasks;
        // 将工作线程从hashSet中移除
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // 只要工作线程结束了,查看是不是线程池状态改变
    tryTerminate();
    int c = ctl.get();
    // 如果线程池状态为RUNNING或者SHUTDOWN
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            // 如果任务正常结束
            // 如果核心线程允许超时,min = 0,否则就是核心线程个数
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                // 如果min == 0,并且阻塞队列非空(有任务没有线程处理)
                // 至少要有一个工作线程处理阻塞队列任务
                min = 1;
            if (workerCountOf(c) >= min)
                //如果工作线程个数 大于等于min,不怕没线程处理,正常return
                return; // replacement not needed
        }
        // 异常结束,为了避免出现问题,添加一个空任务的非核心线程来填补上刚刚异常结束的工作线程
        addWorker(null, false);
    }
}

ThreadPoolExector的getTask方法

工作线程在去阻塞队列获取任务前,要先查看线程池状态

如果状态没问题,去阻塞队列take或者是poll任务


private Runnable getTask() {
    //timeOut默认值是false
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        // 如果线程池状态是STOP,没有必要处理阻塞队列任务,直接返回null
        // 如果线程池状态是SHUTDOWN,并且阻塞队列是空的,直接返回null
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        // 拿到工作线程个数
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        // 核心工作线程允许超时或者工作线程个数大于核心线程数,timed为true,后面会用到
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        // 从阻塞队列获取任务
        try {
            // timed 为true 走poll方法,拿任务,等待一会,为false 走take拿任务,死等
            // 核心线程不允许超时时就是死等队列的任务,线程不会被回收
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // 说明当前线程没拿到任务,将timeOut设置为true,在上面就可以返回null退出了
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
} 

ThreadPoolExector的shutdown方法

将线程池状态从RUNNING状态转变为SHUTDOWN

shutdown状态下,不会中断正在干活的线程,而且会处理阻塞队列中的任务

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 里面是一个死循环,将线程池状态修改为SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 中断空闲线程
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    //尝试结束线程
    tryTerminate();
}

ThreadPoolExector的shutdownNow方法

将线程池状态从RUNNING状态转变为STOP

shutdownNow不会处理阻塞队列的任务,将任务全部给你返回了

实际中不要使用

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 里面是一个死循环,将线程池状态修改为STOP
        advanceRunState(STOP);
        // 直接中断工作线程
        interruptWorkers();
        // 将阻塞队列的任务全部扔到List集合中
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

使用原则

  • 核心参数配置

线程池的使用难度不大,难度在于线程池的参数并不好配置,主要难点在于任务类型无法控制,比如任务有CPU密集型,还有IO密集型,以及混合型,因为IO咱们无法直接控制,所以很多时间按照一些书上提供的一些方法,是无法解决问题的。想调试出一个符合当前任务情况的核心参数(核心线程数,最大线程数,队列容量),最好的方式就是测试。

  • 异常处理机制

线程池默认会吞掉任务抛出的异常,需在Runnable/Callable内部通过try-catch捕获异常或重写ThreadPoolExecutor.afterExecute钩子方法处理。

  • 线程泄漏预防

1,避免重复创建线程池(如每次请求新建线程池),应复用全局线程池实例,笔者曾在这里犯过错误

2,使用ThreadLocal时需在任务完成后手动清除,防止线程复用导致内存泄漏

  • 优雅关闭

调用shutdown()平滑关闭(等待已提交任务完成)

  • 线程池隔离

不同类型任务(如高/低优先级、长/短耗时)应使用独立线程池,避免相互影响

  • 线程命名与调试

自定义ThreadFactory为线程设置有意义名称,便于日志排查问题

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言