java线程池executor线程池Service,里面有多少空余线程,怎么看

About ExecutorService(2),自定义线程池 - 简书
下载简书移动应用
写了46220字,被1115人关注,获得了852个喜欢
About ExecutorService(2),自定义线程池
琢磨了一下,还是把这篇提前了,本片篇幅可能会有些长,甚至冗余,请各位看官原谅我这拙劣的写作能力。
本篇的重点是,简单线程池的实现,Executor框架,优化后的自定义线程池。
Android主线程不是线程安全的,不能阻塞太久,否则会报ANR异常,于是,我们经常这样做来处理耗时操作,查询数据,甚至联网请求。
new Thread(new Runnable() {
public void run() {
/*处理耗时操作*/
}).start();
这段看似常用,而且大家都在用,其实蕴藏着很多学问。那么我就为大家解释一下。
这段代码首先创建了一个线程,并在run( )方法结束后,系统自动回收该线程,可以说在简单的应用中,没什么问题,但是如果放到复杂的生产环境中,系统由于真实环境的需要,可能会开启很多线程来做支撑。而当线程数量过大时,反而会消耗CUP和内存资源。
与进程相比,线程算的上是一个轻量级的工具,但是!!!其创建和关闭都需要花费时间和资源,频繁的创建子线程,很可能出现造成创建和销毁线程所花的时间要比业务逻辑的工作时间还要长的悲剧,这样就得不偿失了。
Android的垃圾回收机制是基于虚拟机的,虽智能,但不够完美,子线程频繁创建和销长毁,依然会抢夺资源,不仅给GC带来压力,严重的时候甚至会报OOM异常。
所以,应该对子线程的使用掌握一个度,接下来,duang,duang,duang,给大家带来线程池的简单实现。
线程池的基本功能就是进行线程的复用,由于篇幅原因,我把它提前发布出来并附上链接。
通过上面链接的小例子,是不是对线程池的构造有了一个初步的了解。接下来为大家介绍一下系统自带的Executor框架,为开发者们自定义线程池带来了极大的方便。
可以说Executors是一个工厂类,里面有许多静态方法,供开发者调用。
/*该方法返回一个固定线程数量的线程池,该线程池池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。
* 若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务
* 默认等待队列长度为Integer.MAX_VALUE*/
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1);
/*该方法返回一个只有一个线程的线程池。
* 若多余一个任务被提交到线程池,任务会被保存在一个任务队列中,等待线程空闲,按先入先出顺序执行队列中的任务
* 默认等待队列长度为Integer.MAX_VALUE*/
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
/*该方法返回一个可根据实际情况调整线程数量的线程池。
* 线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。
* 若所有线程均在工作,又有新任务的提交,则会创建新的线程处理任务。
* 所有线程在当前任务执行完毕后,将返回线程池进行复用*/
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
/*该方法返回一个ScheduledExecutorService对象,线程池大小为1。
* ScheduledExecutorService接口在ExecutorService接口之上扩展了在给定时间内执行某任务的功能,
* 如在某个固定的延时之后执行,或者周期性执行某个任务*/
ExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
/*该方法也返回一个ScheduledExecutorService对象,但该线程池可以指定线程数量*/
ExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(1);
如果童鞋们认为,Executors工厂类提供的自定义线程池,还不够用的话,也可以自定义线程池。通过ThreadPoolExecutor的构造函数自定义需要的线程池。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue&Runnable& workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
corePoolSize:当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。
maximumPoolSize:线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
keepAliveTime:当线程池线程数量超过corePoolSize时,多余的空余线程存活时间,即,超过corePoolSize的空闲线程,在多长时间内被销毁。如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
unit:keepAliveTime的单位。
workQueue:任务队列,被提交但尚未被执行任务的任务。有以下几种队列模式:
ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
PriorityBlockingQueue:一个具有优先级得无限阻塞队列。
threadFactory:线程工厂,用于创建线程,一般用默认的即可。也可以通过线程工厂给每个创建出来的线程设置更有意义的名字,Debug和定位问题时非常有帮助。
handler:拒绝策略,当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。系统内置的策略模式如下:
AbortPolicy:直接抛出异常,阻止系统正常工作。
CallerRunsPolicy:只要线程未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。
DiscardOldestPolicy:该策略将丢弃最老的一个请求,也就是即将被执行的一个人任务,并尝试再次提交当前任务。
DiscardPolicy:该策略默默的丢弃无法处理对
当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。
好像好久没说到AT了,那么接下来我们根据源码和画图的方式分析一下AT中到底构造了一个如何的线程池
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = CPU_COUNT + 1;
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE = 1;
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
private static final BlockingQueue&Runnable& sPoolWorkQueue =
new LinkedBlockingQueue&Runnable&(128);
* An {@link Executor} that can be used to execute tasks in parallel.
public static final Executor THREAD_POOL_EXECUTOR
= new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);
为了保持处理器达到期望的使用率,最优线程池大小,终于在一本叫做《Java并发编程实践》的书中找到了估算线程池大小的经验值:Nthread。
Nthread = Ncpu * Ucpu * (1 + W/C)
其中,Ncpu = CPU的数量,Ucpu = 目标CPU的使用量,0 &= Ucpu &=1,W/C = 等待时间与计算时间的比率。
由于各种八核手机出现,以及硬件的提升,这个公式貌似也不常用了。大家了解即可。
接下来介绍AT中线程池的工作原理
1,假如我手中用了双核手机(两个CPU),初始状态下线程池的工作线程为0,当第一个任务进来时,会创建工作线程。则根据AT中计算CPU_COUNT = 2,CORE_POOL_SIZE = 3 ,红色代表新任务,当线程池中的工作线程池数量已经达到了CORE_POOL_SIZE,下个任务,如果工作线程没有空闲的话,必将进入缓冲队列中,进行排队。
2,线程池中没有空闲线程,新的任务要进入缓冲队列进行排队。
3,缓冲队列也满了,线程池中的工作线程依然没有空闲。
通过AT中的计算,MAXIMUM_POOL_SIZE = 5。
当线程池工作线程达到CORE_POOL_SIZE并且没有空闲,缓冲队列任务数量达到AT中设定的128,
此时新的任务进入线程池之前,如果线程池中工作线程数量小于MAXIMUM_POOL_SIZE,则创建新的工作线程执行任务。如下图,线程池创建第4个工作线程,执行第129个任务。
4.线程池工作线程数量等于MAXIMUM_POOL_SIZE,缓冲队列数量等于最大workQuene最大值,并且没有工作线程处于空闲,第131任务进入线程池的时候,拒绝策略handler发挥作用,AT中默认的拒绝策略为抛出RuntimeException异常。(RuntimeException和Error同属于UncaughtException,可以在Application中进行捕获和处理)
由上述步骤,不难发现workQueue应该避免无边界队列,尽量使用带边界的,如ArrayBlockingQueue和固定capacity容量的LinkedBlockingQueue。否则线程池的缓冲队列将是一个无边界的队列,任务过多而不能执行拒绝策略的情况下,可能会撑满内存,导致整个系统不可用,造成不可预估的后果。
最近有群里的朋友抱怨,Android studio 编译速度太慢,太卡了,交给大家一个小技巧,我的电脑还是公司的老款戴尔,至于你信不信,反正我是信了,它确实快了。
若你喜欢怪人,其实我很美。
打开微信“扫一扫”,打开网页后点击屏幕右上角分享按钮
被以下专题收入,发现更多相似内容:
android开发技巧,原理解析等等
· 12880人关注
专注收藏Android开发技术文章
· 55人关注
若你喜欢怪人,其实我很美。
选择支付方式:<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
您的访问请求被拒绝 403 Forbidden - ITeye技术社区
您的访问请求被拒绝
亲爱的会员,您的IP地址所在网段被ITeye拒绝服务,这可能是以下两种情况导致:
一、您所在的网段内有网络爬虫大量抓取ITeye网页,为保证其他人流畅的访问ITeye,该网段被ITeye拒绝
二、您通过某个代理服务器访问ITeye网站,该代理服务器被网络爬虫利用,大量抓取ITeye网页
请您点击按钮解除封锁&JAVA线程池学习以及队列拒绝策略
JAVA线程池学习以及队列拒绝策略,有需要的朋友可以参考下。工作中遇到了消息队列的发送,之前都是用数据库作为中转和暂存的。这次考虑用多线程的方式进行消息的发送,于是学习了一下线程池的应用。说实话,实践中对Java高级特性的应用真的不多,对多线程的理解也就一直停留在理论层面。借着实践的机会好好整理一下。准备从以下几个方面总结:线程池的使用消息队列——生产者消费者模式定时任务Quartz原理线程池的大小、队列大小设置这个部分是有关线程池的使用:1. 为什么要用线程池?在Java中,如果每当一个请求到达就创建一个新线程,开销是相当大的。在实际使用中,每个请求创建新线程的服务器在创建和销毁线程上花费的时间和消耗的系统资源,甚至可能要比花在实际处理实际的用户请求的时间和资源要多的多。除了创建和销毁线程的开销之外,活动的线程也需要消耗系统资源。如果在一个JVM中创建太多的线程,可能会导致系统由于过度消耗内存或者“切换过度”而导致系统资源不足。为了防止资源不足,服务器应用程序需要一些办法来限制任何给定时刻处理的请求数目,尽可能减少创建和销毁线程的次数,特别是一些资源耗费比较大的线程的创建和销毁,尽量利用已有对象来进行服务,这就是“池化资源”技术产生的原因。线程池主要用来解决线程生命周期开销问题和资源不足问题,通过对多个任务重用线程,线程创建的开销被分摊到多个任务上了,而且由于在请求到达时线程已经存在,所以消除了创建所带来的延迟。这样,就可以立即请求服务,使应用程序响应更快。另外,通过适当的调整线程池中的线程数据可以防止出现资源不足的情况。2. ThreadPoolExecutor类JDK 1.5以后,Java提供一个线程池ThreadPoolExecutor类。下面从构造函数来分析一下这个线程池的使用方法。public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue&Runnable& workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);参数名、说明:参数名说明corePoolSize线程池维护线程的最少数量maximumPoolSize线程池维护线程的最大数量keepAliveTime线程池维护线程所允许的空闲时间workQueue任务队列,用来存放我们所定义的任务处理线程threadFactory线程创建工厂handler线程池对拒绝任务的处理策略ThreadPoolExecutor将根据corePoolSize和maximumPoolSize设置的边界自动调整池大小。当新任务在方法execute(Runnable) 中提交时, 如果运行的线程少于corePoolSize,则创建新线程来处理请求。如果正在运行的线程等于corePoolSize时,ThreadPoolExecutor优先往队列中添加任务,直到队列满了,并且没有空闲线程时才创建新的线程。如果设置的corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。keepAliveTime:当线程数达到maximumPoolSize时,经过某段时间,发现多出的线程出于空闲状态,就进行线程的回收。keepAliveTime就是线程池内最大的空闲时间。workQueue:当核心线程不能都在处理任务时,新进任务被放在Queue里。线程池中任务有三种排队策略:a. 直接提交。直接提交策略表示线程池不对任务进行缓存。新进任务直接提交给线程池,当线程池中没有空闲线程时,创建一个新的线程处理此任务。这种策略需要线程池具有无限增长的可能性。实现为:SynchronousQueueb. 有界队列。当线程池中线程达到corePoolSize时,新进任务被放在队列里排队等待处理。有界队列(如ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。c. 无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的&#20540;也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。拒绝策略:当任务源源不断的过来,而我们的系统又处理不过来的时候,我们要采取的策略是拒绝服务。RejectedExecutionHandler接口提供了拒绝任务处理的自定义方法的机会。在ThreadPoolExecutor中已经包含四种处理策略。1)CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) { r.run(); }}这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。(开始我总不想丢弃任务的执行,但是对某些应用场景来讲,很有可能造成当前线程也被阻塞。如果所有线程都是不能执行的,很可能导致程序没法继续跑了。需要视业务情景而定吧。)2)AbortPolicy:处理程序遭到拒绝将抛出运行时 RejectedExecutionExceptionpublic void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException();}这种策略直接抛出异常,丢弃任务。(jdk默认策略,队列满并线程满时直接拒绝添加新任务,并抛出异常,所以说有时候放弃也是一种勇气,为了保证后续任务的正常进行,丢弃一些也是可以接收的,记得做好记录)3)DiscardPolicy:不能执行的任务将被删除public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。4)DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) {e.getQueue().poll();e.execute(r); }}该策略就稍微复杂一些,在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略需要适当小心。3. Executors 工厂ThreadPoolExecutor是Executors类的实现,Executors类里面提供了一些静态工厂,生成一些常用的线程池,主要有以下几个:newSingleThreadExecutor:创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。newFixedThreadPool:创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大&#20540;就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。(我用的就是这个,同上所述,相当于创建了相同corePoolSize、maximumPoolSize的线程池)newCachedThreadPool:创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。4. 举个栗子测试类:ThreadPool,创建了一个 newFixedThreadPool,最大线程数为3的固定大小线程池。然后模拟10个任务丢进去。主线程结束后会打印一句:主线程结束。public class ThreadPool { public static void main(String[] args) {//创建固定大小线程池ExecutorService executor = Executors.newFixedThreadPool(3);for (int i = 0; i & 10; i&#43;&#43;) {SendNoticeTask task = new SendNoticeTask();task.setCount(i);executor.execute(task);}System.out.println(&主线程结束&); }}测试类:SendNoticeTask,执行任务类,就是打印一句当前线程名&#43;第几个任务。为了方便观察,每个线程执行完以后睡10s。public class SendNoticeTask implements Runnable { public void setCount(int count) {this.count = } @Override public void run() {System.out.println(Thread.currentThread().getName() &#43; & start to& &#43; & send & &#43; count &#43; & ...&);try {Thread.currentThread().sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(&finish & &#43; Thread.currentThread().getName()); }}执行结果:主线程结束pool-1-thread-3 start to send 2 ...pool-1-thread-1 start to send 0 ...pool-1-thread-2 start to send 1 ...finish pool-1-thread-3finish pool-1-thread-2pool-1-thread-2 start to send 3 ...finish pool-1-thread-1pool-1-thread-3 start to send 4 ...pool-1-thread-1 start to send 5 ...finish pool-1-thread-3finish pool-1-thread-1pool-1-thread-1 start to send 6 ...pool-1-thread-3 start to send 7 ...finish pool-1-thread-2pool-1-thread-2 start to send 8 ...finish pool-1-thread-1pool-1-thread-1 start to send 9 ...finish pool-1-thread-3finish pool-1-thread-2finish pool-1-thread-1由上可见执行顺序是这样的:线程池创建了三个线程,分别执行任务0、1、2,由于线程创建需要一定时间,因此前三个线程的执行顺序具有一定随机性。此时主线程接着往线程池中塞任务,线程池已达到最大线程数(3),于是开始往队列里放。当某个线程执行完任务后,直接从队列里拉出新的任务执行,队列具有先进先出的特性,因此后面的任务执行是有序的。这个看一下 Executors 类的源码就更明白了。public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue&Runnable&());}实际上是创建了一个具有固定线程数、无界队列的 ThreadPoolExecutor。队列无界,不会拒绝任务提交,因此使用此方法时,需要注意资源被耗尽的情况。测试类:TestThreadPool,采用有界队列(队列大小2),和默认拒绝策略的ThreadPoolExecutor。public class TestThreadPool { private static final int corePoolSize = 2; private static final int maximumPoolSize = 4; private static final int keepAliveTime = 1000; private static BlockingQueue&Runnable& workQueue = new ArrayBlockingQueue&Runnable&(2); public static void main(String[] args) {//创建线程池ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, workQueue);for (int i = 0; i & 10; i&#43;&#43;) {SendNoticeTask task = new SendNoticeTask();task.setCount(i);executor.execute(task);}System.out.println(&主线程结束:& &#43; Thread.currentThread().getName()); }}执行结果:pool-1-thread-1 start to send 0 ...Exception in thread &main& java.util.concurrent.RejectedExecutionException at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1768) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:767) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:658) at com.dylanviviv.pool.TestThreadPool.main(TestThreadPool.java:24)pool-1-thread-4 start to send 5 ...pool-1-thread-2 start to send 1 ...pool-1-thread-3 start to send 4 ...finish pool-1-thread-1finish pool-1-thread-3pool-1-thread-1 start to send 2 ...finish pool-1-thread-2pool-1-thread-3 start to send 3 ...finish pool-1-thread-4finish pool-1-thread-3finish pool-1-thread-1线程池创建了2个线程,分别执行任务0、1,线程池达到corePoolSize,新进任务2、3被放入队列中等待处理,此时队列满,而线程池中线程没有执行完任务0、1,线程池创建新的线程,执行新进任务4、5,达到maximumPoolSize。此时所有任务都没有执行结束,主线程又继续提交任务,线程池进入默认异常策略(AbortPolicy)拒绝服务。
最新教程周点击榜
微信扫一扫Java并发编程:线程池的使用
  在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:
  如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。
  那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?
  在Java中可以通过线程池来达到这样的效果。今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPoolExecutor类中的方法讲起,然后再讲述它的实现原理,接着给出了它的使用示例,最后讨论了一下如何合理配置线程池的大小。
  以下是本文的目录大纲:
  一.Java中的ThreadPoolExecutor类
  二.深入剖析线程池实现原理
  三.使用示例
  四.如何合理配置线程池的大小 
  若有不正之处请多多谅解,并欢迎批评指正。
  请尊重作者劳动成果,转载请标明原文链接:
  /dolphin0520/p/3932921.html
一.Java中的ThreadPoolExecutor类
  java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。下面我们来看一下ThreadPoolExecutor类的具体实现源码。
  在ThreadPoolExecutor类中提供了四个构造方法:
public class ThreadPoolExecutor extends AbstractExecutorService {
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue&Runnable& workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue&Runnable& workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue&Runnable& workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue&Runnable& workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
&  从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。
&  下面解释下一下构造器中各个参数的含义:
corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
TimeUnit.DAYS;
TimeUnit.HOURS;
TimeUnit.MINUTES;
TimeUnit.SECONDS;
TimeUnit.MILLISECONDS;
TimeUnit.MICROSECONDS;
TimeUnit.NANOSECONDS;
workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
ArrayBlockingQ
LinkedBlockingQ
SynchronousQ
  ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
threadFactory:线程工厂,主要用来创建线程;
handler:表示当拒绝处理任务时的策略,有以下四种取值:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
&  具体参数的配置与线程池的关系将在下一节讲述。
  从上面给出的ThreadPoolExecutor类的代码可以知道,ThreadPoolExecutor继承了AbstractExecutorService,我们来看一下AbstractExecutorService的实现:
public abstract class AbstractExecutorService implements ExecutorService {
protected &T& RunnableFuture&T& newTaskFor(Runnable runnable, T value) { };
protected &T& RunnableFuture&T& newTaskFor(Callable&T& callable) { };
public Future&?& submit(Runnable task) {};
public &T& Future&T& submit(Runnable task, T result) { };
public &T& Future&T& submit(Callable&T& task) { };
private &T& T doInvokeAny(Collection&? extends Callable&T&& tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
public &T& T invokeAny(Collection&? extends Callable&T&& tasks)
throws InterruptedException, ExecutionException {
public &T& T invokeAny(Collection&? extends Callable&T&& tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
public &T& List&Future&T&& invokeAll(Collection&? extends Callable&T&& tasks)
throws InterruptedException {
public &T& List&Future&T&& invokeAll(Collection&? extends Callable&T&& tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
&  AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。
  我们接着看ExecutorService接口的实现:
public interface ExecutorService extends Executor {
void shutdown();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedE
&T& Future&T& submit(Callable&T& task);
&T& Future&T& submit(Runnable task, T result);
Future&?& submit(Runnable task);
&T& List&Future&T&& invokeAll(Collection&? extends Callable&T&& tasks)
throws InterruptedE
&T& List&Future&T&& invokeAll(Collection&? extends Callable&T&& tasks,
long timeout, TimeUnit unit)
throws InterruptedE
&T& T invokeAny(Collection&? extends Callable&T&& tasks)
throws InterruptedException, ExecutionE
&T& T invokeAny(Collection&? extends Callable&T&& tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutE
&  而ExecutorService又是继承了Executor接口,我们看一下Executor接口的实现:
public interface Executor {
void execute(Runnable command);
&  到这里,大家应该明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系了。
  Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;
  然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
  抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;
  然后ThreadPoolExecutor继承了类AbstractExecutorService。
  在ThreadPoolExecutor类中有几个非常重要的方法:
shutdown()
shutdownNow()
&  execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
  submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。
  shutdown()和shutdownNow()是用来关闭线程池的。
  还有很多其他的方法:
  比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法,有兴趣的朋友可以自行查阅API。
二.深入剖析线程池实现原理
  在上一节我们从宏观上介绍了ThreadPoolExecutor,下面我们来深入解析一下线程池的具体实现原理,将从下面几个方面讲解:
  1.线程池状态
  2.任务的执行
  3.线程池中的线程初始化
  4.任务缓存队列及排队策略
  5.任务拒绝策略
  6.线程池的关闭
  7.线程池容量的动态调整
1.线程池状态
  在ThreadPoolExecutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:
volatile int runS
static final int RUNNING
static final int SHUTDOWN
static final int STOP
static final int TERMINATED = 3;
&  runState表示当前线程池的状态,它是一个volatile变量用来保证线程之间的可见性;
  下面的几个static final变量表示runState可能的几个取值。
  当创建线程池后,初始时,线程池处于RUNNING状态;
  如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;
  如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;
  当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。
2.任务的执行
  在了解将任务提交给线程池到任务执行完毕整个过程之前,我们先来看一下ThreadPoolExecutor类中其他的一些比较重要成员变量:
private final BlockingQueue&Runnable& workQ
//任务缓存队列,用来存放等待执行的任务
private final ReentrantLock mainLock = new ReentrantLock();
//线程池的主要状态锁,对线程池状态(比如线程池大小
//、runState等)的改变都要使用这个锁
private final HashSet&Worker& workers = new HashSet&Worker&();
//用来存放工作集
private volatile long
keepAliveT
//线程存货时间
private volatile boolean allowCoreThreadTimeO
//是否允许为核心线程设置存活时间
private volatile int
//核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int
maximumPoolS
//线程池最大能容忍的线程数
private volatile int
//线程池中当前的线程数
private volatile RejectedExecutionH //任务拒绝策略
private volatile ThreadFactory threadF
//线程工厂,用来创建线程
private int largestPoolS
//用来记录线程池中曾经出现过的最大线程数
private long completedTaskC
//用来记录已经执行完毕的任务个数
&  每个变量的作用都已经标明出来了,这里要重点解释一下corePoolSize、maximumPoolSize、largestPoolSize三个变量。
  corePoolSize在很多地方被翻译成核心池大小,其实我的理解这个就是线程池的大小。举个简单的例子:
  假如有一个工厂,工厂里面有10个工人,每个工人同时只能做一件任务。
  因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;
  当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待;
  如果说新任务数目增长的速度远远大于工人做任务的速度,那么此时工厂主管可能会想补救措施,比如重新招4个临时工人进来;
  然后就将任务也分配给这4个临时工人做;
  如果说着14个工人做任务的速度还是不够,此时工厂主管可能就要考虑不再接收新的任务或者抛弃前面的一些任务了。
  当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10个工人,毕竟请额外的工人是要花钱的。
  这个例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。
  也就是说corePoolSize就是线程池大小,maximumPoolSize在我看来是线程池的一种补救措施,即任务量突然过大时的一种补救措施。
  不过为了方便理解,在本文后面还是将corePoolSize翻译成核心池大小。
  largestPoolSize只是一个用来起记录作用的变量,用来记录线程池中曾经有过的最大线程数目,跟线程池的容量没有任何关系。
  下面我们进入正题,看一下任务从提交到最终执行完毕经历了哪些过程。
  在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,虽然通过submit也可以提交任务,但是实际上submit方法里面最终调用的还是execute()方法,所以我们只需要研究execute()方法的实现原理即可:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize &= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
&  上面的代码可能看起来不是那么容易理解,下面我们一句一句解释:
  首先,判断提交的任务command是否为null,若是null,则抛出空指针异常;
  接着是这句,这句要好好理解一下:
if (poolSize &= corePoolSize || !addIfUnderCorePoolSize(command))
&  由于是或条件运算符,所以先计算前半部分的值,如果线程池中当前线程数不小于核心池大小,那么就会直接进入下面的if语句块了。
  如果线程池中当前线程数小于核心池大小,则接着执行后半部分,也就是执行
addIfUnderCorePoolSize(command)
  如果执行完addIfUnderCorePoolSize这个方法返回false,则继续执行下面的if语句块,否则整个方法就直接执行完毕了。
  如果执行完addIfUnderCorePoolSize这个方法返回false,然后接着判断:
if (runState == RUNNING && workQueue.offer(command))
&  如果当前线程池处于RUNNING状态,则将任务放入任务缓存队列;如果当前线程池不处于RUNNING状态或者任务放入缓存队列失败,则执行:
addIfUnderMaximumPoolSize(command)
  如果执行addIfUnderMaximumPoolSize方法失败,则执行reject()方法进行任务拒绝处理。
  回到前面:
if (runState == RUNNING && workQueue.offer(command))
&  这句的执行,如果说当前线程池处于RUNNING状态且将任务放入任务缓存队列成功,则继续进行判断:
if (runState != RUNNING || poolSize == 0)
&  这句判断是为了防止在将此任务添加进任务缓存队列的同时其他线程突然调用shutdown或者shutdownNow方法关闭了线程池的一种应急措施。如果是这样就执行:
ensureQueuedTaskHandled(command)
&  进行应急处理,从名字可以看出是保证 添加到任务缓存队列中的任务得到处理。
  我们接着看2个关键方法的实现:addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t =
final ReentrantLock mainLock = this.mainL
mainLock.lock();
if (poolSize & corePoolSize && runState == RUNNING)
t = addThread(firstTask);
//创建线程去执行firstTask任务
} finally {
mainLock.unlock();
if (t == null)
t.start();
&  这个是addIfUnderCorePoolSize方法的具体实现,从名字可以看出它的意图就是当低于核心吃大小时执行的方法。下面看其具体实现,首先获取到锁,因为这地方涉及到线程池状态的变化,先通过if语句判断当前线程池中的线程数目是否小于核心池大小,有朋友也许会有疑问:前面在execute()方法中不是已经判断过了吗,只有线程池当前线程数目小于核心池大小才会执行addIfUnderCorePoolSize方法的,为何这地方还要继续判断?原因很简单,前面的判断过程中并没有加锁,因此可能在execute方法判断的时候poolSize小于corePoolSize,而判断完之后,在其他线程中又向线程池提交了任务,就可能导致poolSize不小于corePoolSize了,所以需要在这个地方继续判断。然后接着判断线程池的状态是否为RUNNING,原因也很简单,因为有可能在其他线程中调用了shutdown或者shutdownNow方法。然后就是执行
t = addThread(firstTask);
&  这个方法也非常关键,传进去的参数为提交的任务,返回值为Thread类型。然后接着在下面判断t是否为空,为空则表明创建线程失败(即poolSize&=corePoolSize或者runState不等于RUNNING),否则调用t.start()方法启动线程。
  我们来看一下addThread方法的实现:
private Thread addThread(Runnable firstTask) {
Worker w = new Worker(firstTask);
Thread t = threadFactory.newThread(w);
//创建一个线程,执行任务
if (t != null) {
w.thread =
//将创建的线程的引用赋值为w的成员变量
workers.add(w);
int nt = ++poolS
//当前线程数加1
if (nt & largestPoolSize)
largestPoolSize =
&  在addThread方法中,首先用提交的任务创建了一个Worker对象,然后调用线程工厂threadFactory创建了一个新的线程t,然后将线程t的引用赋值给了Worker对象的成员变量thread,接着通过workers.add(w)将Worker对象添加到工作集当中。
  下面我们看一下Worker类的实现:
private final class Worker implements Runnable {
private final ReentrantLock runLock = new ReentrantLock();
private Runnable firstT
volatile long completedT
Worker(Runnable firstTask) {
this.firstTask = firstT
boolean isActive() {
return runLock.isLocked();
void interruptIfIdle() {
final ReentrantLock runLock = this.runL
if (runLock.tryLock()) {
if (thread != Thread.currentThread())
thread.interrupt();
} finally {
runLock.unlock();
void interruptNow() {
thread.interrupt();
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runL
runLock.lock();
if (runState & STOP &&
Thread.interrupted() &&
runState &= STOP)
boolean ran =
beforeExecute(thread, task);
//beforeExecute方法是ThreadPoolExecutor类的一个方法,没有具体实现,用户可以根据
//自己需要重载这个方法和后面的afterExecute方法来进行一些统计信息,比如某个任务的执行时间等
task.run();
afterExecute(task, null);
++completedT
} catch (RuntimeException ex) {
afterExecute(task, ex);
} finally {
runLock.unlock();
public void run() {
Runnable task = firstT
firstTask =
while (task != null || (task = getTask()) != null) {
runTask(task);
} finally {
workerDone(this);
//当任务队列中没有任务时,进行清理工作
&  它实际上实现了Runnable接口,因此上面的Thread t = threadFactory.newThread(w);效果跟下面这句的效果基本一样:
Thread t = new Thread(w);
&  相当于传进去了一个Runnable任务,在线程t中执行这个Runnable。
  既然Worker实现了Runnable接口,那么自然最核心的方法便是run()方法了:
public void run() {
Runnable task = firstT
firstTask =
while (task != null || (task = getTask()) != null) {
runTask(task);
} finally {
workerDone(this);
&  从run方法的实现可以看出,它首先执行的是通过构造器传进来的任务firstTask,在调用runTask()执行完firstTask之后,在while循环里面不断通过getTask()去取新的任务来执行,那么去哪里取呢?自然是从任务缓存队列里面去取,getTask是ThreadPoolExecutor类中的方法,并不是Worker类中的方法,下面是getTask方法的实现:
Runnable getTask() {
for (;;) {
int state = runS
if (state & SHUTDOWN)
if (state == SHUTDOWN)
// Help drain queue
r = workQueue.poll();
else if (poolSize & corePoolSize || allowCoreThreadTimeOut) //如果线程数大于核心池大小或者允许为核心池线程设置空闲时间,
//则通过poll取任务,若等待一定的时间取不到任务,则返回null
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
r = workQueue.take();
if (r != null)
if (workerCanExit()) {
//如果没取到任务,即r为null,则判断当前的worker是否可以退出
if (runState &= SHUTDOWN) // Wake up others
interruptIdleWorkers();
//中断处于空闲状态的worker
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
&  在getTask中,先判断当前线程池状态,如果runState大于SHUTDOWN(即为STOP或者TERMINATED),则直接返回null。
  如果runState为SHUTDOWN或者RUNNING,则从任务缓存队列取任务。
  如果当前线程池的线程数大于核心池大小corePoolSize或者允许为核心池中的线程设置空闲存活时间,则调用poll(time,timeUnit)来取任务,这个方法会等待一定的时间,如果取不到任务就返回null。
  然后判断取到的任务r是否为null,为null则通过调用workerCanExit()方法来判断当前worker是否可以退出,我们看一下workerCanExit()的实现:
private boolean workerCanExit() {
final ReentrantLock mainLock = this.mainL
mainLock.lock();
boolean canE
//如果runState大于等于STOP,或者任务缓存队列为空了
允许为核心池线程设置空闲存活时间并且线程池中的线程数目大于1
canExit = runState &= STOP ||
workQueue.isEmpty() ||
(allowCoreThreadTimeOut &&
poolSize & Math.max(1, corePoolSize));
} finally {
mainLock.unlock();
return canE
&  也就是说如果线程池处于STOP状态、或者任务队列已为空或者允许为核心池线程设置空闲存活时间并且线程数大于1时,允许worker退出。如果允许worker退出,则调用interruptIdleWorkers()中断处于空闲状态的worker,我们看一下interruptIdleWorkers()的实现:
void interruptIdleWorkers() {
final ReentrantLock mainLock = this.mainL
mainLock.lock();
for (Worker w : workers)
//实际上调用的是worker的interruptIfIdle()方法
w.interruptIfIdle();
} finally {
mainLock.unlock();
&  从实现可以看出,它实际上调用的是worker的interruptIfIdle()方法,在worker的interruptIfIdle()方法中:
void interruptIfIdle() {
final ReentrantLock runLock = this.runL
if (runLock.tryLock()) {
//注意这里,是调用tryLock()来获取锁的,因为如果当前worker正在执行任务,锁已经被获取了,是无法获取到锁的
//如果成功获取了锁,说明当前worker处于空闲状态
if (thread != Thread.currentThread())
thread.interrupt();
} finally {
runLock.unlock();
&&  这里有一个非常巧妙的设计方式,假如我们来设计线程池,可能会有一个任务分派线程,当发现有线程空闲时,就从任务缓存队列中取一个任务交给空闲线程执行。但是在这里,并没有采用这样的方式,因为这样会要额外地对任务分派线程进行管理,无形地会增加难度和复杂度,这里直接让执行完任务的线程去任务缓存队列里面取任务来执行。
&  我们再看addIfUnderMaximumPoolSize方法的实现,这个方法的实现思想和addIfUnderCorePoolSize方法的实现思想非常相似,唯一的区别在于addIfUnderMaximumPoolSize方法是在线程池中的线程数达到了核心池大小并且往任务队列中添加任务失败的情况下执行的:
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
Thread t =
final ReentrantLock mainLock = this.mainL
mainLock.lock();
if (poolSize & maximumPoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
if (t == null)
t.start();
&  看到没有,其实它和addIfUnderCorePoolSize方法的实现基本一模一样,只是if语句判断条件中的poolSize & maximumPoolSize不同而已。
  到这里,大部分朋友应该对任务提交给线程池之后到被执行的整个过程有了一个基本的了解,下面总结一下:
  1)首先,要清楚corePoolSize和maximumPoolSize的含义;
  2)其次,要知道Worker是用来起到什么作用的;
  3)要知道任务提交给线程池之后的处理策略,这里总结一下主要有4点:
如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
如果当前线程池中的线程数目&=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。
3.线程池中的线程初始化
  默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。
  在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:
prestartCoreThread():初始化一个核心线程;
prestartAllCoreThreads():初始化所有核心线程
  下面是这2个方法的实现:
public boolean prestartCoreThread() {
return addIfUnderCorePoolSize(null); //注意传进去的参数是null
public int prestartAllCoreThreads() {
int n = 0;
while (addIfUnderCorePoolSize(null))//注意传进去的参数是null
&  注意上面传进去的参数是null,根据第2小节的分析可知如果传进去的参数为null,则最后执行线程会阻塞在getTask方法中的
r = workQueue.take();
&  即等待任务队列中有任务。
4.任务缓存队列及排队策略
  在前面我们多次提到了任务缓存队列,即workQueue,它用来存放等待执行的任务。
  workQueue的类型为BlockingQueue&Runnable&,通常可以取下面三种类型:
  1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
  2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
  3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
5.任务拒绝策略
  当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
6.线程池的关闭
  ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:
shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
7.线程池容量的动态调整
  ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),
setCorePoolSize:设置核心池大小
setMaximumPoolSize:设置线程池最大能创建的线程数目大小
  当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务。
三.使用示例
  前面我们讨论了关于线程池的实现原理,这一节我们来看一下它的具体使用:
public class Test {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue&Runnable&(5));
for(int i=0;i&15;i++){
MyTask myTask = new MyTask(i);
executor.execute(myTask);
System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
executor.getQueue().size()+",已执行玩别的任务数目:"+executor.getCompletedTaskCount());
executor.shutdown();
class MyTask implements Runnable {
private int taskN
public MyTask(int num) {
this.taskNum =
public void run() {
System.out.println("正在执行task "+taskNum);
Thread.currentThread().sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("task "+taskNum+"执行完毕");
&  执行结果:
正在执行task 0
线程池中线程数目:1,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 1
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 2
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 3
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 4
线程池中线程数目:5,队列中等待执行的任务数目:1,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:2,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:3,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:4,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
线程池中线程数目:6,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 10
线程池中线程数目:7,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 11
线程池中线程数目:8,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 12
线程池中线程数目:9,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 13
线程池中线程数目:10,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 14
task 3执行完毕
task 0执行完毕
task 2执行完毕
task 1执行完毕
正在执行task 8
正在执行task 7
正在执行task 6
正在执行task 5
task 4执行完毕
task 10执行完毕
task 11执行完毕
task 13执行完毕
task 12执行完毕
正在执行task 9
task 14执行完毕
task 8执行完毕
task 5执行完毕
task 7执行完毕
task 6执行完毕
task 9执行完毕
  从执行结果可以看出,当线程池中线程的数目大于5时,便将任务放入任务缓存队列里面,当任务缓存队列满了之后,便创建新的线程。如果上面程序中,将for循环中改成执行20个任务,就会抛出任务拒绝异常了。
  不过在java doc中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池:
Executors.newCachedThreadPool();
//创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
Executors.newSingleThreadExecutor();
//创建容量为1的缓冲池
Executors.newFixedThreadPool(int);
//创建固定容量大小的缓冲池
&  下面是这三个静态方法的具体实现;
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue&Runnable&());
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue&Runnable&()));
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue&Runnable&());
  从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。
  newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
  newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;
  newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。
  实际中,如果Executors提供的三个静态方法能满足要求,就尽量使用它提供的三个方法,因为自己去手动配置ThreadPoolExecutor的参数有点麻烦,要根据实际任务的类型和数量来进行配置。
  另外,如果ThreadPoolExecutor达不到要求,可以自己继承ThreadPoolExecutor类进行重写。
四.如何合理配置线程池的大小
  本节来讨论一个比较重要的话题:如何合理配置线程池大小,仅供参考。
  一般需要根据任务的类型来配置线程池大小:
  如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1
  如果是IO密集型任务,参考值可以设置为2*NCPU
  当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。
  参考资料:
  《JDK API 1.6》
阅读(...) 评论()

我要回帖

更多关于 无法验证介质空余 的文章

 

随机推荐