跳到主要内容

线程池

线程池是并发编程中最重要的组件之一。它通过复用线程、控制并发数量、管理任务队列,有效解决了频繁创建销毁线程带来的性能问题。本章将深入讲解线程池的原理和使用方法。

为什么需要线程池?

线程创建的开销

每个线程的创建都需要分配栈空间(默认 1MB)、向操作系统注册线程、进行上下文切换等操作。如果每个任务都创建新线程,在高并发场景下会带来巨大的性能开销。

资源耗尽风险

无限制地创建线程可能导致系统资源耗尽。当线程数量超过系统承载能力时,系统响应变慢甚至崩溃。

线程管理的复杂性

手动管理线程的生命周期、等待线程完成、处理异常等操作非常繁琐,容易出错。

线程池通过以下方式解决这些问题:

  • 线程复用:线程执行完任务后不销毁,继续执行其他任务
  • 控制并发:限制同时运行的线程数量,防止资源耗尽
  • 任务队列:缓存待执行的任务,平滑处理请求峰值
  • 统一管理:提供任务提交、执行、关闭等统一接口

Executor 框架

JUC 的线程池基于 Executor 框架,其核心接口层次如下:

Executor (顶层接口)
└── ExecutorService (扩展接口,支持任务提交和关闭)
├── ThreadPoolExecutor (标准线程池实现)
└── ScheduledThreadPoolExecutor (支持定时任务)

Executor 接口

public interface Executor {
void execute(Runnable command);
}

Executor 是最基础的接口,只定义了一个 execute 方法用于执行任务。

ExecutorService 接口

public interface ExecutorService extends Executor {
void shutdown(); // 平滑关闭
List<Runnable> shutdownNow(); // 立即关闭
boolean isShutdown(); // 是否已关闭
boolean isTerminated(); // 是否已终止
<T> Future<T> submit(Callable<T> task); // 提交有返回值的任务
Future<?> submit(Runnable task); // 提交无返回值的任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks); // 批量执行
}

ExecutorService 扩展了 Executor,提供了更丰富的功能。

ThreadPoolExecutor 核心参数

ThreadPoolExecutor 是线程池的核心实现类,理解其构造参数是正确使用线程池的关键。

public ThreadPoolExecutor(
int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 空闲线程存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 任务队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝策略
)

参数详解

corePoolSize - 核心线程数

核心线程是线程池中长期保持的线程数量,即使空闲也不会被回收(除非设置了 allowCoreThreadTimeOut)。

当提交新任务时,如果当前线程数小于 corePoolSize,会创建新线程执行任务,即使其他核心线程处于空闲状态。

maximumPoolSize - 最大线程数

线程池允许创建的最大线程数量。当任务队列已满且当前线程数小于 maximumPoolSize 时,会创建非核心线程处理任务。

keepAliveTime - 空闲线程存活时间

非核心线程空闲超过这个时间后会被回收。如果设置了 allowCoreThreadTimeOut(true),核心线程也会被回收。

workQueue - 任务队列

用于缓存等待执行的任务。常见的队列类型:

队列类型特点适用场景
ArrayBlockingQueue有界队列,需指定容量资源有限,防止任务堆积
LinkedBlockingQueue可选有界,默认无界任务量可控的场景
SynchronousQueue不存储,直接传递高吞吐,任务处理快
PriorityBlockingQueue无界优先级队列任务有优先级需求

threadFactory - 线程工厂

用于创建新线程,可以自定义线程名称、优先级、是否守护线程等。

ThreadFactory factory = r -> {
Thread t = new Thread(r);
t.setName("my-pool-" + t.getId());
t.setDaemon(false);
return t;
};

handler - 拒绝策略

当线程池已满(达到 maximumPoolSize 且队列已满)时,新任务的处理策略:

策略行为适用场景
AbortPolicy抛出 RejectedExecutionException默认策略,快速失败
CallerRunsPolicy由调用者线程执行任务降级处理,减缓提交速度
DiscardPolicy直接丢弃任务,不抛异常允许任务丢失
DiscardOldestPolicy丢弃队列中最老的任务优先处理新任务

任务执行流程

理解线程池的任务执行流程对于正确配置参数至关重要:

提交任务


当前线程数 < corePoolSize?

├─ 是 ──→ 创建核心线程执行任务

└─ 否


任务队列未满?

├─ 是 ──→ 任务加入队列等待

└─ 否


当前线程数 < maximumPoolSize?

├─ 是 ──→ 创建非核心线程执行任务

└─ 否 ──→ 执行拒绝策略

实例演示

import java.util.concurrent.*;

public class ThreadPoolDemo {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程数
4, // 最大线程数
60L, // 空闲时间
TimeUnit.SECONDS, // 时间单位
new ArrayBlockingQueue<>(10), // 任务队列容量10
Executors.defaultThreadFactory(), // 默认线程工厂
new ThreadPoolExecutor.AbortPolicy() // 默认拒绝策略
);

for (int i = 0; i < 15; i++) {
final int taskId = i;
executor.execute(() -> {
System.out.println(Thread.currentThread().getName()
+ " 执行任务 " + taskId);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});

System.out.println("任务 " + taskId + " 已提交," +
"核心线程: " + executor.getCorePoolSize() +
", 最大线程: " + executor.getMaximumPoolSize() +
", 当前线程: " + executor.getPoolSize() +
", 队列大小: " + executor.getQueue().size());
}

executor.shutdown();
}
}

Executors 工厂方法

Executors 类提供了创建常用线程池的静态工厂方法,但这些方法在生产环境中需要谨慎使用。

newFixedThreadPool

ExecutorService executor = Executors.newFixedThreadPool(4);

创建固定线程数的线程池,核心线程数和最大线程数相同,使用无界队列。

问题:使用 LinkedBlockingQueue(无界),任务堆积可能导致内存溢出。

newCachedThreadPool

ExecutorService executor = Executors.newCachedThreadPool();

创建可缓存的线程池,核心线程数为 0,最大线程数为 Integer.MAX_VALUE,使用 SynchronousQueue。

问题:线程数无上限,高并发时可能创建大量线程导致系统崩溃。

newSingleThreadExecutor

ExecutorService executor = Executors.newSingleThreadExecutor();

创建单线程的线程池,保证任务顺序执行。

问题:使用无界队列,任务堆积可能导致内存溢出。

newScheduledThreadPool

ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);

executor.schedule(() -> System.out.println("延迟3秒执行"), 3, TimeUnit.SECONDS);

executor.scheduleAtFixedRate(() -> System.out.println("每2秒执行一次"),
0, 2, TimeUnit.SECONDS);

executor.scheduleWithFixedDelay(() -> System.out.println("上次执行完2秒后执行"),
0, 2, TimeUnit.SECONDS);

支持定时和周期性任务执行。

问题:最大线程数无限制。

阿里巴巴规范

阿里巴巴 Java 开发手册明确禁止使用 Executors 创建线程池:

线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

推荐做法:根据业务需求,手动配置 ThreadPoolExecutor 的各个参数。

线程池监控

ThreadPoolExecutor 提供了多个方法用于监控线程池状态:

public class ThreadPoolMonitor {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10)
);

for (int i = 0; i < 10; i++) {
executor.execute(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}

while (!executor.isTerminated()) {
System.out.println("=== 线程池状态 ===");
System.out.println("核心线程数: " + executor.getCorePoolSize());
System.out.println("最大线程数: " + executor.getMaximumPoolSize());
System.out.println("当前线程数: " + executor.getPoolSize());
System.out.println("活动线程数: " + executor.getActiveCount());
System.out.println("已完成任务: " + executor.getCompletedTaskCount());
System.out.println("队列大小: " + executor.getQueue().size());
System.out.println("是否关闭: " + executor.isShutdown());
System.out.println("是否终止: " + executor.isTerminated());

Thread.sleep(1000);
}
}
}

线程池关闭

正确关闭线程池是避免资源泄漏的关键。

shutdown - 平滑关闭

executor.shutdown();

if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}

shutdown 方法会:

  1. 不再接受新任务
  2. 等待已提交的任务执行完成
  3. 线程池状态变为 SHUTDOWN

shutdownNow - 立即关闭

List<Runnable> unfinishedTasks = executor.shutdownNow();

shutdownNow 方法会:

  1. 不再接受新任务
  2. 尝试中断正在执行的任务
  3. 返回队列中等待的任务
  4. 线程池状态变为 STOP

最佳实践

public class GracefulShutdown {
private static final ExecutorService executor = new ThreadPoolExecutor(
4, 8, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100)
);

public static void main(String[] args) {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("开始关闭线程池...");
executor.shutdown();
try {
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
System.out.println("强制关闭线程池");
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
}
System.out.println("线程池已关闭");
}));

for (int i = 0; i < 20; i++) {
final int id = i;
executor.execute(() -> {
System.out.println("任务 " + id + " 执行中");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
System.out.println("任务 " + id + " 被中断");
Thread.currentThread().interrupt();
}
});
}
}
}

线程池配置建议

核心线程数设置

CPU 密集型任务

int corePoolSize = Runtime.getRuntime().availableProcessors() + 1;

CPU 密集型任务主要消耗 CPU 资源,线程数过多会导致频繁上下文切换,反而降低性能。通常设置为 CPU 核心数 + 1。

IO 密集型任务

int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;

IO 密集型任务大部分时间在等待 IO,可以配置更多线程。通常设置为 CPU 核心数的 2 倍,具体需要根据 IO 等待时间调整。

混合型任务

int corePoolSize = (int) (Runtime.getRuntime().availableProcessors() 
* (1 + waitTime / computeTime));

waitTime 是 IO 等待时间,computeTime 是 CPU 计算时间。

队列选择

  • 有界队列:推荐使用 ArrayBlockingQueue,防止任务堆积导致内存溢出
  • 任务处理快:使用 SynchronousQueue,任务直接传递给线程执行
  • 任务有优先级:使用 PriorityBlockingQueue

拒绝策略选择

  • AbortPolicy:默认策略,适合需要快速发现问题的场景
  • CallerRunsPolicy:适合可以降级处理的场景,减缓任务提交速度
  • 自定义策略:记录日志、持久化任务、发送告警等

线程池扩展

ThreadPoolExecutor 提供了几个可扩展的方法:

public class CustomThreadPool extends ThreadPoolExecutor {

public CustomThreadPool(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("任务执行前: " + t.getName());
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("任务执行后");
if (t != null) {
System.out.println("任务异常: " + t.getMessage());
}
}

@Override
protected void terminated() {
System.out.println("线程池已终止");
}
}

这些钩子方法可以用于:

  • 任务执行前后的日志记录
  • 统计任务执行时间
  • 异常处理和告警
  • 资源清理

小结

本章详细介绍了线程池的核心概念:

  1. 线程池的优势:线程复用、控制并发、任务队列、统一管理
  2. Executor 框架:Executor、ExecutorService、ThreadPoolExecutor 的层次结构
  3. 核心参数:corePoolSize、maximumPoolSize、keepAliveTime、workQueue、threadFactory、handler
  4. 任务执行流程:从核心线程到队列再到非核心线程的执行逻辑
  5. Executors 工厂方法:了解其问题,推荐手动配置
  6. 线程池监控与关闭:正确监控和优雅关闭线程池
  7. 配置建议:根据任务类型合理配置参数

线程池是并发编程的基础设施,正确理解和使用线程池对于构建高性能并发应用至关重要。