CompletableFuture 异步编程
CompletableFuture 是 Java 8 引入的异步编程工具,它实现了 Future 和 CompletionStage 接口,提供了强大的异步任务组合能力。相比于传统的 Future,CompletableFuture 支持回调、任务链式组合、异常处理等功能,是构建异步应用的核心组件。
Future 的局限性
在 CompletableFuture 出现之前,Java 使用 Future 表示异步计算结果。但 Future 存在以下局限:
无法主动完成
Future 只能等待异步任务完成,无法手动设置结果。
无法设置回调
Future.get() 会阻塞调用线程,无法在任务完成后自动执行回调。
无法链式调用
多个异步任务之间的依赖关系难以表达。
异常处理不便
Future.get() 抛出的 ExecutionException 需要手动处理。
import java.util.concurrent.*;
public class FutureLimitation {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<String> future = executor.submit(() -> {
Thread.sleep(1000);
return "Hello";
});
String result = future.get();
System.out.println(result);
executor.shutdown();
}
}
CompletableFuture 解决了这些问题,提供了更强大的异步编程能力。
创建 CompletableFuture
supplyAsync - 有返回值的异步任务
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SupplyAsyncDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("异步任务执行中: " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello CompletableFuture";
}, executor);
System.out.println("主线程继续执行");
String result = future.join();
System.out.println("结果: " + result);
executor.shutdown();
}
}
supplyAsync 有两个重载方法:
CompletableFuture.supplyAsync(Supplier<U> supplier); // 使用 ForkJoinPool.commonPool()
CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor); // 指定线程池
runAsync - 无返回值的异步任务
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("执行无返回值的异步任务");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
future.join();
System.out.println("任务完成");
completedFuture - 已完成的 Future
CompletableFuture<String> future = CompletableFuture.completedFuture("已完成的值");
System.out.println(future.join());
手动完成
import java.util.concurrent.CompletableFuture;
public class ManualCompleteDemo {
public static void main(String[] args) {
CompletableFuture<String> future = new CompletableFuture<>();
new Thread(() -> {
try {
Thread.sleep(1000);
future.complete("手动设置结果");
} catch (InterruptedException e) {
future.completeExceptionally(e);
}
}).start();
System.out.println(future.join());
}
}
任务链式组合
CompletableFuture 的核心优势在于可以链式组合多个异步任务。
thenApply - 转换结果
import java.util.concurrent.CompletableFuture;
public class ThenApplyDemo {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World")
.thenApply(String::toUpperCase);
System.out.println(future.join());
}
}
thenApply 在前一个任务完成后执行,接收前一个任务的结果作为参数,返回新的结果。
thenAccept - 消费结果
CompletableFuture
.supplyAsync(() -> 100)
.thenAccept(result -> System.out.println("收到结果: " + result));
thenAccept 消费前一个任务的结果,不返回新值。
thenRun - 执行后续动作
CompletableFuture
.supplyAsync(() -> "任务完成")
.thenRun(() -> System.out.println("后续动作执行"));
thenRun 不接收前一个任务的结果,也不返回值。
thenCompose - 扁平化组合
import java.util.concurrent.CompletableFuture;
public class ThenComposeDemo {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
System.out.println(future.join());
}
}
thenCompose 用于连接两个有依赖关系的异步任务,避免嵌套的 CompletableFuture<CompletableFuture<T>>。
thenCombine - 组合两个独立任务
import java.util.concurrent.CompletableFuture;
public class ThenCombineDemo {
public static void main(String[] args) {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 20;
});
CompletableFuture<Integer> result = future1.thenCombine(future2, Integer::sum);
System.out.println("结果: " + result.join());
}
}
thenCombine 组合两个独立的异步任务,两个任务并行执行,都完成后合并结果。
thenAcceptBoth 和 runAfterBoth
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);
future1.thenAcceptBoth(future2, (a, b) -> {
System.out.println("两个任务都完成: " + a + " + " + b + " = " + (a + b));
});
future1.runAfterBoth(future2, () -> {
System.out.println("两个任务都完成,执行后续动作");
});
applyToEither - 任一任务完成
import java.util.concurrent.CompletableFuture;
public class ApplyToEitherDemo {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "来自任务1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "来自任务2";
});
CompletableFuture<String> result = future1.applyToEither(future2, s -> s);
System.out.println("结果: " + result.join());
}
}
applyToEither 在任一任务完成时执行,返回先完成的任务结果。
acceptToEither 和 runAfterEither
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "任务1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "任务2";
});
future1.acceptToEither(future2, s -> {
System.out.println("先完成的任务: " + s);
});
future1.runAfterEither(future2, () -> {
System.out.println("任一任务完成");
});
多任务组合
allOf - 等待所有任务完成
import java.util.concurrent.CompletableFuture;
import java.util.Arrays;
public class AllOfDemo {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "结果1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "结果2";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "结果3";
});
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);
allOf.join();
System.out.println("所有任务完成");
System.out.println("结果: " + future1.join() + ", " + future2.join() + ", " + future3.join());
}
}
anyOf - 任一任务完成
import java.util.concurrent.CompletableFuture;
public class AnyOfDemo {
public static void main(String[] args) {
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "任务1";
}),
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "任务2";
}),
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "任务3";
})
);
System.out.println("第一个完成的任务: " + anyOf.join());
}
}
异常处理
exceptionally - 异常恢复
import java.util.concurrent.CompletableFuture;
public class ExceptionallyDemo {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("随机异常");
}
return "正常结果";
})
.exceptionally(e -> {
System.out.println("捕获异常: " + e.getMessage());
return "默认值";
});
System.out.println("结果: " + future.join());
}
}
handle - 统一处理成功和失败
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("随机异常");
}
return "正常结果";
})
.handle((result, ex) -> {
if (ex != null) {
System.out.println("异常: " + ex.getMessage());
return "异常恢复值";
}
return result + " (已处理)";
});
System.out.println("结果: " + future.join());
whenComplete - 完成时回调
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.whenComplete((result, ex) -> {
if (ex == null) {
System.out.println("任务成功完成: " + result);
} else {
System.out.println("任务失败: " + ex.getMessage());
}
});
System.out.println(future.join());
超时处理
orTimeout - 超时抛异常
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class TimeoutDemo {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "正常结果";
})
.orTimeout(1, TimeUnit.SECONDS)
.exceptionally(e -> "超时了");
System.out.println(future.join());
}
}
completeOnTimeout - 超时返回默认值
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "正常结果";
})
.completeOnTimeout("超时默认值", 1, TimeUnit.SECONDS);
System.out.println(future.join());
实战示例
并行获取多个服务数据
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ParallelFetchDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(3);
CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(
() -> fetchUser(1L), executor);
CompletableFuture<String> ordersFuture = CompletableFuture.supplyAsync(
() -> fetchOrders(1L), executor);
CompletableFuture<String> friendsFuture = CompletableFuture.supplyAsync(
() -> fetchFriends(1L), executor);
CompletableFuture.allOf(userFuture, ordersFuture, friendsFuture).join();
System.out.println("用户信息: " + userFuture.join());
System.out.println("订单信息: " + ordersFuture.join());
System.out.println("好友信息: " + friendsFuture.join());
executor.shutdown();
}
static String fetchUser(Long id) {
sleep(500);
return "User-" + id;
}
static String fetchOrders(Long id) {
sleep(800);
return "Orders of User-" + id;
}
static String fetchFriends(Long id) {
sleep(600);
return "Friends of User-" + id;
}
static void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
异步任务链
import java.util.concurrent.CompletableFuture;
public class AsyncTaskChainDemo {
public static void main(String[] args) {
CompletableFuture<String> result = CompletableFuture
.supplyAsync(() -> {
System.out.println("1. 查询用户ID");
return 1L;
})
.thenApplyAsync(userId -> {
System.out.println("2. 根据ID查询用户详情");
return "User-" + userId;
})
.thenApplyAsync(user -> {
System.out.println("3. 查询用户订单");
return user + " 的订单列表";
})
.thenApplyAsync(orders -> {
System.out.println("4. 发送订单通知");
return "已发送: " + orders;
});
System.out.println("最终结果: " + result.join());
}
}
重试机制
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
public class RetryDemo {
public static void main(String[] args) {
AtomicInteger retryCount = new AtomicInteger(0);
CompletableFuture<String> future = retryAsync(
() -> {
int count = retryCount.incrementAndGet();
System.out.println("尝试第 " + count + " 次");
if (count < 3) {
throw new RuntimeException("模拟失败");
}
return "成功";
},
3
);
System.out.println("结果: " + future.join());
}
static <T> CompletableFuture<T> retryAsync(
java.util.function.Supplier<T> supplier, int maxRetries) {
CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier);
for (int i = 0; i < maxRetries - 1; i++) {
future = future.exceptionallyCompose(e ->
CompletableFuture.supplyAsync(supplier));
}
return future;
}
}
线程池选择
CompletableFuture 默认使用 ForkJoinPool.commonPool(),在生产环境中建议指定线程池:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "任务1", executor)
.thenApplyAsync(s -> s + " -> 任务2", executor)
.thenApplyAsync(s -> s + " -> 任务3", executor);
System.out.println(future.join());
executor.shutdown();
}
}
推荐做法:
- 为不同类型的任务使用不同的线程池
- I/O 密集型任务使用较大的线程池
- CPU 密集型任务使用与 CPU 核心数相当的线程池
- 使用虚拟线程(Java 21+)处理 I/O 密集型任务
小结
本章介绍了 CompletableFuture 的核心功能:
- 创建方式:supplyAsync、runAsync、completedFuture
- 链式组合:thenApply、thenAccept、thenRun、thenCompose、thenCombine
- 多任务组合:allOf、anyOf
- 异常处理:exceptionally、handle、whenComplete
- 超时处理:orTimeout、completeOnTimeout
- 实战应用:并行获取、任务链、重试机制
CompletableFuture 是构建异步应用的核心工具,掌握它的使用方法对于开发高性能并发应用至关重要。