结构化并发(Structured Concurrency)
结构化并发是 Java 引入的一种并发编程范式,它将运行在不同线程中的相关任务组视为单一工作单元。这种方式简化了错误处理和取消操作,提高了可靠性,并增强了可观测性。结构化并发与虚拟线程配合使用,能够极大地简化并发程序的开发。
什么是结构化并发?
传统并发的问题
在传统的并发编程中,使用 ExecutorService 提交子任务时,任务与子任务之间的生命周期关系往往不够清晰:
Response handle() throws ExecutionException, InterruptedException {
Future<String> user = executor.submit(() -> findUser());
Future<Integer> order = executor.submit(() -> fetchOrder());
String theUser = user.get(); // 等待 findUser
int theOrder = order.get(); // 等待 fetchOrder
return new Response(theUser, theOrder);
}
这段代码存在以下问题:
线程泄漏:如果 findUser() 抛出异常,handle() 会在调用 user.get() 时抛出异常,但 fetchOrder() 会继续在其自己的线程中运行。这是一个线程泄漏,轻则浪费资源,重则 fetchOrder() 线程会干扰其他任务。
取消传播问题:如果执行 handle() 的线程被中断,中断不会传播到子任务。两个子任务都会泄漏,即使 handle() 已经失败,它们仍会继续运行。
不必要的等待:如果 findUser() 执行很长时间,而 fetchOrder() 在此期间失败了,handle() 会不必要地等待 findUser() 完成。
这些问题的根源在于:程序在逻辑上有任务-子任务的结构关系,但这种关系只存在于开发者的脑海中,并没有在代码中得到体现。
结构化并发的解决方案
结构化并发源于一个简单的原则:如果一个任务拆分成多个并发子任务,那么这些子任务都应该返回到同一个地方——任务的代码块。
Response handle() throws InterruptedException {
try (var scope = StructuredTaskScope.open()) {
Subtask<String> user = scope.fork(() -> findUser());
Subtask<Integer> order = scope.fork(() -> fetchOrder());
scope.join(); // 等待所有子任务完成,自动传播异常
// 所有子任务都成功,组合结果
return new Response(user.get(), order.get());
}
}
这段代码保证了以下特性:
- 错误处理与短路:如果
findUser()或fetchOrder()任意一个失败,另一个会被自动取消(中断) - 取消传播:如果执行
handle()的线程在join()之前或期间被中断,两个子任务都会被自动取消 - 清晰性:代码结构清晰——设置子任务、等待完成或取消、处理结果
- 可观测性:线程转储会清晰显示任务层级关系
API 版本说明
结构化并发 API 在不同 Java 版本之间有重大变化:
| Java 版本 | 状态 | API 特点 |
|---|---|---|
| Java 19-20 | 孵化 | 初始 API |
| Java 21-24 | 预览 | 使用 ShutdownOnSuccess/ShutdownOnFailure 子类 |
| Java 25+ | 预览 | 使用 StructuredTaskScope.open() 和 Joiner 接口 |
本文档主要描述 Java 25 API,并在文末说明 Java 21-24 的差异。由于结构化并发仍是预览特性,需要启用预览标志:
# 编译
javac --release 25 --enable-preview Main.java
# 运行
java --enable-preview Main
StructuredTaskScope API(Java 25)
API 概述
StructuredTaskScope 是结构化并发 API 的核心类,位于 java.util.concurrent 包中。
public sealed interface StructuredTaskScope<T, R> extends AutoCloseable {
// 打开一个新的 scope(默认策略:所有成功或任一失败)
public static <T> StructuredTaskScope<T, Void> open();
// 使用自定义 Joiner 打开 scope
public static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner);
// 派生子任务
public <U extends T> Subtask<U> fork(Callable<? extends U> task);
public Subtask<? extends T> fork(Runnable task);
// 等待所有子任务完成
public R join() throws InterruptedException;
// 关闭 scope
public void close();
}
类型参数说明:
T:在 scope 中执行的子任务的结果类型R:join()方法的返回类型
基本使用流程
// 1. 打开 scope
try (var scope = StructuredTaskScope.open()) {
// 2. 派生子任务
Subtask<String> task1 = scope.fork(() -> findUser());
Subtask<Integer> task2 = scope.fork(() -> fetchOrder());
// 3. 等待所有子任务完成
scope.join();
// 4. 处理结果
return new Response(task1.get(), task2.get());
} // 5. 自动关闭 scope(取消未完成的子任务)
关键点:
- 使用
try-with-resources确保 scope 被正确关闭 - 打开 scope 的线程称为"所有者线程"
- 只能在所有者线程中调用
fork()方法 - 必须在 scope 关闭前调用
join()
Subtask 接口
fork() 方法返回 Subtask 对象,用于查询子任务状态和获取结果:
public interface Subtask<T> extends Supplier<T> {
// 子任务状态
enum State { UNAVAILABLE, SUCCESS, FAILED }
State state(); // 获取状态
Callable<? extends T> task(); // 获取原始任务
T get(); // 获取结果(成功时)
Throwable exception(); // 获取异常(失败时)
}
状态说明:
UNAVAILABLE:尚未完成或已取消SUCCESS:成功完成FAILED:失败(抛出异常)
Joiner 接口
Joiner 接口用于定义子任务的完成策略。Java 25 提供了几个内置的 Joiner 工厂方法:
默认策略(无参数 open)
try (var scope = StructuredTaskScope.open()) {
scope.fork(() -> task1());
scope.fork(() -> task2());
scope.join(); // 所有成功才继续,任一失败则抛出 FailedException
}
默认策略:所有子任务都成功时 join() 返回 null,任一子任务失败则抛出 FailedException。
allSuccessfulOrThrow - 收集所有成功结果
<T> List<T> runConcurrently(Collection<Callable<T>> tasks) throws InterruptedException {
try (var scope = StructuredTaskScope.open(Joiner.<T>allSuccessfulOrThrow())) {
tasks.forEach(scope::fork);
return scope.join().map(Subtask::get).toList();
}
}
所有子任务成功时,join() 返回一个包含所有子任务的 Stream,可以映射为结果列表。任一子任务失败则抛出 FailedException。
anySuccessfulResultOrThrow - 任一成功即返回
适用于竞速场景,从多个来源获取数据,任一成功即可:
<T> T race(Collection<Callable<T>> tasks) throws InterruptedException {
try (var scope = StructuredTaskScope.open(Joiner.<T>anySuccessfulResultOrThrow())) {
tasks.forEach(scope::fork);
return scope.join(); // 返回第一个成功的结果
}
}
任一子任务成功时,scope 会被取消,其他未完成的子任务被中断,join() 返回该成功结果。如果所有子任务都失败,则抛出 FailedException。
awaitAll - 等待所有完成(无论成功失败)
try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
tasks.forEach(scope::fork);
scope.join(); // 等待所有子任务完成,不抛异常
}
适用于需要收集所有结果(包括失败)的场景。
awaitAllSuccessfulOrThrow - 等待所有成功
try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) {
tasks.forEach(scope::fork);
scope.join(); // 等待所有子任务成功完成
}
allUntil - 自定义谓词
try (var scope = StructuredTaskScope.open(
Joiner.allUntil(subtask -> subtask.state() == Subtask.State.FAILED))) {
tasks.forEach(scope::fork);
var completedTasks = scope.join().toList();
}
当所有子任务成功完成,或谓词对某个已完成的子任务返回 true 时,取消 scope 并返回所有子任务。
配置选项
设置线程工厂
ThreadFactory virtualThreadFactory = Thread.ofVirtual().name("worker-").factory();
try (var scope = StructuredTaskScope.open(
Joiner.<String>allSuccessfulOrThrow(),
cf -> cf.withThreadFactory(virtualThreadFactory))) {
scope.fork(() -> task1());
scope.fork(() -> task2());
return scope.join().map(Subtask::get).toList();
}
默认情况下,fork() 创建虚拟线程来执行子任务。
设置超时
import java.time.Duration;
<T> List<T> runWithTimeout(Collection<Callable<T>> tasks, Duration timeout)
throws InterruptedException {
try (var scope = StructuredTaskScope.open(
Joiner.<T>allSuccessfulOrThrow(),
cf -> cf.withTimeout(timeout))) {
tasks.forEach(scope::fork);
return scope.join().map(Subtask::get).toList();
// 如果超时,join() 抛出 TimeoutException
}
}
设置名称(用于监控)
try (var scope = StructuredTaskScope.open(
Joiner.awaitAll(),
cf -> cf.withName("my-scope"))) {
// ...
}
实战示例
并行获取多个服务数据
import java.util.concurrent.*;
import java.time.Duration;
public class ParallelFetchDemo {
public static void main(String[] args) throws InterruptedException {
var result = fetchUserData(1L);
System.out.println(result);
}
static UserData fetchUserData(Long userId) throws InterruptedException {
try (var scope = StructuredTaskScope.open(
Joiner.<Object>allSuccessfulOrThrow(),
cf -> cf.withTimeout(Duration.ofSeconds(5)))) {
var userInfo = scope.fork(() -> fetchUserInfo(userId));
var orders = scope.fork(() -> fetchOrders(userId));
var friends = scope.fork(() -> fetchFriends(userId));
scope.join();
return new UserData(
(String) userInfo.get(),
(String) orders.get(),
(String) friends.get()
);
}
}
static String fetchUserInfo(Long id) throws InterruptedException {
Thread.sleep(100); // 模拟网络请求
return "User-" + id;
}
static String fetchOrders(Long id) throws InterruptedException {
Thread.sleep(150);
return "Orders-" + id;
}
static String fetchFriends(Long id) throws InterruptedException {
Thread.sleep(80);
return "Friends-" + id;
}
record UserData(String userInfo, String orders, String friends) {}
}
竞速获取最快响应
import java.util.concurrent.*;
import java.util.List;
public class RacingDemo {
public static void main(String[] args) throws InterruptedException {
String result = fetchFromMirrors(List.of(
"mirror1.example.com",
"mirror2.example.com",
"mirror3.example.com"
));
System.out.println("获取结果: " + result);
}
static String fetchFromMirrors(List<String> mirrors) throws InterruptedException {
try (var scope = StructuredTaskScope.open(
Joiner.<String>anySuccessfulResultOrThrow())) {
mirrors.forEach(mirror ->
scope.fork(() -> fetchFromMirror(mirror)));
return scope.join();
}
}
static String fetchFromMirror(String mirror) throws InterruptedException {
int delay = (int) (Math.random() * 1000);
Thread.sleep(delay);
return "Data from " + mirror + " (delay: " + delay + "ms)";
}
}
自定义 Joiner - 收集成功结果忽略失败
import java.util.concurrent.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Stream;
// 自定义 Joiner:收集成功结果,忽略失败
class CollectingJoiner<T> implements Joiner<T, Stream<T>> {
private final ConcurrentLinkedQueue<T> results = new ConcurrentLinkedQueue<>();
@Override
public boolean onComplete(Subtask<? extends T> subtask) {
if (subtask.state() == Subtask.State.SUCCESS) {
results.add(subtask.get());
}
return false; // 不取消 scope
}
@Override
public Stream<T> result() {
return results.stream();
}
}
// 使用自定义 Joiner
<T> List<T> collectSuccessful(List<Callable<T>> tasks) throws InterruptedException {
try (var scope = StructuredTaskScope.open(new CollectingJoiner<T>())) {
tasks.forEach(scope::fork);
return scope.join().toList();
}
}
嵌套的结构化并发
import java.util.concurrent.*;
public class NestedStructuredConcurrency {
public static void main(String[] args) throws InterruptedException {
String result = processRequest("req-1");
System.out.println(result);
}
static String processRequest(String requestId) throws InterruptedException {
try (var scope = StructuredTaskScope.open(
Joiner.<String>allSuccessfulOrThrow())) {
var system1 = scope.fork(() -> processSystem1(requestId));
var system2 = scope.fork(() -> processSystem2(requestId));
scope.join();
return system1.get() + " + " + system2.get();
}
}
static String processSystem1(String requestId) throws InterruptedException {
// 子系统内部也使用结构化并发
try (var scope = StructuredTaskScope.open(
Joiner.<String>allSuccessfulOrThrow())) {
var db = scope.fork(() -> fetchFromDB(requestId));
var cache = scope.fork(() -> fetchFromCache(requestId));
scope.join();
return "System1[" + db.get() + ", " + cache.get() + "]";
}
}
static String processSystem2(String requestId) throws InterruptedException {
Thread.sleep(100);
return "System2[" + requestId + "]";
}
static String fetchFromDB(String id) throws InterruptedException {
Thread.sleep(200);
return "DB:" + id;
}
static String fetchFromCache(String id) throws InterruptedException {
Thread.sleep(50);
return "Cache:" + id;
}
}
异常处理
FailedException
当 scope 被认为失败时,join() 方法抛出 StructuredTaskScope.FailedException:
try (var scope = StructuredTaskScope.open()) {
scope.fork(() -> {
throw new IOException("Connection failed");
});
scope.fork(() -> "success");
scope.join(); // 抛出 FailedException
} catch (StructuredTaskScope.FailedException e) {
Throwable cause = e.getCause(); // 获取导致失败的原始异常
if (cause instanceof IOException ioe) {
System.out.println("IO 错误: " + ioe.getMessage());
}
}
TimeoutException
使用 withTimeout() 时,超时会导致 join() 抛出 TimeoutException:
try (var scope = StructuredTaskScope.open(
Joiner.awaitAll(),
cf -> cf.withTimeout(Duration.ofSeconds(1)))) {
scope.fork(() -> {
Thread.sleep(5000); // 耗时操作
return "result";
});
scope.join();
} catch (TimeoutException e) {
System.out.println("操作超时,子任务已被取消");
}
取消传播
结构化并发的一个重要特性是取消自动传播:
// 如果所有者线程被中断,所有子任务都会被取消
void handle() throws InterruptedException {
try (var scope = StructuredTaskScope.open()) {
scope.fork(() -> task1());
scope.fork(() -> task2());
// 如果当前线程在此处被中断,两个子任务都会被取消
scope.join();
}
}
子任务应该正确响应中断:
String task() {
try {
// 执行可能阻塞的操作
Thread.sleep(1000);
return "result";
} catch (InterruptedException e) {
// 恢复中断状态并快速返回
Thread.currentThread().interrupt();
throw new RuntimeException("Task cancelled", e);
}
}
与虚拟线程的配合
结构化并发与虚拟线程是天然的最佳搭档。默认情况下,fork() 方法使用虚拟线程执行子任务:
try (var scope = StructuredTaskScope.open(
Joiner.<String>allSuccessfulOrThrow())) {
// 派生大量虚拟线程执行子任务
for (int i = 0; i < 1000; i++) {
final int id = i;
scope.fork(() -> handleRequest(id));
}
scope.join();
}
虚拟线程提供了规模(可以创建数百万个线程),结构化并发提供了协调(管理这些线程的生命周期和依赖关系)。两者结合,让编写高吞吐量的并发应用变得简单。
可观测性
线程转储
使用 jcmd 可以生成包含结构化并发层级信息的线程转储:
jcmd <pid> Thread.dump_to_file -format=json threads.json
JSON 格式的线程转储会显示 StructuredTaskScope 如何将线程组织成层级结构,每个 scope 包含派生出的线程数组及其调用栈。这使得调试和监控变得更容易——可以清晰地看到一个任务的子任务在做什么。
结构化使用约束
StructuredTaskScope 强制执行结构化使用:
- 只能在所有者线程中调用
fork()方法 - 必须在 try-with-resources 块中使用
- 必须在 scope 关闭前调用
join() - 不能在 scope 关闭后继续使用
// 错误:在非所有者线程中 fork
try (var scope = StructuredTaskScope.open()) {
new Thread(() -> {
scope.fork(() -> "task"); // 抛出异常!
}).start();
}
// 错误:忘记调用 join()
try (var scope = StructuredTaskScope.open()) {
scope.fork(() -> "task");
// 忘记调用 scope.join()
} // close() 会等待子任务完成,但可能抛出异常
// 错误:scope 泄漏
var scope = StructuredTaskScope.open();
scope.fork(() -> "task");
// 忘记调用 scope.close()!
违反这些约束会抛出 StructureViolationException。
Java 21-24 API 说明
如果你使用的是 Java 21-24,API 有所不同。
Java 21-24 API 结构
// Java 21-24 使用子类而非 Joiner
public class StructuredTaskScope<T> implements AutoCloseable {
public StructuredTaskScope() { }
public StructuredTaskScope(String name, ThreadFactory factory) { }
public <U extends T> Subtask<U> fork(Callable<? extends U> task);
public StructuredTaskScope<T> join() throws InterruptedException;
public void close();
public void shutdown();
// 内置策略子类
public static final class ShutdownOnSuccess<T> extends StructuredTaskScope<T> { }
public static final class ShutdownOnFailure extends StructuredTaskScope<Object> { }
}
Java 21 示例代码
// Java 21-24 风格:使用 ShutdownOnFailure
Response handle() throws InterruptedException, ExecutionException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future<String> user = scope.fork(() -> findUser());
Future<Integer> order = scope.fork(() -> fetchOrder());
scope.join(); // 等待所有子任务完成
scope.throwIfFailed(); // 如果有子任务失败,抛出异常
return new Response(user.resultNow(), order.resultNow());
}
}
// Java 21-24 风格:使用 ShutdownOnSuccess
String race(List<Callable<String>> tasks) throws InterruptedException {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
tasks.forEach(scope::fork);
scope.join();
return scope.result(); // 返回第一个成功的结果
}
}
迁移指南
从 Java 21-24 迁移到 Java 25+:
| Java 21-24 | Java 25+ |
|---|---|
new ShutdownOnFailure() | StructuredTaskScope.open() |
new ShutdownOnSuccess<T>() | StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow()) |
scope.throwIfFailed() | 自动抛出(join() 抛出 FailedException) |
scope.result() | scope.join() 直接返回结果 |
future.resultNow() | subtask.get() |
Future<T> 返回类型 | Subtask<T> 返回类型 |
小结
结构化并发是 Java 并发编程的重要改进:
- 结构清晰:任务-子任务的关系在代码结构中得到体现
- 简化错误处理:子任务失败时自动取消其他子任务
- 自动传播取消:外部取消会自动传播到所有子任务
- 增强可观测性:线程转储清晰显示任务层级
- 与虚拟线程完美配合:虚拟线程提供规模,结构化并发提供协调
结构化并发改变了我们思考并发程序的方式。它让我们回归到类似单线程代码的结构——子任务必须返回到创建它的地方,生命周期由语法块限定。这种模式使得并发程序更容易理解、调试和维护。