线程池
线程池是并发编程中最重要的组件之一。它通过复用线程、控制并发数量、管理任务队列,有效解决了频繁创建销毁线程带来的性能问题。本章将深入讲解线程池的原理和使用方法。
为什么需要线程池?
线程创建的开销
每个线程的创建都需要分配栈空间(默认 1MB)、向操作系统注册线程、进行上下文切换等操作。如果每个任务都创建新线程,在高并发场景下会带来巨大的性能开销。
资源耗尽风险
无限制地创建线程可能导致系统资源耗尽。当线程数量超过系统承载能力时,系统响应变慢甚至崩溃。
线程管理的复杂性
手动管理线程的生命周期、等待线程完成、处理异常等操作非常繁琐,容易出错。
线程池通过以下方式解决这些问题:
- 线程复用:线程执行完任务后不销毁,继续执行其他任务
- 控制并发:限制同时运行的线程数量,防止资源耗尽
- 任务队列:缓存待执行的任务,平滑处理请求峰值
- 统一管理:提供任务提交、执行、关闭等统一接口
Executor 框架
JUC 的线程池基于 Executor 框架,其核心接口层次如下:
Executor (顶层接口)
└── ExecutorService (扩展接口,支持任务提交和关闭)
├── ThreadPoolExecutor (标准线程池实现)
└── ScheduledThreadPoolExecutor (支持定时任务)
Executor 接口
public interface Executor {
void execute(Runnable command);
}
Executor 是最基础的接口,只定义了一个 execute 方法用于执行任务。
ExecutorService 接口
public interface ExecutorService extends Executor {
void shutdown(); // 平滑关闭
List<Runnable> shutdownNow(); // 立即关闭
boolean isShutdown(); // 是否已关闭
boolean isTerminated(); // 是否已终止
<T> Future<T> submit(Callable<T> task); // 提交有返回值的任务
Future<?> submit(Runnable task); // 提交无返回值的任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks); // 批量执行
}
ExecutorService 扩展了 Executor,提供了更丰富的功能。
ThreadPoolExecutor 核心参数
ThreadPoolExecutor 是线程池的核心实现类,理解其构造参数是正确使用线程池的关键。
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 空闲线程存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 任务队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝策略
)
参数详解
corePoolSize - 核心线程数
核心线程是线程池中长期保持的线程数量,即使空闲也不会被回收(除非设置了 allowCoreThreadTimeOut)。
当提交新任务时,如果当前线程数小于 corePoolSize,会创建新线程执行任务,即使其他核心线程处于空闲状态。
maximumPoolSize - 最大线程数
线程池允许创建的最大线程数量。当任务队列已满且当前线程数小于 maximumPoolSize 时,会创建非核心线程处理任务。
keepAliveTime - 空闲线程存活时间
非核心线程空闲超过这个时间后会被回收。如果设置了 allowCoreThreadTimeOut(true),核心线程也会被回收。
workQueue - 任务队列
用于缓存等待执行的任务。常见的队列类型:
| 队列类型 | 特点 | 适用场景 |
|---|---|---|
| ArrayBlockingQueue | 有界队列,需指定容量 | 资源有限,防止任务堆积 |
| LinkedBlockingQueue | 可选有界,默认无界 | 任务量可控的场景 |
| SynchronousQueue | 不存储,直接传递 | 高吞吐,任务处理快 |
| PriorityBlockingQueue | 无界优先级队列 | 任务有优先级需求 |
threadFactory - 线程工厂
用于创建新线程,可以自定义线程名称、优先级、是否守护线程等。
ThreadFactory factory = r -> {
Thread t = new Thread(r);
t.setName("my-pool-" + t.getId());
t.setDaemon(false);
return t;
};
handler - 拒绝策略
当线程池已满(达到 maximumPoolSize 且队列已满)时,新任务的处理策略:
| 策略 | 行为 | 适用场景 |
|---|---|---|
| AbortPolicy | 抛出 RejectedExecutionException | 默认策略,快速失败 |
| CallerRunsPolicy | 由调用者线程执行任务 | 降级处理,减缓提交速度 |
| DiscardPolicy | 直接丢弃任务,不抛异常 | 允许任务丢失 |
| DiscardOldestPolicy | 丢弃队列中最老的任务 | 优先处理新任务 |
任务执行流程
理解线程池的任务执行流程对于正确配置参数至关重要:
提交任务
│
▼
当前线程数 < corePoolSize?
│
├─ 是 ──→ 创建核心线程执行任务
│
└─ 否
│
▼
任务队列未满?
│
├─ 是 ──→ 任务加入队列等待
│
└─ 否
│
▼
当前线程数 < maximumPoolSize?
│
├─ 是 ──→ 创建非核心线程执行任务
│
└─ 否 ──→ 执行拒绝策略
实例演示
import java.util.concurrent.*;
public class ThreadPoolDemo {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程数
4, // 最大线程数
60L, // 空闲时间
TimeUnit.SECONDS, // 时间单位
new ArrayBlockingQueue<>(10), // 任务队列容量10
Executors.defaultThreadFactory(), // 默认线程工厂
new ThreadPoolExecutor.AbortPolicy() // 默认拒绝策略
);
for (int i = 0; i < 15; i++) {
final int taskId = i;
executor.execute(() -> {
System.out.println(Thread.currentThread().getName()
+ " 执行任务 " + taskId);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println("任务 " + taskId + " 已提交," +
"核心线程: " + executor.getCorePoolSize() +
", 最大线程: " + executor.getMaximumPoolSize() +
", 当前线程: " + executor.getPoolSize() +
", 队列大小: " + executor.getQueue().size());
}
executor.shutdown();
}
}
Executors 工厂方法
Executors 类提供了创建常用线程池的静态工厂方法,但这些方法在生产环境中需要谨慎使用。
newFixedThreadPool
ExecutorService executor = Executors.newFixedThreadPool(4);
创建固定线程数的线程池,核心线程数和最大线程数相同,使用无界队列。
问题:使用 LinkedBlockingQueue(无界),任务堆积可能导致内存溢出。
newCachedThreadPool
ExecutorService executor = Executors.newCachedThreadPool();
创建可缓存的线程池,核心线程数为 0,最大线程数为 Integer.MAX_VALUE,使用 SynchronousQueue。
问题:线程数无上限,高并发时可能创建大量线程导致系统崩溃。
newSingleThreadExecutor
ExecutorService executor = Executors.newSingleThreadExecutor();
创建单线程的线程池,保证任务顺序执行。
问题:使用无界队列,任务堆积可能导致内存溢出。
newScheduledThreadPool
ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);
executor.schedule(() -> System.out.println("延迟3秒执行"), 3, TimeUnit.SECONDS);
executor.scheduleAtFixedRate(() -> System.out.println("每2秒执行一次"),
0, 2, TimeUnit.SECONDS);
executor.scheduleWithFixedDelay(() -> System.out.println("上次执行完2秒后执行"),
0, 2, TimeUnit.SECONDS);
支持定时和周期性任务执行。
问题:最大线程数无限制。
阿里巴巴规范
阿里巴巴 Java 开发手册明确禁止使用 Executors 创建线程池:
线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
推荐做法:根据业务需求,手动配置 ThreadPoolExecutor 的各个参数。
线程池监控
ThreadPoolExecutor 提供了多个方法用于监控线程池状态:
public class ThreadPoolMonitor {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10)
);
for (int i = 0; i < 10; i++) {
executor.execute(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
while (!executor.isTerminated()) {
System.out.println("=== 线程池状态 ===");
System.out.println("核心线程数: " + executor.getCorePoolSize());
System.out.println("最大线程数: " + executor.getMaximumPoolSize());
System.out.println("当前线程数: " + executor.getPoolSize());
System.out.println("活动线程数: " + executor.getActiveCount());
System.out.println("已完成任务: " + executor.getCompletedTaskCount());
System.out.println("队列大小: " + executor.getQueue().size());
System.out.println("是否关闭: " + executor.isShutdown());
System.out.println("是否终止: " + executor.isTerminated());
Thread.sleep(1000);
}
}
}
线程池关闭
正确关闭线程池是避免资源泄漏的关键。
shutdown - 平滑关闭
executor.shutdown();
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
shutdown 方法会:
- 不再接受新任务
- 等待已提交的任务执行完成
- 线程池状态变为 SHUTDOWN
shutdownNow - 立即关闭
List<Runnable> unfinishedTasks = executor.shutdownNow();
shutdownNow 方法会:
- 不再接受新任务
- 尝试中断正在执行的任务
- 返回队列中等待的任务
- 线程池状态变为 STOP
最佳实践
public class GracefulShutdown {
private static final ExecutorService executor = new ThreadPoolExecutor(
4, 8, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100)
);
public static void main(String[] args) {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("开始关闭线程池...");
executor.shutdown();
try {
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
System.out.println("强制关闭线程池");
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
}
System.out.println("线程池已关闭");
}));
for (int i = 0; i < 20; i++) {
final int id = i;
executor.execute(() -> {
System.out.println("任务 " + id + " 执行中");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
System.out.println("任务 " + id + " 被中断");
Thread.currentThread().interrupt();
}
});
}
}
}
线程池配置建议
核心线程数设置
CPU 密集型任务
int corePoolSize = Runtime.getRuntime().availableProcessors() + 1;
CPU 密集型任务主要消耗 CPU 资源,线程数过多会导致频繁上下文切换,反而降低性能。通常设置为 CPU 核心数 + 1。
IO 密集型任务
int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
IO 密集型任务大部分时间在等待 IO,可以配置更多线程。通常设置为 CPU 核心数的 2 倍,具体需要根据 IO 等待时间调整。
混合型任务
int corePoolSize = (int) (Runtime.getRuntime().availableProcessors()
* (1 + waitTime / computeTime));
waitTime 是 IO 等待时间,computeTime 是 CPU 计算时间。
队列选择
- 有界队列:推荐使用 ArrayBlockingQueue,防止任务堆积导致内存溢出
- 任务处理快:使用 SynchronousQueue,任务直接传递给线程执行
- 任务有优先级:使用 PriorityBlockingQueue
拒绝策略选择
- AbortPolicy:默认策略,适合需要快速发现问题的场景
- CallerRunsPolicy:适合可以降级处理的场景,减缓任务提交速度
- 自定义策略:记录日志、持久化任务、发送告警等
线程池扩展
ThreadPoolExecutor 提供了几个可扩展的方法:
public class CustomThreadPool extends ThreadPoolExecutor {
public CustomThreadPool(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("任务执行前: " + t.getName());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("任务执行后");
if (t != null) {
System.out.println("任务异常: " + t.getMessage());
}
}
@Override
protected void terminated() {
System.out.println("线程池已终止");
}
}
这些钩子方法可以用于:
- 任务执行前后的日志记录
- 统计任务执行时间
- 异常处理和告警
- 资源清理
小结
本章详细介绍了线程池的核心概念:
- 线程池的优势:线程复用、控制并发、任务队列、统一管理
- Executor 框架:Executor、ExecutorService、ThreadPoolExecutor 的层次结构
- 核心参数:corePoolSize、maximumPoolSize、keepAliveTime、workQueue、threadFactory、handler
- 任务执行流程:从核心线程到队列再到非核心线程的执行逻辑
- Executors 工厂方法:了解其问题,推荐手动配置
- 线程池监控与关闭:正确监控和优雅关闭线程池
- 配置建议:根据任务类型合理配置参数
线程池是并发编程的基础设施,正确理解和使用线程池对于构建高性能并发应用至关重要。