Java 多线程编程
Java 提供了强大的多线程支持,使得程序可以同时执行多个任务。本章将详细介绍 Java 多线程编程的核心概念和技术。
线程基础
什么是线程?
线程是程序执行的最小单元,一个进程可以包含多个线程,每个线程独立执行。
创建线程的方式
1. 继承 Thread 类
class MyThread extends Thread {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println("线程执行: " + i);
try {
sleep(100); // 休眠100毫秒
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 使用
public class Main {
public static void main(String[] args) {
MyThread thread = new MyThread();
thread.start(); // 启动线程
// 主线程继续执行
for (int i = 0; i < 5; i++) {
System.out.println("主线程: " + i);
}
}
}
2. 实现 Runnable 接口
class MyRunnable implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println("Runnable 线程: " + i);
}
}
}
// 使用
public class Main {
public static void main(String[] args) {
Thread thread = new Thread(new MyRunnable());
thread.start();
}
}
// 简化为 Lambda 表达式(Java 8+)
new Thread(() -> {
for (int i = 0; i < 5; i++) {
System.out.println("Lambda 线程: " + i);
}
}).start();
3. 实现 Callable 接口
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 0; i <= 100; i++) {
sum += i;
}
return sum;
}
}
// 使用
public class Main {
public static void main(String[] args) throws Exception {
Callable<Integer> callable = new MyCallable();
FutureTask<Integer> task = new FutureTask<>(callable);
Thread thread = new Thread(task);
thread.start();
// 获取返回值
Integer result = task.get();
System.out.println("计算结果: " + result);
}
}
线程生命周期
NEW(新建)→ RUNNABLE(就绪)→ RUNNING(运行)→ BLOCKED/WAITING(阻塞)→ TERMINATED(终止)
// 获取线程状态
Thread thread = new Thread(() -> System.out.println("运行中"));
System.out.println(thread.getState()); // NEW
thread.start();
System.out.println(thread.getState()); // RUNNABLE
thread.join();
System.out.println(thread.getState()); // TERMINATED
线程控制
sleep() 方法
使当前线程休眠一段时间:
try {
Thread.sleep(1000); // 休眠1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
join() 方法
等待线程终止:
Thread t1 = new Thread(() -> {
System.out.println("线程1开始");
try { Thread.sleep(1000); } catch (Exception e) {}
System.out.println("线程1结束");
});
Thread t2 = new Thread(() -> {
System.out.println("线程2开始");
try { Thread.sleep(500); } catch (Exception e) {}
System.out.println("线程2结束");
});
t1.start();
t2.start();
t1.join(); // 等待t1结束
t2.join(); // 等待t2结束
System.out.println("所有线程执行完毕");
yield() 方法
暂停当前线程,让出 CPU 时间:
Thread.yield(); // 提示调度器可以让出CPU
interrupt() 方法
中断线程:
Thread thread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
System.out.println("运行中...");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// 收到中断信号,退出循环
break;
}
}
});
thread.start();
Thread.sleep(1000);
thread.interrupt(); // 中断线程
线程同步
synchronized 关键字
方法同步
class Counter {
private int count = 0;
// 同步方法
public synchronized void increment() {
count++;
}
public synchronized int getCount() {
return count;
}
}
代码块同步
class Counter {
private int count = 0;
private final Object lock = new Object();
public void increment() {
synchronized (lock) {
count++;
}
}
// 同步特定对象
public void incrementBy(Object obj) {
synchronized (obj) {
count++;
}
}
}
示例:生产者-消费者问题
import java.util.LinkedList;
class ProducerConsumer {
private LinkedList<Integer> buffer = new LinkedList<>();
private final int MAX_SIZE = 10;
public synchronized void produce(int value) throws InterruptedException {
while (buffer.size() >= MAX_SIZE) {
wait(); // 等待消费者消费
}
buffer.add(value);
System.out.println("生产: " + value);
notifyAll(); // 通知消费者
}
public synchronized int consume() throws InterruptedException {
while (buffer.isEmpty()) {
wait(); // 等待生产者生产
}
int value = buffer.removeFirst();
System.out.println("消费: " + value);
notifyAll(); // 通知生产者
return value;
}
}
Lock 接口
Java 5 引入了 Lock 接口,提供比 synchronized 更灵活的锁操作。
ReentrantLock
import java.util.concurrent.locks.ReentrantLock;
class Counter {
private int count = 0;
private final ReentrantLock lock = new ReentrantLock();
public void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock(); // 必须在 finally 中释放
}
}
public int getCount() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
}
ReentrantReadWriteLock
读写锁,允许多个读操作同时进行:
import java.util.concurrent.locks.ReentrantReadWriteLock;
class DataContainer {
private String data;
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
public String read() {
rwLock.readLock().lock();
try {
Thread.sleep(100); // 模拟读取
return data;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
rwLock.readLock().unlock();
}
}
public void write(String data) {
rwLock.writeLock().lock();
try {
Thread.sleep(100); // 模拟写入
this.data = data;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
rwLock.writeLock().unlock();
}
}
}
线程通信
wait() / notify() / notifyAll()
class SharedResource {
private Object lock = new Object();
private boolean ready = false;
public void waitForReady() throws InterruptedException {
synchronized (lock) {
while (!ready) {
lock.wait(); // 等待
}
System.out.println("继续执行");
}
}
public void setReady() {
synchronized (lock) {
ready = true;
lock.notifyAll(); // 通知所有等待线程
}
}
}
Condition 接口
配合 Lock 使用:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
class BlockingQueue<T> {
private Object[] items;
private int count = 0;
private int takeIndex = 0;
private int putIndex = 0;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
public BlockingQueue(int capacity) {
items = new Object[capacity];
}
public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await(); // 等待不为空
}
@SuppressWarnings("unchecked")
T item = (T) items[takeIndex];
items[takeIndex] = null;
takeIndex = (takeIndex + 1) % items.length;
count--;
notFull.signal(); // 通知不满
return item;
} finally {
lock.unlock();
}
}
public void put(T item) throws InterruptedException {
lock.lock();
try {
while (count == items.length) {
notFull.await(); // 等待不满
}
items[putIndex] = item;
putIndex = (putIndex + 1) % items.length;
count++;
notEmpty.signal(); // 通知不空
} finally {
lock.unlock();
}
}
}
线程池
Executor 框架
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
// 创建固定数量的线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
// 提交任务
executor.execute(() -> System.out.println("任务1"));
executor.submit(() -> {
// 执行任务
System.out.println("任务2");
});
// 提交有返回值任务
Future<String> future = executor.submit(() -> "结果");
String result = future.get();
// 关闭线程池
executor.shutdown();
executor.awaitTermination(60, TimeUnit.SECONDS);
线程池类型
| 类型 | 说明 |
|---|---|
newFixedThreadPool(n) | 固定数量的线程池 |
newCachedThreadPool() | 可缓存的线程池,自动调整 |
newSingleThreadExecutor() | 单线程的线程池 |
newScheduledThreadPool(n) | 定时调度的线程池 |
自定义线程池
import java.util.concurrent.*;
ExecutorService executor = new ThreadPoolExecutor(
2, // 核心线程数
4, // 最大线程数
60, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(100), // 任务队列
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);
// 拒绝策略
// AbortPolicy - 抛出RejectedExecutionException
// CallerRunsPolicy - 调用者线程执行
// DiscardPolicy - 丢弃任务
// DiscardOldestPolicy - 丢弃最老的任务
并发工具类
CountDownLatch
等待一组线程完成:
import java.util.concurrent.CountDownLatch;
class Worker implements Runnable {
private final CountDownLatch latch;
private final int id;
public Worker(CountDownLatch latch, int id) {
this.latch = latch;
this.id = id;
}
@Override
public void run() {
System.out.println("Worker " + id + " 开始工作");
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Worker " + id + " 完成");
latch.countDown(); // 计数减1
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3); // 3个worker
for (int i = 0; i < 3; i++) {
new Thread(new Worker(latch, i)).start();
}
latch.await(); // 等待所有worker完成
System.out.println("所有工作完成");
}
}
CyclicBarrier
让一组线程相互等待:
import java.util.concurrent.CyclicBarrier;
class GamePlayer implements Runnable {
private final CyclicBarrier barrier;
private final int id;
public GamePlayer(CyclicBarrier barrier, int id) {
this.barrier = barrier;
this.id = id;
}
@Override
public void run() {
System.out.println("玩家 " + id + " 准备中...");
try {
barrier.await(); // 等待其他玩家
System.out.println("玩家 " + id + " 开始游戏");
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class Main {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(3, () ->
System.out.println("游戏开始!")
);
for (int i = 0; i < 3; i++) {
new Thread(new GamePlayer(barrier, i)).start();
}
}
}
Semaphore
控制并发数量:
import java.util.concurrent.Semaphore;
class Pool {
private final Semaphore semaphore;
private final boolean[] connections;
public Pool(int size) {
semaphore = new Semaphore(size);
connections = new boolean[size];
}
public void connect() throws InterruptedException {
semaphore.acquire();
int id = -1;
synchronized (this) {
for (int i = 0; i < connections.length; i++) {
if (!connections[i]) {
connections[i] = true;
id = i;
break;
}
}
}
System.out.println("获取连接: " + id);
// 使用连接
Thread.sleep((long) (Math.random() * 1000));
synchronized (this) {
connections[id] = false;
}
semaphore.release();
System.out.println("释放连接: " + id);
}
}
Exchanger
两个线程交换数据:
import java.util.concurrent.Exchanger;
class Producer implements Runnable {
private final Exchanger<String> exchanger;
private String data = "产品A";
public Producer(Exchanger<String> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
try {
for (int i = 0; i < 3; i++) {
System.out.println("生产者发送: " + data);
data = exchanger.exchange(data);
System.out.println("生产者收到: " + data);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Consumer implements Runnable {
private final Exchanger<String> exchanger;
private String data = "产品B";
public Consumer(Exchanger<String> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
try {
for (int i = 0; i < 3; i++) {
System.out.println("消费者发送: " + data);
data = exchanger.exchange(data);
System.out.println("消费者收到: " + data);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
原子类
java.util.concurrent.atomic 包提供了无锁的线程安全操作:
import java.util.concurrent.atomic.*;
// 原子整数
AtomicInteger count = new AtomicInteger(0);
count.incrementAndGet(); // count++ 返回新值
count.getAndIncrement(); // 返回旧值然后++
count.addAndGet(5); // 加5
count.compareAndSet(0, 1); // CAS操作
// 原子长整型
AtomicLong atomicLong = new AtomicLong();
// 原子引用
AtomicReference<Person> atomicRef = new AtomicReference<>(new Person());
atomicRef.compareAndSet(person1, person2);
atomicRef.updateAndGet(p -> new Person(p.getName(), p.getAge() + 1));
// 原子数组
AtomicIntegerArray array = new AtomicIntegerArray(10);
array.getAndSet(0, 5);
// 原子字段更新器
AtomicIntegerFieldUpdater<Person> updater =
AtomicIntegerFieldUpdater.newUpdater(Person.class, "age");
updater.incrementAndGet(person);
线程安全集合
import java.util.concurrent.*;
// 线程安全的 List
List<String> list = new CopyOnWriteArrayList<>();
list.add("a");
list.remove("a");
// 线程安全的 Set
Set<String> set = new CopyOnWriteArraySet<>();
// 线程安全的 Map
Map<String, Integer> map = new ConcurrentHashMap<>();
map.putIfAbsent("key", 1); // 不存在才插入
// 线程安全的 Queue
Queue<String> queue = new ConcurrentLinkedQueue<>();
queue.offer("a");
String s = queue.poll();
// 阻塞队列
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(10);
blockingQueue.offer("a", 1, TimeUnit.SECONDS); // 带超时
String item = blockingQueue.poll(1, TimeUnit.SECONDS); // 带超时
最佳实践
1. 优先使用线程池
// 不推荐
new Thread(() -> {
// 任务
}).start();
// 推荐
ExecutorService executor = Executors.newFixedThreadPool(4);
executor.execute(() -> {
// 任务
});
2. 使用 volatile 保证可见性
class Flag {
private volatile boolean running = true;
public void stop() {
running = false;
}
public void run() {
while (running) {
// 处理任务
}
}
}
3. 避免死锁
// 错误的加锁顺序 - 可能导致死锁
class WrongOrder {
private final Object lock1 = new Object();
private final Object lock2 = new Object();
public void method1() {
synchronized (lock1) {
synchronized (lock2) {
// 操作
}
}
}
public void method2() {
synchronized (lock2) { // 不同的顺序!
synchronized (lock1) {
// 操作
}
}
}
}
// 正确:始终按相同顺序加锁
class CorrectOrder {
private final Object lock1 = new Object();
private final Object lock2 = new Object();
public void method1() {
synchronized (lock1) {
synchronized (lock2) {
// 操作
}
}
}
public void method2() {
synchronized (lock1) { // 相同的顺序
synchronized (lock2) {
// 操作
}
}
}
}
4. 使用 ThreadLocal 线程本地变量
class ThreadLocalExample {
private static final ThreadLocal<String> threadLocal =
ThreadLocal.withInitial(() -> "默认值");
public static void main(String[] args) {
// 主线程设置
threadLocal.set("主线程值");
System.out.println(threadLocal.get());
// 新线程
new Thread(() -> {
System.out.println(threadLocal.get()); // 默认值
threadLocal.set("线程2的值");
System.out.println(threadLocal.get());
}).start();
// 清理
threadLocal.remove();
}
}
虚拟线程(Java 21+)
虚拟线程(Virtual Threads)是 Java 21 引入的重要特性,它极大地简化了高并发应用的编写。
什么是虚拟线程?
虚拟线程是由 Java 运行时而非操作系统实现的轻量级线程。与传统平台线程相比,虚拟线程具有以下特点:
| 特性 | 平台线程 | 虚拟线程 |
|---|---|---|
| 实现 | 操作系统线程的包装 | JVM 实现 |
| 数量 | 受限于操作系统线程数 | 可达数百万 |
| 内存 | 栈空间较大(1MB+) | 栈空间小,按需增长 |
| 创建成本 | 高 | 低 |
| 阻塞成本 | 高(占用 OS 线程) | 低(释放载体线程) |
为什么需要虚拟线程?
传统平台线程是稀缺资源,每个线程都需要占用大量内存,且创建和销毁成本较高。这导致开发者不得不使用线程池、异步编程等复杂技术来管理线程。
虚拟线程解决了这个问题:你可以为每个任务创建一个虚拟线程,而不需要担心资源耗尽。
// 传统方式:需要线程池
ExecutorService pool = Executors.newFixedThreadPool(100);
for (int i = 0; i < 10000; i++) {
pool.submit(() -> processRequest()); // 只有100个并发
}
// 虚拟线程:每个任务一个线程
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < 10000; i++) {
executor.submit(() -> processRequest()); // 可达10000个并发
}
}
创建虚拟线程
方式一:Thread.ofVirtual()
// 创建并启动虚拟线程
Thread vThread = Thread.ofVirtual().start(() -> {
System.out.println("虚拟线程运行中");
});
vThread.join(); // 等待线程结束
// 创建具名虚拟线程
Thread.Builder builder = Thread.ofVirtual().name("my-virtual-thread");
Thread thread = builder.start(() -> {
System.out.println("线程名: " + Thread.currentThread().getName());
});
thread.join();
方式二:Executors.newVirtualThreadPerTaskExecutor()
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
// 提交多个任务,每个任务都在新的虚拟线程中运行
Future<String> future1 = executor.submit(() -> fetchData("url1"));
Future<String> future2 = executor.submit(() -> fetchData("url2"));
Future<String> future3 = executor.submit(() -> fetchData("url3"));
// 获取结果
System.out.println(future1.get());
System.out.println(future2.get());
System.out.println(future3.get());
} // try-with-resources 自动等待所有任务完成
方式三:Thread.startVirtualThread()
Thread.startVirtualThread(() -> {
System.out.println("快速创建虚拟线程");
});
虚拟线程的工作原理
虚拟线程采用"用户态线程"模型:
- 载体线程(Carrier Thread):虚拟线程运行在载体线程(平台线程)上
- 挂载与卸载:当虚拟线程执行阻塞操作时,它会从载体线程上卸载,载体线程可以执行其他虚拟线程
- 调度:JVM 负责调度虚拟线程,而非操作系统
┌─────────────────────────────────────────┐
│ 虚拟线程 (百万级) │
│ VT1 VT2 VT3 VT4 ... VT1000000 │
└─────────────────┬───────────────────────┘
│ JVM 调度
▼
┌─────────────────────────────────────────┐
│ 载体线程 (平台线程) │
│ CT1 CT2 CT3 CT4 │
└─────────────────┬───────────────────────┘
│ OS 调度
▼
┌─────────────────────────────────────────┐
│ CPU 核心 │
│ Core1 Core2 Core3 Core4 │
└─────────────────────────────────────────┘
虚拟线程适用场景
适合使用虚拟线程:
- I/O 密集型任务(网络请求、文件操作、数据库查询)
- 每个请求一个线程的服务器应用
- 高并发但每个任务处理时间较短的场景
不适合使用虚拟线程:
- CPU 密集型任务(长时间计算)
- 长时间运行的任务
- 需要使用 synchronized 保护长时间阻塞操作的场景
实战示例:高并发 HTTP 客户端
import java.net.URI;
import java.net.http.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
public class VirtualThreadHttpClient {
public static void main(String[] args) throws Exception {
List<String> urls = List.of(
"https://api.example.com/users/1",
"https://api.example.com/users/2",
"https://api.example.com/users/3"
);
// 使用虚拟线程并发请求
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
List<Future<String>> futures = urls.stream()
.map(url -> executor.submit(() -> fetchUrl(url)))
.collect(Collectors.toList());
for (Future<String> future : futures) {
System.out.println(future.get());
}
}
}
static String fetchUrl(String url) throws Exception {
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(url))
.GET()
.build();
// 阻塞操作会自动让出载体线程
HttpResponse<String> response = client.send(
request,
HttpResponse.BodyHandlers.ofString()
);
return response.body();
}
}
虚拟线程与线程池
重要:不要池化虚拟线程!
// ❌ 错误:池化虚拟线程(浪费资源)
ExecutorService pool = Executors.newFixedThreadPool(100); // 平台线程池
// 虚拟线程已经足够轻量,不需要池化
// ✅ 正确:每个任务一个虚拟线程
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
executor.submit(task1);
executor.submit(task2);
}
虚拟线程与信号量
当需要限制并发数量时,使用 Semaphore 而非线程池:
import java.util.concurrent.*;
public class RateLimitedVirtualThread {
// 限制最多 10 个并发
private static final Semaphore semaphore = new Semaphore(10);
public static void main(String[] args) {
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < 100; i++) {
executor.submit(() -> {
semaphore.acquire(); // 获取许可
try {
callExternalService();
} finally {
semaphore.release(); // 释放许可
}
});
}
}
}
static void callExternalService() throws InterruptedException {
Thread.sleep(100); // 模拟调用
}
}
虚拟线程的 Pinning 问题
虚拟线程在以下情况会被"钉住"(Pinned),无法卸载:
- 在
synchronized块或方法中执行阻塞操作 - 执行本地方法(Native Method)
解决方案:使用 ReentrantLock 代替 synchronized
// ❌ 可能导致 Pinning
class BadExample {
private final Object lock = new Object();
public void process() {
synchronized (lock) {
blockingIO(); // 阻塞时会钉住虚拟线程
}
}
}
// ✅ 避免Pinning
class GoodExample {
private final ReentrantLock lock = new ReentrantLock();
public void process() {
lock.lock();
try {
blockingIO(); // 可以正常卸载
} finally {
lock.unlock();
}
}
}
检测 Pinning:
# 启用JFR检测
java -XX:StartFlightRecording:settings=default ...
# 或使用系统属性
java -Djdk.tracePinnedThreads=full ...
ThreadLocal 与虚拟线程
由于虚拟线程数量可能非常大,使用 ThreadLocal 需要谨慎:
// ❌ 不推荐:每个虚拟线程都会创建一个实例
static final ThreadLocal<SimpleDateFormat> dateFormat =
ThreadLocal.withInitial(SimpleDateFormat::new);
// ✅ 推荐:使用不可变对象共享
static final DateTimeFormatter formatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd");
Structured Concurrency(结构化并发)
Java 21 引入了结构化并发 API(预览特性),简化多线程任务管理:
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.Future;
// 结构化并发示例
Response handleRequest() throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 并发执行多个子任务
Future<String> user = scope.fork(() -> fetchUser());
Future<List<Order>> orders = scope.fork(() -> fetchOrders());
// 等待所有任务完成(任一失败则取消其他)
scope.join();
scope.throwIfFailed();
// 组合结果
return new Response(user.resultNow(), orders.resultNow());
}
}
最佳实践总结
- 为每个任务创建虚拟线程:虚拟线程很便宜,不需要池化
- 使用同步阻塞风格:编写简单直观的代码,而非异步回调
- 避免 synchronized 长时间阻塞:使用 ReentrantLock 替代
- 谨慎使用 ThreadLocal:虚拟线程数量巨大,可能导致内存问题
- 使用 Semaphore 限制并发:不要用线程池来限制并发
- 适合 I/O 密集型任务:不适合 CPU 密集型任务
CompletableFuture(异步编程)
CompletableFuture 是 Java 8 引入的异步编程利器,它实现了 Future 和 CompletionStage 接口,提供了强大的异步任务组合能力。
什么是异步编程?
传统的同步编程中,一个任务完成后才能开始下一个任务。异步编程允许任务并行执行,提高程序吞吐量。
// 同步执行:三个任务顺序执行,总时间 = 任务1 + 任务2 + 任务3
String result1 = task1();
String result2 = task2();
String result3 = task3();
// 异步执行:三个任务并行执行,总时间 ≈ max(任务1, 任务2, 任务3)
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> task1());
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> task2());
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> task3());
创建 CompletableFuture
方式一:supplyAsync(有返回值)
import java.util.concurrent.CompletableFuture;
// 使用默认线程池(FrokJoinPool.commonPool)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello World";
});
// 获取结果(阻塞等待)
String result = future.join();
System.out.println(result); // Hello World
方式二:runAsync(无返回值)
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("任务执行中: " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务完成");
});
future.join(); // 等待完成
方式三:指定线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Result";
}, executor);
// 或者使用虚拟线程
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "Result";
}, Executors.newVirtualThreadPerTaskExecutor());
任务链式组合
CompletableFuture 的强大之处在于可以轻松组合多个异步任务。
thenApply - 转换结果
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello") // 第一步:返回 String
.thenApply(s -> s + " World") // 第二步:转换为 "Hello World"
.thenApply(s -> s.toUpperCase()); // 第三步:转换为 "HELLO WORLD"
System.out.println(future.join()); // HELLO WORLD
thenAccept - 消费结果
CompletableFuture
.supplyAsync(() -> 100)
.thenAccept(money -> System.out.println("收到金额: " + money));
// 输出:收到金额: 100
thenRun - 执行后续动作
CompletableFuture
.supplyAsync(() -> 10)
.thenRun(() -> System.out.println("任务完成"));
// 输出:任务完成
并行执行多个任务
allOf - 等待所有任务完成
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
Thread.sleep(1000);
return "Result1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
Thread.sleep(2000);
return "Result2";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
Thread.sleep(500);
return "Result3";
});
// 等待所有任务完成
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);
allOf.join(); // 阻塞等待所有任务完成
System.out.println(future1.join() + future2.join() + future3.join());
// 输出:Result1Result2Result3
anyOf - 任意一个任务完成
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
Thread.sleep(3000);
return "Result1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
Thread.sleep(1000);
return "Result2";
});
// 任意一个完成就返回
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2);
System.out.println(anyOf.join()); // Result2(最先完成)
异常处理
exceptionally - 异常处理
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("任务失败");
}
return "Success";
})
.exceptionally(e -> {
System.out.println("异常: " + e.getMessage());
return "默认值";
});
System.out.println(future.join()); // 失败时输出:默认值
handle - 无论成功失败都处理
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.handle((result, ex) -> {
if (ex != null) {
return "Error: " + ex.getMessage();
}
return result + " World";
});
System.out.println(future.join()); // Hello World
whenComplete - 完成时回调
CompletableFuture
.supplyAsync(() -> 10)
.whenComplete((result, ex) -> {
if (ex == null) {
System.out.println("成功: " + result);
} else {
System.out.println("失败: " + ex.getMessage());
}
});
实战示例:并行获取用户信息
import java.util.concurrent.*;
public class UserService {
public UserInfo getUserInfo(long userId) throws Exception {
// 并行获取用户基本信息、订单、好友列表
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
CompletableFuture<User> userFuture = CompletableFuture
.supplyAsync(() -> fetchUser(userId), executor);
CompletableFuture<List<Order>> ordersFuture = CompletableFuture
.supplyAsync(() -> fetchOrders(userId), executor);
CompletableFuture<List<Friend>> friendsFuture = CompletableFuture
.supplyAsync(() -> fetchFriends(userId), executor);
// 等待所有任务完成
CompletableFuture.allOf(userFuture, ordersFuture, friendsFuture).join();
return new UserInfo(
userFuture.resultNow(),
ordersFuture.resultNow(),
friendsFuture.resultNow()
);
}
}
User fetchUser(long id) { /* ... */ return new User(); }
List<Order> fetchOrders(long id) { /* ... */ return List.of(); }
List<Friend> fetchFriends(long id) { /* ... */ return List.of(); }
}
record UserInfo(User user, List<Order> orders, List<Friend> friends) {}
record User() {}
record Order() {}
record Friend() {}
Fork/Join 框架
Fork/Join 框架是 Java 7 引入的并行计算框架,专门用于处理可以递归分解的任务。它采用"工作窃取"算法,能够高效利用多核处理器。
什么是工作窃取?
工作窃取(Work-Stealing)是一种并行计算算法:
- 每个工作线程有自己的任务队列
- 当一个线程完成自己的任务后,可以从其他线程的任务队列末尾"偷取"任务
- 这样可以保持所有线程都处于忙碌状态
ForkJoinPool
ForkJoinPool 是 Fork/Join 框架的核心执行器。
import java.util.concurrent.*;
ForkJoinPool pool = new ForkJoinPool();
// 使用公共线程池(推荐)
ForkJoinPool commonPool = ForkJoinPool.commonPool();
主要方法:
| 方法 | 说明 |
|---|---|
invoke(task) | 执行任务并等待结果 |
execute(task) | 异步执行任务 |
submit(task) | 提交任务,返回 Future |
RecursiveTask(有返回值的任务)
import java.util.concurrent.*;
import java.util.Arrays;
class SumTask extends RecursiveTask<Long> {
private final long[] array;
private final int start;
private final int end;
private static final int THRESHOLD = 1000;
public SumTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start;
// 任务足够小,直接计算
if (length <= THRESHOLD) {
return Arrays.stream(array, start, end).sum();
}
// 拆分任务
int middle = start + length / 2;
SumTask leftTask = new SumTask(array, start, middle);
SumTask rightTask = new SumTask(array, middle, end);
// 异步执行左半部分
leftTask.fork();
// 同步执行右半部分
Long rightResult = rightTask.compute();
// 等待左半部分结果
Long leftResult = leftTask.join();
// 合并结果
return leftResult + rightResult;
}
}
public class ForkJoinDemo {
public static void main(String[] args) {
long[] array = new long[10000];
for (int i = 0; i < array.length; i++) {
array[i] = i + 1;
}
ForkJoinPool pool = ForkJoinPool.commonPool();
SumTask task = new SumTask(array, 0, array.length);
long startTime = System.currentTimeMillis();
Long result = pool.invoke(task);
long endTime = System.currentTimeMillis();
System.out.println("结果: " + result); // 50005000
System.out.println("耗时: " + (endTime - startTime) + "ms");
}
}
RecursiveAction(无返回值的任务)
import java.util.concurrent.*;
class PrintTask extends RecursiveAction {
private final int start;
private final int end;
private static final int THRESHOLD = 10;
public PrintTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if (end - start <= THRESHOLD) {
for (int i = start; i < end; i++) {
System.out.println("Task: " + i);
}
} else {
int middle = start + (end - start) / 2;
PrintTask leftTask = new PrintTask(start, middle);
PrintTask rightTask = new PrintTask(middle, end);
leftTask.fork();
rightTask.fork();
leftTask.join();
rightTask.join();
}
}
}
public class RecursiveActionDemo {
public static void main(String[] args) {
ForkJoinPool pool = ForkJoinPool.commonPool();
PrintTask task = new PrintTask(0, 100);
pool.invoke(task);
}
}
CountedCompleter(带完成回调的任务)
import java.util.concurrent.*;
import java.util.*;
class MapTask extends RecursiveTask<Integer> {
private final List<Integer> data;
private final Map<String, Integer> resultMap;
private final String prefix;
public MapTask(List<Integer> data, Map<String, Integer> resultMap, String prefix) {
this.data = data;
this.resultMap = resultMap;
this.prefix = prefix;
}
@Override
protected Integer compute() {
if (data.size() < 10) {
int sum = 0;
for (Integer num : data) {
resultMap.put(prefix + num, num * num);
sum += num;
}
return sum;
}
int mid = data.size() / 2;
List<Integer> left = data.subList(0, mid);
List<Integer> right = data.subList(mid, data.size());
MapTask leftTask = new MapTask(left, resultMap, prefix + "L");
MapTask rightTask = new MapTask(right, resultMap, prefix + "R");
rightTask.fork();
int leftResult = leftTask.compute();
int rightResult = rightTask.join();
return leftResult + rightResult;
}
}
Fork/Join vs 传统线程池
| 特性 | Fork/Join | 传统线程池 |
|---|---|---|
| 适用场景 | 递归分解的任务 | 独立的任务 |
| 工作方式 | 工作窃取 | 任务队列 |
| 任务划分 | 自动拆分 | 手动划分 |
| 内存效率 | 高 | 中 |
使用建议
- 任务拆分要适当:拆分粒度太小会增加调度开销,太大则无法充分利用多核
- 避免阻塞:Fork/Join 适合 CPU 密集型任务,避免在任务中执行阻塞操作
- 使用公共池:除非有特殊需求,否则使用
ForkJoinPool.commonPool() - 合理设置并行度:
ForkJoinPool(int parallelism)设置并行度,通常等于 CPU 核心数
// 获取 CPU 核心数
int processors = Runtime.getRuntime().availableProcessors();
// 创建指定并行度的线程池
ForkJoinPool pool = new ForkJoinPool(processors);
小结
本章我们详细学习了 Java 多线程编程的核心内容:
- 线程基础:创建线程的多种方式、线程生命周期
- 线程控制:sleep、join、yield、interrupt
- 线程同步:synchronized、Lock
- 线程通信:wait/notify、Condition
- 线程池:Executor 框架、自定义线程池
- 并发工具类:CountDownLatch、CyclicBarrier、Semaphore
- 原子类:无锁线程安全操作
- 线程安全集合:ConcurrentHashMap 等
- 虚拟线程(Java 21+):轻量级线程,适合高并发 I/O 场景
- CompletableFuture:强大的异步编程和任务组合能力
- Fork/Join 框架:递归任务并行计算,工作窃取算法
练习
- 创建多个线程并打印 1-100 的数字
- 使用 synchronized 实现生产者-消费者
- 使用线程池处理大量任务
- 使用 CountDownLatch 等待多个任务完成
- 使用 ConcurrentHashMap 实现缓存
- 使用虚拟线程实现并发 HTTP 请求
- 对比虚拟线程和平台线程在 I/O 密集型任务中的性能差异