醋醋百科网

Good Luck To You!

OOM分析之ThreadPoolExecutor_threadpoolexecutor源码分析

背景

通过单例模式可以有效的减少内存使用。但是随着压测并发数的不断提高,QRCodeTask对象不断增加,内存占用相应也会一直增加。再加上QRCodeTask任务的业务功能是合成图片,属于CPU密集型任务。如果处理的QRCodeTask任务太多,会一直占用CPU,造成其它接口响应的速度变慢。

因此可以对ThreadPoolExecutor深入研究来找到进一步优化的措施。

Java SE API文档链接如下:

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html

二.ThreadPoolExecutor介绍

1.构造函数解析

通过查看源码可以看到所有不同形参的构造函数最终都会调用到以下的构造函数。

public ThreadPoolExecutor(int corePoolSize,
                  int maximumPoolSize,
                  long keepAliveTime,
                  TimeUnit unit,
                  BlockingQueue<Runnable> workQueue,
                  ThreadFactory threadFactory,
                  RejectedExecutionHandler handler)

2.参数介绍

corePoolSize:线程池中核心线程数目,即使空闲也不回收,除非设置了allowCoreThreadTimeOut时间。当有新增任务且当前线程数小于corePoolSize时,会继续创建核心线程执行任务。当线程数达到corePoolSize时,后面新增任务都会加入到BlockingQueue队列中等待执行。

maximumPoolSize:线程池中永许达到的最大线程数母。如果BlockingQueue队列满,且当前线程数小于maximumPoolSize,则线程池会创建新的临时线程继续执行后续任务,直到线程数目达到maximumPoolSize。如果BlockingQueue使用了无界队列,此参数设置了也没有实际意义。

keepAliveTime:临时线程空闲后的存活时间,超时后空闲线程会被销毁。

unit:keepAliveTime的时间单元。单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS)和毫微秒(NANOSECONDS)。

workQueue:暂存任务的工作队列,执行execute方法提交的Runnable任务都会加入到此队列中。与ThreadPoolExecutor相关的BlockingQueue接口实现类有以下4种:

  1. ArrayBlockingQueue:数组实现的有界队列。
  2. LinkedBlockingQueue:链表实现的无界队列,未指明容量时,默认大小为Integer.MAX_VALUE,即21亿多。
  3. SynchronousQueue:不能存储元素的同步队列,主要用于生产者消费者之间的同步。
  4. DelayedWorkQueue:延迟队列,一个无界阻塞队列,只有延迟时间结束才能出队。

ThreadFactory:线程工厂,可以给新建线程设置优先级、名字、是否守护线程、线程组等信息。

RejectedExecutionHandler:任务队列满且线程数也到达极限时的回调函数,用来对无法处理的任务实施拒绝策略,默认策略是AbortPolicy。Executor提供的4种拒绝策略如下:

  1. ThreadPoolExecutor.AbortPolicy:默认策略,抛出java.util.concurrent.RejectedExecutionException异常 。
  2. ThreadPoolExecutor.CallerRunsPolicy: 调用execute()方法 ,重试添加当前的任务。
  3. ThreadPoolExecutor.DiscardPolicy:直接抛弃无法处理的任务。
  4. ThreadPoolExecutor.DiscardOldestPolicy:抛弃队列中最早加进去的任务,即队列头的任务,然后执行execute()方法,重新添加新提交的任务。

除以上4种策略外,ThreadPoolExecutor还可以自定义饱和策略。用户可以灵活的根据实际应用场景实现RejectedExecutionHandler接口,添加符合自身要求的业务代码,如记录日志或持久化不能处理的任务等。

3.ThreadPoolExecutor工作原理

  1. 当有新增任务且当前线程数小于corePoolSize时,创建核心线程执行任务。
  2. 当线程数达到corePoolSize时,后续新增任务都会加到BlockingQueue队列中排队等待执行。
  3. 当BlockingQueue也满后,会创建临时线程执行任务。如果临时线程超过空闲时间后会被销毁。
  4. 当线程总数达到maximumPoolSize时,后续新增任务都会被RejectedExecutionHandler拒绝。

三.Executors介绍

Java SE API文档链接如下:

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html

为了方便程序员的使用,Java设计者贴心的在Executors类中实现了4种获取线程池的静态方法,目的是让程序员不关心各个参数的细节就能得到合适自己的线程池。下面是对各个函数进行介绍:

1. newFixedThreadPool:固定大小线程池,无界队列。

构造方法:

public static ExecutorService newFixedThreadPool(intnThreads){
    return newThreadPoolExecutor(nThreads, nThreads,
                       0L,TimeUnit.MILLISECONDS,
                       newLinkedBlockingQueue<Runnable>());
 } 
public ThreadPoolExecutor(int corePoolSize,intmaximumPoolSize,
                       longkeepAliveTime,TimeUnit unit,
                       BlockingQueue<Runnable> workQueue){
 this(corePoolSize,maximumPoolSize, keepAliveTime, unit, 
 workQueue,Executors.defaultThreadFactory(),defaultHandler);
 }                            

可以看到corePoolsize=maximumPoolSize,超时时间为0,并用了无界任务队列。当任务小于corePoolsize时,会直接创建新的线程执行新增任务。当任务数等于corePoolsize时,新增任务加到无界队列中。此后所有线程即使空闲也不会回收,会一直保持活动状态直到执行shutdown方法。

如果随着任务的增加,任务队列也满,则执行默认的饱和策略,即抛出异常,进程停止。

2.newSingleThreadExecutor:单线程线程池,无界队列。如果线程意外停止,会新建一个线程代替它去执行后续任务。可以保证任务都是按序执行。

public staticExecutorService newSingleThreadExecutor() {
    return newFinalizableDelegatedExecutorService(
       new ThreadPoolExecutor(1, 1,0L,TimeUnit.MILLISECONDS,
       newLinkedBlockingQueue<Runnable>()));
}

corePoolsize=maximumPoolSize=1,超时时间为0,无界任务队列。线程池中只有一个核心线程,当有新增任务时加到无界队列中。

3.newCachedThreadPool:可缓存线程池,无界线程池。

 public static ExecutorServicenewCachedThreadPool(
 ThreadFactory threadFactory) {
     return new ThreadPoolExecutor(0,Integer.MAX_VALUE,
                                    60L,TimeUnit.SECONDS,
             newSynchronousQueue<Runnable>(),threadFactory);
  }

corePoolsize=0,maximumPoolSize=Integer.MAX_VALUE,超时时间为60s,SynchronousQueue任务队列。若处理任务大于当前线程数,且无空闲线程则新建线程执行任务。若有空闲线程,则重复利用空闲线程。当线程空闲时间超时,会对线程进行销毁。

此线程池与其它线程池的最大不同点是SynchronousQueue队列不能暂存任务,只能通过线程数的无限增加来处理并发任务。

4.newScheduledThreadPool:定时任务线程池,使用无界队列。可以定时以及周期性执行任务。

public staticScheduledExecutorService newScheduledThreadPool(
          int corePoolSize, ThreadFactorythreadFactory) {
        return newScheduledThreadPoolExecutor(corePoolSize, 
                                             threadFactory);
}
publicScheduledThreadPoolExecutor(int corePoolSize,
                             ThreadFactory threadFactory) {
       super(corePoolSize, Integer.MAX_VALUE,0, NANOSECONDS,
                       new DelayedWorkQueue(),threadFactory);
}

maximumPoolSize=Integer.MAX_VALUE,超时时间为0s,DelayedWorkQueue任务队列。DelayedWorkQueue队列按照元素时间排序,可以保证延时少的任务优先执行。

四.项目优化

通过上面的介绍,相信大家对ThreadPoolExecutor的工作机制有了深入的了解,再回到项目中遇到的问题。

查看代码发现项目中的线程池是使用了
Executors.newFixedThreadPool,因此当请求持续增加时,QRCodeTask任务就会一直加到无界队列中等待执行,即使通过单例模式使内存占用得到了优化,但是在并发量大的情况下,内存也可能随着队列元素的无限增加最终导致被撑爆。

根据墨菲定律,如果事情有变坏的可能,不管这种可能性有多小,它总会发生。因此为了避免以后线上应用发生故障,必须对这部分代码做进一步的优化。

此处我们不使用 Executors 去得到线程池,而是直接调用ThreadPoolExecutor构造函数,这样可以通过灵活设置参数来构造满足业务需求的线程池,避免资源耗尽。在此项目中修改如下:

ThreadPoolExecutor executor = newThreadPoolExecutor( 
                 10, 15, 60, TimeUnit.MINUTES, 
                 new LinkedBlockingQueue<Runnable>(10000), 
                 new CustomThreadFactory(), 
                 newCustomRejectedExecutionHandler()); 

创建核心线程为10,最大线程20,超时时间60s,任务队列为10000的线程池,并且使用了自定义的ThreadFactory和RejectedExecutionHandler。为方便以后问题的追溯,在自定义ThreadFactory中定义了自己的线程名,并在RejectedExecutionHandle中实现了满足业务需求的处理方法。当请求任务队列达到10000,线程数达到20时,执行给定的拒绝策略。

最后对改造后的程序进行充分测试,应用性能表现平稳,也符合了业务要求,至此这个问题得到了圆满解决。

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言