跳到主要内容

Java 多线程编程

Java 提供了强大的多线程支持,使得程序可以同时执行多个任务。本章将详细介绍 Java 多线程编程的核心概念和技术。

线程基础

什么是线程?

线程是程序执行的最小单元,一个进程可以包含多个线程,每个线程独立执行。

创建线程的方式

1. 继承 Thread 类

class MyThread extends Thread {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println("线程执行: " + i);
try {
sleep(100); // 休眠100毫秒
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

// 使用
public class Main {
public static void main(String[] args) {
MyThread thread = new MyThread();
thread.start(); // 启动线程

// 主线程继续执行
for (int i = 0; i < 5; i++) {
System.out.println("主线程: " + i);
}
}
}

2. 实现 Runnable 接口

class MyRunnable implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println("Runnable 线程: " + i);
}
}
}

// 使用
public class Main {
public static void main(String[] args) {
Thread thread = new Thread(new MyRunnable());
thread.start();
}
}

// 简化为 Lambda 表达式(Java 8+)
new Thread(() -> {
for (int i = 0; i < 5; i++) {
System.out.println("Lambda 线程: " + i);
}
}).start();

3. 实现 Callable 接口

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 0; i <= 100; i++) {
sum += i;
}
return sum;
}
}

// 使用
public class Main {
public static void main(String[] args) throws Exception {
Callable<Integer> callable = new MyCallable();
FutureTask<Integer> task = new FutureTask<>(callable);
Thread thread = new Thread(task);
thread.start();

// 获取返回值
Integer result = task.get();
System.out.println("计算结果: " + result);
}
}

线程生命周期

NEW(新建)→ RUNNABLE(就绪)→ RUNNING(运行)→ BLOCKED/WAITING(阻塞)→ TERMINATED(终止)
// 获取线程状态
Thread thread = new Thread(() -> System.out.println("运行中"));
System.out.println(thread.getState()); // NEW

thread.start();
System.out.println(thread.getState()); // RUNNABLE

thread.join();
System.out.println(thread.getState()); // TERMINATED

线程控制

sleep() 方法

使当前线程休眠一段时间:

try {
Thread.sleep(1000); // 休眠1秒
} catch (InterruptedException e) {
e.printStackTrace();
}

join() 方法

等待线程终止:

Thread t1 = new Thread(() -> {
System.out.println("线程1开始");
try { Thread.sleep(1000); } catch (Exception e) {}
System.out.println("线程1结束");
});

Thread t2 = new Thread(() -> {
System.out.println("线程2开始");
try { Thread.sleep(500); } catch (Exception e) {}
System.out.println("线程2结束");
});

t1.start();
t2.start();

t1.join(); // 等待t1结束
t2.join(); // 等待t2结束

System.out.println("所有线程执行完毕");

yield() 方法

暂停当前线程,让出 CPU 时间:

Thread.yield();  // 提示调度器可以让出CPU

interrupt() 方法

中断线程:

Thread thread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
System.out.println("运行中...");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// 收到中断信号,退出循环
break;
}
}
});

thread.start();
Thread.sleep(1000);
thread.interrupt(); // 中断线程

线程同步

synchronized 关键字

方法同步

class Counter {
private int count = 0;

// 同步方法
public synchronized void increment() {
count++;
}

public synchronized int getCount() {
return count;
}
}

代码块同步

class Counter {
private int count = 0;
private final Object lock = new Object();

public void increment() {
synchronized (lock) {
count++;
}
}

// 同步特定对象
public void incrementBy(Object obj) {
synchronized (obj) {
count++;
}
}
}

示例:生产者-消费者问题

import java.util.LinkedList;

class ProducerConsumer {
private LinkedList<Integer> buffer = new LinkedList<>();
private final int MAX_SIZE = 10;

public synchronized void produce(int value) throws InterruptedException {
while (buffer.size() >= MAX_SIZE) {
wait(); // 等待消费者消费
}
buffer.add(value);
System.out.println("生产: " + value);
notifyAll(); // 通知消费者
}

public synchronized int consume() throws InterruptedException {
while (buffer.isEmpty()) {
wait(); // 等待生产者生产
}
int value = buffer.removeFirst();
System.out.println("消费: " + value);
notifyAll(); // 通知生产者
return value;
}
}

Lock 接口

Java 5 引入了 Lock 接口,提供比 synchronized 更灵活的锁操作。

ReentrantLock

import java.util.concurrent.locks.ReentrantLock;

class Counter {
private int count = 0;
private final ReentrantLock lock = new ReentrantLock();

public void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock(); // 必须在 finally 中释放
}
}

public int getCount() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
}

ReentrantReadWriteLock

读写锁,允许多个读操作同时进行:

import java.util.concurrent.locks.ReentrantReadWriteLock;

class DataContainer {
private String data;
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

public String read() {
rwLock.readLock().lock();
try {
Thread.sleep(100); // 模拟读取
return data;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
rwLock.readLock().unlock();
}
}

public void write(String data) {
rwLock.writeLock().lock();
try {
Thread.sleep(100); // 模拟写入
this.data = data;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
rwLock.writeLock().unlock();
}
}
}

线程通信

wait() / notify() / notifyAll()

class SharedResource {
private Object lock = new Object();
private boolean ready = false;

public void waitForReady() throws InterruptedException {
synchronized (lock) {
while (!ready) {
lock.wait(); // 等待
}
System.out.println("继续执行");
}
}

public void setReady() {
synchronized (lock) {
ready = true;
lock.notifyAll(); // 通知所有等待线程
}
}
}

Condition 接口

配合 Lock 使用:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BlockingQueue<T> {
private Object[] items;
private int count = 0;
private int takeIndex = 0;
private int putIndex = 0;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();

public BlockingQueue(int capacity) {
items = new Object[capacity];
}

public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await(); // 等待不为空
}
@SuppressWarnings("unchecked")
T item = (T) items[takeIndex];
items[takeIndex] = null;
takeIndex = (takeIndex + 1) % items.length;
count--;
notFull.signal(); // 通知不满
return item;
} finally {
lock.unlock();
}
}

public void put(T item) throws InterruptedException {
lock.lock();
try {
while (count == items.length) {
notFull.await(); // 等待不满
}
items[putIndex] = item;
putIndex = (putIndex + 1) % items.length;
count++;
notEmpty.signal(); // 通知不空
} finally {
lock.unlock();
}
}
}

线程池

Executor 框架

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// 创建固定数量的线程池
ExecutorService executor = Executors.newFixedThreadPool(4);

// 提交任务
executor.execute(() -> System.out.println("任务1"));

executor.submit(() -> {
// 执行任务
System.out.println("任务2");
});

// 提交有返回值任务
Future<String> future = executor.submit(() -> "结果");
String result = future.get();

// 关闭线程池
executor.shutdown();
executor.awaitTermination(60, TimeUnit.SECONDS);

线程池类型

类型说明
newFixedThreadPool(n)固定数量的线程池
newCachedThreadPool()可缓存的线程池,自动调整
newSingleThreadExecutor()单线程的线程池
newScheduledThreadPool(n)定时调度的线程池

自定义线程池

import java.util.concurrent.*;

ExecutorService executor = new ThreadPoolExecutor(
2, // 核心线程数
4, // 最大线程数
60, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(100), // 任务队列
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);

// 拒绝策略
// AbortPolicy - 抛出RejectedExecutionException
// CallerRunsPolicy - 调用者线程执行
// DiscardPolicy - 丢弃任务
// DiscardOldestPolicy - 丢弃最老的任务

并发工具类

CountDownLatch

等待一组线程完成:

import java.util.concurrent.CountDownLatch;

class Worker implements Runnable {
private final CountDownLatch latch;
private final int id;

public Worker(CountDownLatch latch, int id) {
this.latch = latch;
this.id = id;
}

@Override
public void run() {
System.out.println("Worker " + id + " 开始工作");
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Worker " + id + " 完成");
latch.countDown(); // 计数减1
}
}

public class Main {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3); // 3个worker

for (int i = 0; i < 3; i++) {
new Thread(new Worker(latch, i)).start();
}

latch.await(); // 等待所有worker完成
System.out.println("所有工作完成");
}
}

CyclicBarrier

让一组线程相互等待:

import java.util.concurrent.CyclicBarrier;

class GamePlayer implements Runnable {
private final CyclicBarrier barrier;
private final int id;

public GamePlayer(CyclicBarrier barrier, int id) {
this.barrier = barrier;
this.id = id;
}

@Override
public void run() {
System.out.println("玩家 " + id + " 准备中...");
try {
barrier.await(); // 等待其他玩家
System.out.println("玩家 " + id + " 开始游戏");
} catch (Exception e) {
e.printStackTrace();
}
}
}

public class Main {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(3, () ->
System.out.println("游戏开始!")
);

for (int i = 0; i < 3; i++) {
new Thread(new GamePlayer(barrier, i)).start();
}
}
}

Semaphore

控制并发数量:

import java.util.concurrent.Semaphore;

class Pool {
private final Semaphore semaphore;
private final boolean[] connections;

public Pool(int size) {
semaphore = new Semaphore(size);
connections = new boolean[size];
}

public void connect() throws InterruptedException {
semaphore.acquire();

int id = -1;
synchronized (this) {
for (int i = 0; i < connections.length; i++) {
if (!connections[i]) {
connections[i] = true;
id = i;
break;
}
}
}

System.out.println("获取连接: " + id);
// 使用连接
Thread.sleep((long) (Math.random() * 1000));

synchronized (this) {
connections[id] = false;
}
semaphore.release();
System.out.println("释放连接: " + id);
}
}

Exchanger

两个线程交换数据:

import java.util.concurrent.Exchanger;

class Producer implements Runnable {
private final Exchanger<String> exchanger;
private String data = "产品A";

public Producer(Exchanger<String> exchanger) {
this.exchanger = exchanger;
}

@Override
public void run() {
try {
for (int i = 0; i < 3; i++) {
System.out.println("生产者发送: " + data);
data = exchanger.exchange(data);
System.out.println("生产者收到: " + data);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

class Consumer implements Runnable {
private final Exchanger<String> exchanger;
private String data = "产品B";

public Consumer(Exchanger<String> exchanger) {
this.exchanger = exchanger;
}

@Override
public void run() {
try {
for (int i = 0; i < 3; i++) {
System.out.println("消费者发送: " + data);
data = exchanger.exchange(data);
System.out.println("消费者收到: " + data);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

原子类

java.util.concurrent.atomic 包提供了无锁的线程安全操作:

import java.util.concurrent.atomic.*;

// 原子整数
AtomicInteger count = new AtomicInteger(0);
count.incrementAndGet(); // count++ 返回新值
count.getAndIncrement(); // 返回旧值然后++
count.addAndGet(5); // 加5
count.compareAndSet(0, 1); // CAS操作

// 原子长整型
AtomicLong atomicLong = new AtomicLong();

// 原子引用
AtomicReference<Person> atomicRef = new AtomicReference<>(new Person());
atomicRef.compareAndSet(person1, person2);
atomicRef.updateAndGet(p -> new Person(p.getName(), p.getAge() + 1));

// 原子数组
AtomicIntegerArray array = new AtomicIntegerArray(10);
array.getAndSet(0, 5);

// 原子字段更新器
AtomicIntegerFieldUpdater<Person> updater =
AtomicIntegerFieldUpdater.newUpdater(Person.class, "age");
updater.incrementAndGet(person);

线程安全集合

import java.util.concurrent.*;

// 线程安全的 List
List<String> list = new CopyOnWriteArrayList<>();
list.add("a");
list.remove("a");

// 线程安全的 Set
Set<String> set = new CopyOnWriteArraySet<>();

// 线程安全的 Map
Map<String, Integer> map = new ConcurrentHashMap<>();
map.putIfAbsent("key", 1); // 不存在才插入

// 线程安全的 Queue
Queue<String> queue = new ConcurrentLinkedQueue<>();
queue.offer("a");
String s = queue.poll();

// 阻塞队列
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(10);
blockingQueue.offer("a", 1, TimeUnit.SECONDS); // 带超时
String item = blockingQueue.poll(1, TimeUnit.SECONDS); // 带超时

最佳实践

1. 优先使用线程池

// 不推荐
new Thread(() -> {
// 任务
}).start();

// 推荐
ExecutorService executor = Executors.newFixedThreadPool(4);
executor.execute(() -> {
// 任务
});

2. 使用 volatile 保证可见性

class Flag {
private volatile boolean running = true;

public void stop() {
running = false;
}

public void run() {
while (running) {
// 处理任务
}
}
}

3. 避免死锁

// 错误的加锁顺序 - 可能导致死锁
class WrongOrder {
private final Object lock1 = new Object();
private final Object lock2 = new Object();

public void method1() {
synchronized (lock1) {
synchronized (lock2) {
// 操作
}
}
}

public void method2() {
synchronized (lock2) { // 不同的顺序!
synchronized (lock1) {
// 操作
}
}
}
}

// 正确:始终按相同顺序加锁
class CorrectOrder {
private final Object lock1 = new Object();
private final Object lock2 = new Object();

public void method1() {
synchronized (lock1) {
synchronized (lock2) {
// 操作
}
}
}

public void method2() {
synchronized (lock1) { // 相同的顺序
synchronized (lock2) {
// 操作
}
}
}
}

4. 使用 ThreadLocal 线程本地变量

class ThreadLocalExample {
private static final ThreadLocal<String> threadLocal =
ThreadLocal.withInitial(() -> "默认值");

public static void main(String[] args) {
// 主线程设置
threadLocal.set("主线程值");
System.out.println(threadLocal.get());

// 新线程
new Thread(() -> {
System.out.println(threadLocal.get()); // 默认值
threadLocal.set("线程2的值");
System.out.println(threadLocal.get());
}).start();

// 清理
threadLocal.remove();
}
}

虚拟线程(Java 21+)

虚拟线程(Virtual Threads)是 Java 21 引入的重要特性,它极大地简化了高并发应用的编写。

什么是虚拟线程?

虚拟线程是由 Java 运行时而非操作系统实现的轻量级线程。与传统平台线程相比,虚拟线程具有以下特点:

特性平台线程虚拟线程
实现操作系统线程的包装JVM 实现
数量受限于操作系统线程数可达数百万
内存栈空间较大(1MB+)栈空间小,按需增长
创建成本
阻塞成本高(占用 OS 线程)低(释放载体线程)

为什么需要虚拟线程?

传统平台线程是稀缺资源,每个线程都需要占用大量内存,且创建和销毁成本较高。这导致开发者不得不使用线程池、异步编程等复杂技术来管理线程。

虚拟线程解决了这个问题:你可以为每个任务创建一个虚拟线程,而不需要担心资源耗尽。

// 传统方式:需要线程池
ExecutorService pool = Executors.newFixedThreadPool(100);
for (int i = 0; i < 10000; i++) {
pool.submit(() -> processRequest()); // 只有100个并发
}

// 虚拟线程:每个任务一个线程
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < 10000; i++) {
executor.submit(() -> processRequest()); // 可达10000个并发
}
}

创建虚拟线程

方式一:Thread.ofVirtual()

// 创建并启动虚拟线程
Thread vThread = Thread.ofVirtual().start(() -> {
System.out.println("虚拟线程运行中");
});

vThread.join(); // 等待线程结束

// 创建具名虚拟线程
Thread.Builder builder = Thread.ofVirtual().name("my-virtual-thread");
Thread thread = builder.start(() -> {
System.out.println("线程名: " + Thread.currentThread().getName());
});
thread.join();

方式二:Executors.newVirtualThreadPerTaskExecutor()

try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
// 提交多个任务,每个任务都在新的虚拟线程中运行
Future<String> future1 = executor.submit(() -> fetchData("url1"));
Future<String> future2 = executor.submit(() -> fetchData("url2"));
Future<String> future3 = executor.submit(() -> fetchData("url3"));

// 获取结果
System.out.println(future1.get());
System.out.println(future2.get());
System.out.println(future3.get());
} // try-with-resources 自动等待所有任务完成

方式三:Thread.startVirtualThread()

Thread.startVirtualThread(() -> {
System.out.println("快速创建虚拟线程");
});

虚拟线程的工作原理

虚拟线程采用"用户态线程"模型:

  1. 载体线程(Carrier Thread):虚拟线程运行在载体线程(平台线程)上
  2. 挂载与卸载:当虚拟线程执行阻塞操作时,它会从载体线程上卸载,载体线程可以执行其他虚拟线程
  3. 调度:JVM 负责调度虚拟线程,而非操作系统
┌─────────────────────────────────────────┐
│ 虚拟线程 (百万级) │
│ VT1 VT2 VT3 VT4 ... VT1000000 │
└─────────────────┬───────────────────────┘
│ JVM 调度

┌─────────────────────────────────────────┐
│ 载体线程 (平台线程) │
│ CT1 CT2 CT3 CT4 │
└─────────────────┬───────────────────────┘
│ OS 调度

┌─────────────────────────────────────────┐
│ CPU 核心 │
│ Core1 Core2 Core3 Core4 │
└─────────────────────────────────────────┘

虚拟线程适用场景

适合使用虚拟线程:

  • I/O 密集型任务(网络请求、文件操作、数据库查询)
  • 每个请求一个线程的服务器应用
  • 高并发但每个任务处理时间较短的场景

不适合使用虚拟线程:

  • CPU 密集型任务(长时间计算)
  • 长时间运行的任务
  • 需要使用 synchronized 保护长时间阻塞操作的场景

实战示例:高并发 HTTP 客户端

import java.net.URI;
import java.net.http.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class VirtualThreadHttpClient {

public static void main(String[] args) throws Exception {
List<String> urls = List.of(
"https://api.example.com/users/1",
"https://api.example.com/users/2",
"https://api.example.com/users/3"
);

// 使用虚拟线程并发请求
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
List<Future<String>> futures = urls.stream()
.map(url -> executor.submit(() -> fetchUrl(url)))
.collect(Collectors.toList());

for (Future<String> future : futures) {
System.out.println(future.get());
}
}
}

static String fetchUrl(String url) throws Exception {
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(url))
.GET()
.build();

// 阻塞操作会自动让出载体线程
HttpResponse<String> response = client.send(
request,
HttpResponse.BodyHandlers.ofString()
);

return response.body();
}
}

虚拟线程与线程池

重要:不要池化虚拟线程!

// ❌ 错误:池化虚拟线程(浪费资源)
ExecutorService pool = Executors.newFixedThreadPool(100); // 平台线程池
// 虚拟线程已经足够轻量,不需要池化

// ✅ 正确:每个任务一个虚拟线程
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
executor.submit(task1);
executor.submit(task2);
}

虚拟线程与信号量

当需要限制并发数量时,使用 Semaphore 而非线程池:

import java.util.concurrent.*;

public class RateLimitedVirtualThread {
// 限制最多 10 个并发
private static final Semaphore semaphore = new Semaphore(10);

public static void main(String[] args) {
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < 100; i++) {
executor.submit(() -> {
semaphore.acquire(); // 获取许可
try {
callExternalService();
} finally {
semaphore.release(); // 释放许可
}
});
}
}
}

static void callExternalService() throws InterruptedException {
Thread.sleep(100); // 模拟调用
}
}

虚拟线程的 Pinning 问题

虚拟线程在以下情况会被"钉住"(Pinned),无法卸载:

  1. synchronized 块或方法中执行阻塞操作
  2. 执行本地方法(Native Method)

解决方案:使用 ReentrantLock 代替 synchronized

// ❌ 可能导致 Pinning
class BadExample {
private final Object lock = new Object();

public void process() {
synchronized (lock) {
blockingIO(); // 阻塞时会钉住虚拟线程
}
}
}

// ✅ 避免Pinning
class GoodExample {
private final ReentrantLock lock = new ReentrantLock();

public void process() {
lock.lock();
try {
blockingIO(); // 可以正常卸载
} finally {
lock.unlock();
}
}
}

检测 Pinning:

# 启用JFR检测
java -XX:StartFlightRecording:settings=default ...

# 或使用系统属性
java -Djdk.tracePinnedThreads=full ...

ThreadLocal 与虚拟线程

由于虚拟线程数量可能非常大,使用 ThreadLocal 需要谨慎:

// ❌ 不推荐:每个虚拟线程都会创建一个实例
static final ThreadLocal<SimpleDateFormat> dateFormat =
ThreadLocal.withInitial(SimpleDateFormat::new);

// ✅ 推荐:使用不可变对象共享
static final DateTimeFormatter formatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd");

Structured Concurrency(结构化并发)

Java 21 引入了结构化并发 API(预览特性),简化多线程任务管理:

import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.Future;

// 结构化并发示例
Response handleRequest() throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 并发执行多个子任务
Future<String> user = scope.fork(() -> fetchUser());
Future<List<Order>> orders = scope.fork(() -> fetchOrders());

// 等待所有任务完成(任一失败则取消其他)
scope.join();
scope.throwIfFailed();

// 组合结果
return new Response(user.resultNow(), orders.resultNow());
}
}

最佳实践总结

  1. 为每个任务创建虚拟线程:虚拟线程很便宜,不需要池化
  2. 使用同步阻塞风格:编写简单直观的代码,而非异步回调
  3. 避免 synchronized 长时间阻塞:使用 ReentrantLock 替代
  4. 谨慎使用 ThreadLocal:虚拟线程数量巨大,可能导致内存问题
  5. 使用 Semaphore 限制并发:不要用线程池来限制并发
  6. 适合 I/O 密集型任务:不适合 CPU 密集型任务

CompletableFuture(异步编程)

CompletableFuture 是 Java 8 引入的异步编程利器,它实现了 Future 和 CompletionStage 接口,提供了强大的异步任务组合能力。

什么是异步编程?

传统的同步编程中,一个任务完成后才能开始下一个任务。异步编程允许任务并行执行,提高程序吞吐量。

// 同步执行:三个任务顺序执行,总时间 = 任务1 + 任务2 + 任务3
String result1 = task1();
String result2 = task2();
String result3 = task3();

// 异步执行:三个任务并行执行,总时间 ≈ max(任务1, 任务2, 任务3)
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> task1());
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> task2());
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> task3());

创建 CompletableFuture

方式一:supplyAsync(有返回值)

import java.util.concurrent.CompletableFuture;

// 使用默认线程池(FrokJoinPool.commonPool)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello World";
});

// 获取结果(阻塞等待)
String result = future.join();
System.out.println(result); // Hello World

方式二:runAsync(无返回值)

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("任务执行中: " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务完成");
});

future.join(); // 等待完成

方式三:指定线程池

ExecutorService executor = Executors.newFixedThreadPool(4);

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Result";
}, executor);

// 或者使用虚拟线程
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "Result";
}, Executors.newVirtualThreadPerTaskExecutor());

任务链式组合

CompletableFuture 的强大之处在于可以轻松组合多个异步任务。

thenApply - 转换结果

CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello") // 第一步:返回 String
.thenApply(s -> s + " World") // 第二步:转换为 "Hello World"
.thenApply(s -> s.toUpperCase()); // 第三步:转换为 "HELLO WORLD"

System.out.println(future.join()); // HELLO WORLD

thenAccept - 消费结果

CompletableFuture
.supplyAsync(() -> 100)
.thenAccept(money -> System.out.println("收到金额: " + money));
// 输出:收到金额: 100

thenRun - 执行后续动作

CompletableFuture
.supplyAsync(() -> 10)
.thenRun(() -> System.out.println("任务完成"));
// 输出:任务完成

并行执行多个任务

allOf - 等待所有任务完成

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
Thread.sleep(1000);
return "Result1";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
Thread.sleep(2000);
return "Result2";
});

CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
Thread.sleep(500);
return "Result3";
});

// 等待所有任务完成
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);

allOf.join(); // 阻塞等待所有任务完成

System.out.println(future1.join() + future2.join() + future3.join());
// 输出:Result1Result2Result3

anyOf - 任意一个任务完成

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
Thread.sleep(3000);
return "Result1";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
Thread.sleep(1000);
return "Result2";
});

// 任意一个完成就返回
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2);

System.out.println(anyOf.join()); // Result2(最先完成)

异常处理

exceptionally - 异常处理

CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("任务失败");
}
return "Success";
})
.exceptionally(e -> {
System.out.println("异常: " + e.getMessage());
return "默认值";
});

System.out.println(future.join()); // 失败时输出:默认值

handle - 无论成功失败都处理

CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.handle((result, ex) -> {
if (ex != null) {
return "Error: " + ex.getMessage();
}
return result + " World";
});

System.out.println(future.join()); // Hello World

whenComplete - 完成时回调

CompletableFuture
.supplyAsync(() -> 10)
.whenComplete((result, ex) -> {
if (ex == null) {
System.out.println("成功: " + result);
} else {
System.out.println("失败: " + ex.getMessage());
}
});

实战示例:并行获取用户信息

import java.util.concurrent.*;

public class UserService {

public UserInfo getUserInfo(long userId) throws Exception {
// 并行获取用户基本信息、订单、好友列表
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
CompletableFuture<User> userFuture = CompletableFuture
.supplyAsync(() -> fetchUser(userId), executor);

CompletableFuture<List<Order>> ordersFuture = CompletableFuture
.supplyAsync(() -> fetchOrders(userId), executor);

CompletableFuture<List<Friend>> friendsFuture = CompletableFuture
.supplyAsync(() -> fetchFriends(userId), executor);

// 等待所有任务完成
CompletableFuture.allOf(userFuture, ordersFuture, friendsFuture).join();

return new UserInfo(
userFuture.resultNow(),
ordersFuture.resultNow(),
friendsFuture.resultNow()
);
}
}

User fetchUser(long id) { /* ... */ return new User(); }
List<Order> fetchOrders(long id) { /* ... */ return List.of(); }
List<Friend> fetchFriends(long id) { /* ... */ return List.of(); }
}

record UserInfo(User user, List<Order> orders, List<Friend> friends) {}
record User() {}
record Order() {}
record Friend() {}

Fork/Join 框架

Fork/Join 框架是 Java 7 引入的并行计算框架,专门用于处理可以递归分解的任务。它采用"工作窃取"算法,能够高效利用多核处理器。

什么是工作窃取?

工作窃取(Work-Stealing)是一种并行计算算法:

  • 每个工作线程有自己的任务队列
  • 当一个线程完成自己的任务后,可以从其他线程的任务队列末尾"偷取"任务
  • 这样可以保持所有线程都处于忙碌状态

ForkJoinPool

ForkJoinPool 是 Fork/Join 框架的核心执行器。

import java.util.concurrent.*;

ForkJoinPool pool = new ForkJoinPool();

// 使用公共线程池(推荐)
ForkJoinPool commonPool = ForkJoinPool.commonPool();

主要方法:

方法说明
invoke(task)执行任务并等待结果
execute(task)异步执行任务
submit(task)提交任务,返回 Future

RecursiveTask(有返回值的任务)

import java.util.concurrent.*;
import java.util.Arrays;

class SumTask extends RecursiveTask<Long> {
private final long[] array;
private final int start;
private final int end;
private static final int THRESHOLD = 1000;

public SumTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}

@Override
protected Long compute() {
int length = end - start;

// 任务足够小,直接计算
if (length <= THRESHOLD) {
return Arrays.stream(array, start, end).sum();
}

// 拆分任务
int middle = start + length / 2;
SumTask leftTask = new SumTask(array, start, middle);
SumTask rightTask = new SumTask(array, middle, end);

// 异步执行左半部分
leftTask.fork();

// 同步执行右半部分
Long rightResult = rightTask.compute();

// 等待左半部分结果
Long leftResult = leftTask.join();

// 合并结果
return leftResult + rightResult;
}
}

public class ForkJoinDemo {
public static void main(String[] args) {
long[] array = new long[10000];
for (int i = 0; i < array.length; i++) {
array[i] = i + 1;
}

ForkJoinPool pool = ForkJoinPool.commonPool();
SumTask task = new SumTask(array, 0, array.length);

long startTime = System.currentTimeMillis();
Long result = pool.invoke(task);
long endTime = System.currentTimeMillis();

System.out.println("结果: " + result); // 50005000
System.out.println("耗时: " + (endTime - startTime) + "ms");
}
}

RecursiveAction(无返回值的任务)

import java.util.concurrent.*;

class PrintTask extends RecursiveAction {
private final int start;
private final int end;
private static final int THRESHOLD = 10;

public PrintTask(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected void compute() {
if (end - start <= THRESHOLD) {
for (int i = start; i < end; i++) {
System.out.println("Task: " + i);
}
} else {
int middle = start + (end - start) / 2;
PrintTask leftTask = new PrintTask(start, middle);
PrintTask rightTask = new PrintTask(middle, end);

leftTask.fork();
rightTask.fork();

leftTask.join();
rightTask.join();
}
}
}

public class RecursiveActionDemo {
public static void main(String[] args) {
ForkJoinPool pool = ForkJoinPool.commonPool();
PrintTask task = new PrintTask(0, 100);
pool.invoke(task);
}
}

CountedCompleter(带完成回调的任务)

import java.util.concurrent.*;
import java.util.*;

class MapTask extends RecursiveTask<Integer> {
private final List<Integer> data;
private final Map<String, Integer> resultMap;
private final String prefix;

public MapTask(List<Integer> data, Map<String, Integer> resultMap, String prefix) {
this.data = data;
this.resultMap = resultMap;
this.prefix = prefix;
}

@Override
protected Integer compute() {
if (data.size() < 10) {
int sum = 0;
for (Integer num : data) {
resultMap.put(prefix + num, num * num);
sum += num;
}
return sum;
}

int mid = data.size() / 2;
List<Integer> left = data.subList(0, mid);
List<Integer> right = data.subList(mid, data.size());

MapTask leftTask = new MapTask(left, resultMap, prefix + "L");
MapTask rightTask = new MapTask(right, resultMap, prefix + "R");

rightTask.fork();
int leftResult = leftTask.compute();
int rightResult = rightTask.join();

return leftResult + rightResult;
}
}

Fork/Join vs 传统线程池

特性Fork/Join传统线程池
适用场景递归分解的任务独立的任务
工作方式工作窃取任务队列
任务划分自动拆分手动划分
内存效率

使用建议

  1. 任务拆分要适当:拆分粒度太小会增加调度开销,太大则无法充分利用多核
  2. 避免阻塞:Fork/Join 适合 CPU 密集型任务,避免在任务中执行阻塞操作
  3. 使用公共池:除非有特殊需求,否则使用 ForkJoinPool.commonPool()
  4. 合理设置并行度ForkJoinPool(int parallelism) 设置并行度,通常等于 CPU 核心数
// 获取 CPU 核心数
int processors = Runtime.getRuntime().availableProcessors();

// 创建指定并行度的线程池
ForkJoinPool pool = new ForkJoinPool(processors);

小结

本章我们详细学习了 Java 多线程编程的核心内容:

  1. 线程基础:创建线程的多种方式、线程生命周期
  2. 线程控制:sleep、join、yield、interrupt
  3. 线程同步:synchronized、Lock
  4. 线程通信:wait/notify、Condition
  5. 线程池:Executor 框架、自定义线程池
  6. 并发工具类:CountDownLatch、CyclicBarrier、Semaphore
  7. 原子类:无锁线程安全操作
  8. 线程安全集合:ConcurrentHashMap 等
  9. 虚拟线程(Java 21+):轻量级线程,适合高并发 I/O 场景
  10. CompletableFuture:强大的异步编程和任务组合能力
  11. Fork/Join 框架:递归任务并行计算,工作窃取算法

练习

  1. 创建多个线程并打印 1-100 的数字
  2. 使用 synchronized 实现生产者-消费者
  3. 使用线程池处理大量任务
  4. 使用 CountDownLatch 等待多个任务完成
  5. 使用 ConcurrentHashMap 实现缓存
  6. 使用虚拟线程实现并发 HTTP 请求
  7. 对比虚拟线程和平台线程在 I/O 密集型任务中的性能差异