HICSC
一文读懂Java线程池

为什么要用线程池

在生产环境,我们经常面临的情况是:处理某次请求的时间非常短暂,但是请求量很大。

在这种情况下,如果为每个请求单独创建一个线程,有限的硬件资源有可能会被OS创建线程,切换线程状态、销毁线程这些操作所占用,用于业务处理的资源反而减少了。

所以理想的处理方式是:将请求的线程数量控制在一个范围内,既保证后续的请求不会等待太长时间,又保证物理机将足够的资源用于请求处理本身。

线程池设计与结构

开发者通常利用 ThreadPoolExecutor 类的构造函数来创建不同配置的线程池:

ThreadPoolExecutor(int corePoolSize,        // 核心线程数量
                   int maximußmPoolSize,    // 最大线程数量
                   long keepAliveTime,    // 空闲线程的回收时间阈值
                   TimeUnit unit,                // 时间单位,配合上一个参数使用
                   BlockingQueue<Runnable> workQueue, // 等待队列
                   ThreadFactory threadFactory,                // 线程工厂,用于创建线程
                   RejectedExecutionHandler handler)    // 线程池拒绝处理器

透过这个构造方法,可以大致勾勒出线程池的基本组成,其大致的结构如下图所示。

线程池的基本组成

这里一定要注意:存在于线程池中的一定是Thread对象,而不是你要处理的任务。因此才叫线程池而不是任务池,线程池会分配池中的一个空闲线程对象来运行提交的任务。

构成线程池的几个要素:

注意:实际上,并没有「核心线程」与「非核心线程」这样的概念,只是为了方便大家理解,而所有区分而已。因此,全文对这两个概念均加了引号。

线程池的大致工作流程:

1、开发者提交待执行任务,线程池收到这个任务请求后,有以下几种处理情况:

2、一旦线程池中某个线程完成了任务的执行,它就会试图到任务等待队列中拿去下一个等待任务 ( 所有的等待任务都实现了 BlockingQueue 接口,这是一个可阻塞的队列接口 ) ,它会调用等待队列的 poll 方法,并停留在哪里。

3、当线程池中的线程超过您设置的 corePoolSize 参数,说明当前线程池中有所谓的“非核心线程”。那么当某个线程处理完任务后,如果等待 keepAliveTime 时间后仍然没有新的任务分配给它,那么这个线程将会被回收。线程池回收线程时,并不是所谓的“非核心线程”才会被回收,而是谁的空闲时间达到 keepAliveTime 这个阀值,就会被回收,直到线程池中线程的数量等于您设置的corePoolSize参数时,回收过程才会停止。

对所谓的“核心线程”和“非核心线程”是一视同仁的,直到线程池中线程的数量等于您设置的corePoolSize参数时,回收过程才会停止。

Executor 框架基本组成

使用 ThreadPoolExecutor 类的构造方法可以创建不同配置的线程池,但平时却很少使用。大多数应用场景下,使用Java并发包中的 Executors 提供的5个静态工厂方法就足够了。

首先,来看看 Executor 框架的基本组成,图片来自于:极客时间 - 杨晓峰 - Java核心知识36讲。

Executor基本组成

Executor 是一个基础接口,只有一个 execute(Runnable) 方法,用于任务执行,它屏蔽了许多任务提交、线程创建和调度等不相关细节。

ExecutorService 接口则更加完善,提供了一些管理功能,比如 shutdown 方法用于关闭线程池;还提供了更加全面的任务提交机制,比如用 submit 方法提交任务,返回的是一个 Future 对象,用于获取任务执行结果;甚至还有批量处理任务的功能,比如 invokeAll 或者 invokeAny 等方法。

ThreadPoolExecutor、ScheduledThreadPoolExecutor、ForkJoinPool 则是 Java 提供的几种基础线程池实现,以满足复杂多变的应用场景。

Executors 是一个工具类,从简化使用的角度,提供了各种静态工厂方法来创建不同配置的线程池。

使用 ThreadPoolExecutor 创建线程池

前面的内容已经给出 ThreadPoolExecutor 的构造方法,这里对构造方法的后3个参数作详细的说明,帮你更好的理解和使用线程池。

线程池的等待队列

只要实现了 BlockingQueue 接口的队列,都可以作为线程池的等待队列,常见的比如:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlockingQueue、LinkedTransferQueue等,至于每种队列的区别以及原理,不再本文讨论范文内。

但网上有些内容真的很容易误导人,这里提两句。

SynchronousQueue 也可以存储数据,且是无锁实现,只是size()方法直接返回0而已。因此,网上的很多内容在把 SynchronousQueue 和 LinkedBlockingQueue 与 LinkedTransferQueue 作对比时,部分内容的正确性是有待确认的,这点需要读者自己注意。

PriorityBlockingQueue 会按照优先级进行对内部元素进行排序,优先级最高的元素将始终排在队列的头部。但它不会保证优先级一样的元素的排序,也不保证当前队列中除了优先级最高的元素以外的元素,处于正确排序的位置。因此,它并不是真正意义的排序。

线程池的 ThreadFactory

线程池最主要的一项工作,就是在满足某些条件的情况下创建线程。而在 ThreadPoolExecutor 线程池中,创建线程的工作交给 ThreadFactory 来完成。要使用线程池,就必须要指定 ThreadFactory。如果没有指定,会使用默认的 ThreadFactory:DefaultThreadFactory (这个类在Executors工具类中)。

当然,在某些特殊业务场景下,您还可以使用一个自定义的 ThreadFactory 线程工厂,如下代码片段:

public class CustomThreadFactory implements ThreadFactory {
        @Override
        public Thread newThread(Runnable r) {
            // do something before new thread created;
            // create new thread and return
            return new Thread(r);
      }
}

线程池的拒绝策略

在 ThreadPoolExecutor 线程池中还有一个重要的接口:RejectedExecutionHandler。当提交任务给线程池时,出现以下情况时,线程池会拒绝处理这个任务,并触发线程池创建时定义的拒绝策略:

实际上,在 ThreadPoolExecutor 中已经提供了四种可以直接使用的 RejectedExecutionHandler 接口的实现:

其中,CallerRunsPolicy 直接调用任务的 run 方法,可能会造成线程安全问题;DiscardPolicy 默默的忽略掉被拒绝任务,也没有输出日志或者任何提示,开发者就无法得知线程池在处理过程出现的错误;DiscardOldestPolicy 貌似是科学的,但如果等待队列出现了容量问题,很多任务会被直接丢弃,这时,业务会出现BUG,但开发者却很难定位到。

因此,比较科学的还是 AbortPolicy 提供的处理方式:抛出异常,由开发人员进行处理。当然,特殊情况下,我也建议使用自定义拒绝策略,可以缓存任务到Redis,或者发送消息到MQ通知业务方。

扩展 ThreadPoolExecutor 线程池

在 ThreadPoolExecutor 中提供了3个方法供子类重写,它们可以帮助开发者在线程池处理任务的不同阶段,进行额外的业务处理操作:

Execute 和 Submit 方法的区别

ThreadPoolExecutor 提供 execute 和 submit 两个方法用于提交任务,其中:

execute:提交的任务实现 Runnable 接口,任务没有任何返回值,因此,无法获取任何执行结果。

submit:提交的任务实现 Callable 接口,在任务运行完成后,会返回执行结果。

当然 submit 方法也可以提交实现 Runnable 接口的任务,但其处理方式与 execute 方法的处理方式完全不同:使用 submit 方法提交的实现了 Runnable 接口的任务,将会被封装到线程池内部使用 Executors.callable 方法创建的 RunnableAdapter 对象中,而 RunnableAdapter 又继承自 Callable。

线程池实践

理解 Executors 创建的线程池

如果使用 Executors 创建线程池,那么一定要理解每个方法创建出来的线程池的配置是什么。

比如,newCachedThreadPool 方法创建的线程池,其 corePoolSize = 0,而 maximumPoolSize = Integer.MAX,而其等待队列是 SynchronousQueue,不能缓冲数据。它会试图缓存线程并重用,当无缓存线程可用时,就会创建新的工作线程;如果线程闲置的时间超过 60 秒,则被终止并移出缓存;长时间闲置时,这种线程池,不会消耗什么资源。但需要着重注意的是,过快的任务提交速度,不但会导致线程数的急剧增加,也增加了程序OOM的风险。

而 newFixedThreadPool ( int nThreads) 方法中,corePoolSize=maximumPoolSize=nThreads,任何时候最多有nThreads 个工作线程是活动的,这也意味着如果任务数量超过了活动队列数目,将在工作队列中等待空闲线程出现;如果有工作线程退出,将会有新的工作线程被创建,以补足指定的数目 nThreads。

实际应用场景中,可能有很多人滥用了这些方法,所以,阿里Java规约建议使用 ThreadPoolExecutor 构造方法来代替 Executors。

不要随意创建线程池

站在应用或者服务的角度,对整个服务中线程的用途归类,每个分类创建合适的线程池即可。

看到过挺多代码,只要用到线程,就是用 Executors 在类中创建了一个线程池,这样挺不好的。

记住一点:服务中创建的线程池越多,就越不好监控,也越不好配置合适的线程池数量。

线程池大小如何配置

也许你已经看到过无数计算最佳线程数的公式,首先,随便选一个,真的无所谓。

然后,随时监控线程池状态。最简单的方式,每次提交任务时,在日志中记录下线程池的几个关键参数:corePoolSize、maximumPoolSize、线程池当前线程数量、工作队列长度。

最后,相信我,有上面4个参数,再结合硬件资源,你能够很清楚的知道,自己的线程池配置是否合适。

避免任务堆积

开发者要随时注意,任务的耗时,以免线程池被耗尽,等待队列被填满。

如果任务依赖于第三方服务,一定要设置超时时间。

避免任务丢失

在一些特殊的情况下,线程池的负载短时间内快速升高,有可能会触发拒绝策略,如果提交的任务不允许丢失,那么需要自定义拒绝策略,将任务暂存到数据库或者缓存中。

也就是在使用线程池时,要尽量兼顾到线程池的所有使用场景。

正确的关闭线程池

调用线程池的 shutdown 方法后,线程池不再接受新任务,如果继续提交,线程池会使用拒绝策略响应。

调用线程池的 shutdownNow 方法,会中断所有线程,然后取出工作队列中所有未完成的任务返回给调用者。

调用这两个方法都不会主动等待任务执行结束,在某些场景下,在关闭线程池时,需要保证所有的任务均执行完毕,可以调用 awaitTermination 方法判断。

如果线程池任务执行结束,awaitTermination 方法将会返回 true,否则在等待指定时间后将会返回 false。

// 阻止接收新任务
threadPool.shutdown();
// 等待线程池中的任务执行完成
while (!threadPool.awaitTermination(60,TimeUnit.SECONDS)){
        // do nothing
}
// 示例代码,不建议生产环境直接使用
// 如果有任务卡住,会导致线程池不能关闭

Comments

Post a Message

人生在世,错别字在所难免,无需纠正。

提交评论