跳到主要内容

结构化并发(Structured Concurrency)

结构化并发是 Java 引入的一种并发编程范式,它将运行在不同线程中的相关任务组视为单一工作单元。这种方式简化了错误处理和取消操作,提高了可靠性,并增强了可观测性。结构化并发与虚拟线程配合使用,能够极大地简化并发程序的开发。

什么是结构化并发?

传统并发的问题

在传统的并发编程中,使用 ExecutorService 提交子任务时,任务与子任务之间的生命周期关系往往不够清晰:

Response handle() throws ExecutionException, InterruptedException {
Future<String> user = executor.submit(() -> findUser());
Future<Integer> order = executor.submit(() -> fetchOrder());
String theUser = user.get(); // 等待 findUser
int theOrder = order.get(); // 等待 fetchOrder
return new Response(theUser, theOrder);
}

这段代码存在以下问题:

线程泄漏:如果 findUser() 抛出异常,handle() 会在调用 user.get() 时抛出异常,但 fetchOrder() 会继续在其自己的线程中运行。这是一个线程泄漏,轻则浪费资源,重则 fetchOrder() 线程会干扰其他任务。

取消传播问题:如果执行 handle() 的线程被中断,中断不会传播到子任务。两个子任务都会泄漏,即使 handle() 已经失败,它们仍会继续运行。

不必要的等待:如果 findUser() 执行很长时间,而 fetchOrder() 在此期间失败了,handle() 会不必要地等待 findUser() 完成。

这些问题的根源在于:程序在逻辑上有任务-子任务的结构关系,但这种关系只存在于开发者的脑海中,并没有在代码中得到体现。

结构化并发的解决方案

结构化并发源于一个简单的原则:如果一个任务拆分成多个并发子任务,那么这些子任务都应该返回到同一个地方——任务的代码块。

Response handle() throws InterruptedException {
try (var scope = StructuredTaskScope.open()) {
Subtask<String> user = scope.fork(() -> findUser());
Subtask<Integer> order = scope.fork(() -> fetchOrder());
scope.join(); // 等待所有子任务完成,自动传播异常
// 所有子任务都成功,组合结果
return new Response(user.get(), order.get());
}
}

这段代码保证了以下特性:

  • 错误处理与短路:如果 findUser()fetchOrder() 任意一个失败,另一个会被自动取消(中断)
  • 取消传播:如果执行 handle() 的线程在 join() 之前或期间被中断,两个子任务都会被自动取消
  • 清晰性:代码结构清晰——设置子任务、等待完成或取消、处理结果
  • 可观测性:线程转储会清晰显示任务层级关系

API 版本说明

结构化并发 API 在不同 Java 版本之间有重大变化:

Java 版本状态API 特点
Java 19-20孵化初始 API
Java 21-24预览使用 ShutdownOnSuccess/ShutdownOnFailure 子类
Java 25+预览使用 StructuredTaskScope.open()Joiner 接口

本文档主要描述 Java 25 API,并在文末说明 Java 21-24 的差异。由于结构化并发仍是预览特性,需要启用预览标志:

# 编译
javac --release 25 --enable-preview Main.java

# 运行
java --enable-preview Main

StructuredTaskScope API(Java 25)

API 概述

StructuredTaskScope 是结构化并发 API 的核心类,位于 java.util.concurrent 包中。

public sealed interface StructuredTaskScope<T, R> extends AutoCloseable {
// 打开一个新的 scope(默认策略:所有成功或任一失败)
public static <T> StructuredTaskScope<T, Void> open();

// 使用自定义 Joiner 打开 scope
public static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner);

// 派生子任务
public <U extends T> Subtask<U> fork(Callable<? extends U> task);
public Subtask<? extends T> fork(Runnable task);

// 等待所有子任务完成
public R join() throws InterruptedException;

// 关闭 scope
public void close();
}

类型参数说明:

  • T:在 scope 中执行的子任务的结果类型
  • Rjoin() 方法的返回类型

基本使用流程

// 1. 打开 scope
try (var scope = StructuredTaskScope.open()) {

// 2. 派生子任务
Subtask<String> task1 = scope.fork(() -> findUser());
Subtask<Integer> task2 = scope.fork(() -> fetchOrder());

// 3. 等待所有子任务完成
scope.join();

// 4. 处理结果
return new Response(task1.get(), task2.get());

} // 5. 自动关闭 scope(取消未完成的子任务)

关键点:

  • 使用 try-with-resources 确保 scope 被正确关闭
  • 打开 scope 的线程称为"所有者线程"
  • 只能在所有者线程中调用 fork() 方法
  • 必须在 scope 关闭前调用 join()

Subtask 接口

fork() 方法返回 Subtask 对象,用于查询子任务状态和获取结果:

public interface Subtask<T> extends Supplier<T> {
// 子任务状态
enum State { UNAVAILABLE, SUCCESS, FAILED }

State state(); // 获取状态
Callable<? extends T> task(); // 获取原始任务
T get(); // 获取结果(成功时)
Throwable exception(); // 获取异常(失败时)
}

状态说明:

  • UNAVAILABLE:尚未完成或已取消
  • SUCCESS:成功完成
  • FAILED:失败(抛出异常)

Joiner 接口

Joiner 接口用于定义子任务的完成策略。Java 25 提供了几个内置的 Joiner 工厂方法:

默认策略(无参数 open)

try (var scope = StructuredTaskScope.open()) {
scope.fork(() -> task1());
scope.fork(() -> task2());
scope.join(); // 所有成功才继续,任一失败则抛出 FailedException
}

默认策略:所有子任务都成功时 join() 返回 null,任一子任务失败则抛出 FailedException

allSuccessfulOrThrow - 收集所有成功结果

<T> List<T> runConcurrently(Collection<Callable<T>> tasks) throws InterruptedException {
try (var scope = StructuredTaskScope.open(Joiner.<T>allSuccessfulOrThrow())) {
tasks.forEach(scope::fork);
return scope.join().map(Subtask::get).toList();
}
}

所有子任务成功时,join() 返回一个包含所有子任务的 Stream,可以映射为结果列表。任一子任务失败则抛出 FailedException

anySuccessfulResultOrThrow - 任一成功即返回

适用于竞速场景,从多个来源获取数据,任一成功即可:

<T> T race(Collection<Callable<T>> tasks) throws InterruptedException {
try (var scope = StructuredTaskScope.open(Joiner.<T>anySuccessfulResultOrThrow())) {
tasks.forEach(scope::fork);
return scope.join(); // 返回第一个成功的结果
}
}

任一子任务成功时,scope 会被取消,其他未完成的子任务被中断,join() 返回该成功结果。如果所有子任务都失败,则抛出 FailedException

awaitAll - 等待所有完成(无论成功失败)

try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
tasks.forEach(scope::fork);
scope.join(); // 等待所有子任务完成,不抛异常
}

适用于需要收集所有结果(包括失败)的场景。

awaitAllSuccessfulOrThrow - 等待所有成功

try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) {
tasks.forEach(scope::fork);
scope.join(); // 等待所有子任务成功完成
}

allUntil - 自定义谓词

try (var scope = StructuredTaskScope.open(
Joiner.allUntil(subtask -> subtask.state() == Subtask.State.FAILED))) {
tasks.forEach(scope::fork);
var completedTasks = scope.join().toList();
}

当所有子任务成功完成,或谓词对某个已完成的子任务返回 true 时,取消 scope 并返回所有子任务。

配置选项

设置线程工厂

ThreadFactory virtualThreadFactory = Thread.ofVirtual().name("worker-").factory();

try (var scope = StructuredTaskScope.open(
Joiner.<String>allSuccessfulOrThrow(),
cf -> cf.withThreadFactory(virtualThreadFactory))) {

scope.fork(() -> task1());
scope.fork(() -> task2());

return scope.join().map(Subtask::get).toList();
}

默认情况下,fork() 创建虚拟线程来执行子任务。

设置超时

import java.time.Duration;

<T> List<T> runWithTimeout(Collection<Callable<T>> tasks, Duration timeout)
throws InterruptedException {

try (var scope = StructuredTaskScope.open(
Joiner.<T>allSuccessfulOrThrow(),
cf -> cf.withTimeout(timeout))) {

tasks.forEach(scope::fork);
return scope.join().map(Subtask::get).toList();
// 如果超时,join() 抛出 TimeoutException
}
}

设置名称(用于监控)

try (var scope = StructuredTaskScope.open(
Joiner.awaitAll(),
cf -> cf.withName("my-scope"))) {
// ...
}

实战示例

并行获取多个服务数据

import java.util.concurrent.*;
import java.time.Duration;

public class ParallelFetchDemo {

public static void main(String[] args) throws InterruptedException {
var result = fetchUserData(1L);
System.out.println(result);
}

static UserData fetchUserData(Long userId) throws InterruptedException {
try (var scope = StructuredTaskScope.open(
Joiner.<Object>allSuccessfulOrThrow(),
cf -> cf.withTimeout(Duration.ofSeconds(5)))) {

var userInfo = scope.fork(() -> fetchUserInfo(userId));
var orders = scope.fork(() -> fetchOrders(userId));
var friends = scope.fork(() -> fetchFriends(userId));

scope.join();

return new UserData(
(String) userInfo.get(),
(String) orders.get(),
(String) friends.get()
);
}
}

static String fetchUserInfo(Long id) throws InterruptedException {
Thread.sleep(100); // 模拟网络请求
return "User-" + id;
}

static String fetchOrders(Long id) throws InterruptedException {
Thread.sleep(150);
return "Orders-" + id;
}

static String fetchFriends(Long id) throws InterruptedException {
Thread.sleep(80);
return "Friends-" + id;
}

record UserData(String userInfo, String orders, String friends) {}
}

竞速获取最快响应

import java.util.concurrent.*;
import java.util.List;

public class RacingDemo {

public static void main(String[] args) throws InterruptedException {
String result = fetchFromMirrors(List.of(
"mirror1.example.com",
"mirror2.example.com",
"mirror3.example.com"
));
System.out.println("获取结果: " + result);
}

static String fetchFromMirrors(List<String> mirrors) throws InterruptedException {
try (var scope = StructuredTaskScope.open(
Joiner.<String>anySuccessfulResultOrThrow())) {

mirrors.forEach(mirror ->
scope.fork(() -> fetchFromMirror(mirror)));

return scope.join();
}
}

static String fetchFromMirror(String mirror) throws InterruptedException {
int delay = (int) (Math.random() * 1000);
Thread.sleep(delay);
return "Data from " + mirror + " (delay: " + delay + "ms)";
}
}

自定义 Joiner - 收集成功结果忽略失败

import java.util.concurrent.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Stream;

// 自定义 Joiner:收集成功结果,忽略失败
class CollectingJoiner<T> implements Joiner<T, Stream<T>> {
private final ConcurrentLinkedQueue<T> results = new ConcurrentLinkedQueue<>();

@Override
public boolean onComplete(Subtask<? extends T> subtask) {
if (subtask.state() == Subtask.State.SUCCESS) {
results.add(subtask.get());
}
return false; // 不取消 scope
}

@Override
public Stream<T> result() {
return results.stream();
}
}

// 使用自定义 Joiner
<T> List<T> collectSuccessful(List<Callable<T>> tasks) throws InterruptedException {
try (var scope = StructuredTaskScope.open(new CollectingJoiner<T>())) {
tasks.forEach(scope::fork);
return scope.join().toList();
}
}

嵌套的结构化并发

import java.util.concurrent.*;

public class NestedStructuredConcurrency {

public static void main(String[] args) throws InterruptedException {
String result = processRequest("req-1");
System.out.println(result);
}

static String processRequest(String requestId) throws InterruptedException {
try (var scope = StructuredTaskScope.open(
Joiner.<String>allSuccessfulOrThrow())) {

var system1 = scope.fork(() -> processSystem1(requestId));
var system2 = scope.fork(() -> processSystem2(requestId));

scope.join();

return system1.get() + " + " + system2.get();
}
}

static String processSystem1(String requestId) throws InterruptedException {
// 子系统内部也使用结构化并发
try (var scope = StructuredTaskScope.open(
Joiner.<String>allSuccessfulOrThrow())) {

var db = scope.fork(() -> fetchFromDB(requestId));
var cache = scope.fork(() -> fetchFromCache(requestId));

scope.join();

return "System1[" + db.get() + ", " + cache.get() + "]";
}
}

static String processSystem2(String requestId) throws InterruptedException {
Thread.sleep(100);
return "System2[" + requestId + "]";
}

static String fetchFromDB(String id) throws InterruptedException {
Thread.sleep(200);
return "DB:" + id;
}

static String fetchFromCache(String id) throws InterruptedException {
Thread.sleep(50);
return "Cache:" + id;
}
}

异常处理

FailedException

当 scope 被认为失败时,join() 方法抛出 StructuredTaskScope.FailedException

try (var scope = StructuredTaskScope.open()) {
scope.fork(() -> {
throw new IOException("Connection failed");
});
scope.fork(() -> "success");

scope.join(); // 抛出 FailedException

} catch (StructuredTaskScope.FailedException e) {
Throwable cause = e.getCause(); // 获取导致失败的原始异常
if (cause instanceof IOException ioe) {
System.out.println("IO 错误: " + ioe.getMessage());
}
}

TimeoutException

使用 withTimeout() 时,超时会导致 join() 抛出 TimeoutException

try (var scope = StructuredTaskScope.open(
Joiner.awaitAll(),
cf -> cf.withTimeout(Duration.ofSeconds(1)))) {

scope.fork(() -> {
Thread.sleep(5000); // 耗时操作
return "result";
});

scope.join();

} catch (TimeoutException e) {
System.out.println("操作超时,子任务已被取消");
}

取消传播

结构化并发的一个重要特性是取消自动传播:

// 如果所有者线程被中断,所有子任务都会被取消
void handle() throws InterruptedException {
try (var scope = StructuredTaskScope.open()) {
scope.fork(() -> task1());
scope.fork(() -> task2());

// 如果当前线程在此处被中断,两个子任务都会被取消
scope.join();
}
}

子任务应该正确响应中断:

String task() {
try {
// 执行可能阻塞的操作
Thread.sleep(1000);
return "result";
} catch (InterruptedException e) {
// 恢复中断状态并快速返回
Thread.currentThread().interrupt();
throw new RuntimeException("Task cancelled", e);
}
}

与虚拟线程的配合

结构化并发与虚拟线程是天然的最佳搭档。默认情况下,fork() 方法使用虚拟线程执行子任务:

try (var scope = StructuredTaskScope.open(
Joiner.<String>allSuccessfulOrThrow())) {

// 派生大量虚拟线程执行子任务
for (int i = 0; i < 1000; i++) {
final int id = i;
scope.fork(() -> handleRequest(id));
}

scope.join();
}

虚拟线程提供了规模(可以创建数百万个线程),结构化并发提供了协调(管理这些线程的生命周期和依赖关系)。两者结合,让编写高吞吐量的并发应用变得简单。

可观测性

线程转储

使用 jcmd 可以生成包含结构化并发层级信息的线程转储:

jcmd <pid> Thread.dump_to_file -format=json threads.json

JSON 格式的线程转储会显示 StructuredTaskScope 如何将线程组织成层级结构,每个 scope 包含派生出的线程数组及其调用栈。这使得调试和监控变得更容易——可以清晰地看到一个任务的子任务在做什么。

结构化使用约束

StructuredTaskScope 强制执行结构化使用:

  • 只能在所有者线程中调用 fork() 方法
  • 必须在 try-with-resources 块中使用
  • 必须在 scope 关闭前调用 join()
  • 不能在 scope 关闭后继续使用
// 错误:在非所有者线程中 fork
try (var scope = StructuredTaskScope.open()) {
new Thread(() -> {
scope.fork(() -> "task"); // 抛出异常!
}).start();
}

// 错误:忘记调用 join()
try (var scope = StructuredTaskScope.open()) {
scope.fork(() -> "task");
// 忘记调用 scope.join()
} // close() 会等待子任务完成,但可能抛出异常

// 错误:scope 泄漏
var scope = StructuredTaskScope.open();
scope.fork(() -> "task");
// 忘记调用 scope.close()!

违反这些约束会抛出 StructureViolationException

Java 21-24 API 说明

如果你使用的是 Java 21-24,API 有所不同。

Java 21-24 API 结构

// Java 21-24 使用子类而非 Joiner
public class StructuredTaskScope<T> implements AutoCloseable {
public StructuredTaskScope() { }
public StructuredTaskScope(String name, ThreadFactory factory) { }

public <U extends T> Subtask<U> fork(Callable<? extends U> task);
public StructuredTaskScope<T> join() throws InterruptedException;
public void close();
public void shutdown();

// 内置策略子类
public static final class ShutdownOnSuccess<T> extends StructuredTaskScope<T> { }
public static final class ShutdownOnFailure extends StructuredTaskScope<Object> { }
}

Java 21 示例代码

// Java 21-24 风格:使用 ShutdownOnFailure
Response handle() throws InterruptedException, ExecutionException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future<String> user = scope.fork(() -> findUser());
Future<Integer> order = scope.fork(() -> fetchOrder());

scope.join(); // 等待所有子任务完成
scope.throwIfFailed(); // 如果有子任务失败,抛出异常

return new Response(user.resultNow(), order.resultNow());
}
}

// Java 21-24 风格:使用 ShutdownOnSuccess
String race(List<Callable<String>> tasks) throws InterruptedException {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
tasks.forEach(scope::fork);
scope.join();
return scope.result(); // 返回第一个成功的结果
}
}

迁移指南

从 Java 21-24 迁移到 Java 25+:

Java 21-24Java 25+
new ShutdownOnFailure()StructuredTaskScope.open()
new ShutdownOnSuccess<T>()StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())
scope.throwIfFailed()自动抛出(join() 抛出 FailedException
scope.result()scope.join() 直接返回结果
future.resultNow()subtask.get()
Future<T> 返回类型Subtask<T> 返回类型

小结

结构化并发是 Java 并发编程的重要改进:

  1. 结构清晰:任务-子任务的关系在代码结构中得到体现
  2. 简化错误处理:子任务失败时自动取消其他子任务
  3. 自动传播取消:外部取消会自动传播到所有子任务
  4. 增强可观测性:线程转储清晰显示任务层级
  5. 与虚拟线程完美配合:虚拟线程提供规模,结构化并发提供协调

结构化并发改变了我们思考并发程序的方式。它让我们回归到类似单线程代码的结构——子任务必须返回到创建它的地方,生命周期由语法块限定。这种模式使得并发程序更容易理解、调试和维护。