Java线程池

以下内容为阅读《JAVA并发编程的艺术》学习总结 笔记

简介

Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序 都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。

线程池的优点

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不久会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。但是,要做到合理利用线程池,必须对其实现原理了如指掌。

线程池的实现原理

线程池的创建

我们可以通过ThreadPoolExecutor构造方法生成一个线程池对象,移除了内部代码,只留下了构造参数,便于理解,代码如下

1
2
3
4
5
6
7
8
9
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {

}

参数对应关系

参数 含义
corePoolSize 核心线程数
maximumPoolSize 最大线程数
keepAliveTime 线程活动保持时间
TimeUnit 线程活动保持时间的单位
BlockingQueue 任务队列
ThreadFactory 线程工厂
RejectedExecutionHandler 拒绝策略

大致明白参数意义后,我们说一下线程池新建一个线程的过程

先看一下线程池的执行示意图

  1. 主线程执行execute()提交一个新线程,首先进入核心线程池corePoolSize并判断是否已满,如果未满则新建一个线程(注意,执行这一步骤 需要获取全局锁)。
  2. 如果核心线程池已满,则进入阻塞队列BlockingQueue进行判断是否已满,如果未满则插入队列(队列保持先进先出的顺序执行 )
  3. 如果阻塞队列已满,则判断线程池是否达到最大线程数量maximumPoolSize,如果没有,则新建线程(注意,执行这一步骤 需要获取全局锁)。
  4. 如果达到最大线程数量,则执行拒绝策略RejectedExecutionHandler

如果将上述描述转化成流程图则为

源码分析

上面的流程分析让我们很直观地了解了线程池的工作原理,让我们再通过源代 码来看看是如何实现的,线程池执行任务的方法如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

int c = ctl.get();

// 是否小于核心线程数,小于执行当前线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 否则尝试加入阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 最后尝试加入最大线程中
else if (!addWorker(command, false))
reject(command);
}

工作线程

线程池创建线程时,会将线程封装成工作线程Worker,Worker在执行完任务后,还会循环获取工作队列中的任务来执行,我们可以从Worker类的run()方法里看到这点。

只关注while循环和内部task.run即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public void run() {
runWorker(this);
}

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// 二次验证线程是否中断或停止
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();

// 线程执行
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}


}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

ThreadPoolExecutor中线程执行任务的示意图如下

线程池中得先成功执行任务份两种情况

  • 在execute()方法中创建一个线程时,会让这个线程执行当前任务
  • 这个线程执行完上图中1得任务后,会反复从BlockingQueue获取任务来执行

线程池参数

corePoolSize(核心线程数)

当提交一个任务到线程池中时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等待需要执行的任务数大于线程池基本大小(核心线程数)时就不在创建。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。

runnableTaskQueue(任务队列)

用于保存等待执行的任务的阻塞队列。有以下几种

  • ArrayBlockingQueue: 一个基于数组结构的有界阻塞队列,按照FIFO(first in first out)先进先出的原则对元素进行排序。
  • LinkedBlockingQueue:一个基于链表结构的阻塞队列,也是按照FIFO 原则排序元素,吞吐量通常高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
  • SynchronousQueue: 一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列
  • ProirityBlockingQueue:一个具有优先级的无限阻塞队列
maximumPoolSize(最大线程数)

线程池允许创建的最大线程数。如果核心线程和队列都已经满了,并且已创建的线程数小于最大线程数,也就是上图中3这种情况,后续提交的线程则会创建新线程,当使用无界的任务队列时,提交的线程会无限阻塞在任务队列中,这个参数就失去了意义。

ThreadFactory(线程工厂)

ThreadFactory 是一个接口,用于创建线程的工厂。它提供了一种创建和定制线程的方式,让你能够对线程进行更多的控制和管理。例如自定义线程名称、设置线程优先级、设置线程组等。

RejectedExecutionHandler(拒绝策略)

当队列和线程池都满了,说明明线程池处于饱和状态,那么必须才需一种策略处理提交的新任务。默认请情况下使用的时AbortPolicy,表示无法处理新任务时抛出异常。自JDK1.5 Java线程池提供了4中策略

  • AbortPolicy 直接抛出异常
  • CallerRunsPolicy 只用调用者所在的线程运行任务
  • DiscardOldestPolicy 丢弃队列里最旧的任务也就是最早添加的一个任务,并尝试执行当前任务
  • DiscardPolicy 不处理 直接丢弃

除此之外,我们可以根据业务场景自己实现RejectedExecutionHandler接口自定义策略,比如记录日志或持久化不能处理的任务

keepAliveTime(存活时间)

线程池对的工作线程空闲后,保持存活的时间,线程在存活时间内可被重复利用,如果我们执行任务的时间比较短,调大存活时间可以提高线程利用率,避免线程的重新启动和终止所产生的额外时间消耗

TimeUnit(存活时间单位)
1
java.util.concurrent.TimeUnit

可选

  • SECONDS 秒
  • DAYS 天
  • HOURS 小时
  • MICROSECONDS 微秒
  • MILLISECONDS 毫秒
  • MINUTES 分钟
  • NANOSECONDS 纳秒

提交线程任务

线程池提交任务使用execute()方法和submit()方法,区别在于execute()方法无返回值,submit()方法可以获取线程执行返回值,它返回一个Future接口对象,可以使用future.get()阻塞线程获取返回值

execute()

构造方法

1
public void execute(Runnable command)

可以看到execute()接收一个Runnable的实例对象,也无返回值,所以无法判断任务是否是否被线程池执行成功(如果线程没有打印或日志的情况下),一个小例子如下

1
2
3
4
5
6
public static void main(String[] args) {

ExecutorService executorService = Executors.newFixedThreadPool(3);
executorService.execute(()-> System.out.println("提交新线程成功"));
executorService.shutdown();
}

结果不言而喻

1
2
3
提交新线程成功

Process finished with exit code 0
submit()

构造方法

1
2
3
4
Future<?> submit(Runnable task);
<T> Future<T> submit(Runnable task, T result);
<T> Future<T> submit(Callable<T> task);

可以看到submit()方法返回一个Future接口对象,我们可以通过Future的get()方法获取返回值,当然get()方法会阻塞当前线程

一个小例子

1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) {

ExecutorService executorService = Executors.newFixedThreadPool(3);
Future<String> submit = executorService.submit(() -> "提交新线程成功");
try {
System.out.println(submit.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
executorService.shutdown();
}

结果

1
2
3
提交新线程成功

Process finished with exit code 0

关闭线程池

关闭线程池可以使用shutdown或shutdownNow方法,他们的原理是遍历线程池的工作线程,然后逐个调用线程的interrupt方法来中断线程。

区别在于shutdownNow会先将线程池的状态设置成stop,然后尝试停止所有正在执行或暂停的任务线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行的任务线程

只要调用这两个方法中的任意一个,isShutdown方法就会返回ture,当所有任务都已经关闭后,才表示线程池关闭成功,这个时候调用idTerminaed方法会返回true。通常使用shutdown方法关闭线程。

合理地配置线程池

要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。

  • 任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
  • 任务的优先级:高、中和低。 ·任务的执行时间:长、中和短。
  • 任务的依赖性:是否依赖其他系统资源,如数据库连接。

性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务应配置尽可能小的 线程,如配置**Ncpu+1**个线程的线程池。由于IO密集型任务线程并不是一直在执行任务,则应配 置尽可能多的线程,如**2*Ncpu**。混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务 和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量 将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过 **Runtime.getRuntime().availableProcessors()**方法获得当前设备的CPU个数。

  • 优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高 的任务先执行。
  • 执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让 执行时间短的任务先执行。
  • 依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越 长,则CPU空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU。

PS:

*** 如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能 执行。***

​ *建议使用有界队列,有界队列能增加系统的稳定性和预警能力。

线程池的监控

如果在系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时,可以根据线程池的使用状况快速定位问题。

可以通过线程池提供的参数进行监控,在监控线程池的 时候可以使用以下属性。

  • taskCount:线程池需要执行的任务数量。
  • completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于taskCount
  • largestPoolSize:线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是 否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。
  • getPoolSize:线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销 毁,所以这个大小只增不减。
  • getActiveCount:获取活动的线程数。

通过扩展线程池进行监控。可以通过继承线程池来自定义线程池,重写线程池的 beforeExecuteafterExecuteterminated方法,也可以在任务执行前、执行后和线程池关闭前执 行一些代码来进行监控。例如,监控任务的平均执行时间、最大执行时间和最小执行时间等。 这几个方法在线程池里是空方法。