跳到主要内容

CompletableFuture 异步编程

CompletableFuture 是 Java 8 引入的异步编程工具,它实现了 Future 和 CompletionStage 接口,提供了强大的异步任务组合能力。相比于传统的 Future,CompletableFuture 支持回调、任务链式组合、异常处理等功能,是构建异步应用的核心组件。

Future 的局限性

在 CompletableFuture 出现之前,Java 使用 Future 表示异步计算结果。但 Future 存在以下局限:

无法主动完成

Future 只能等待异步任务完成,无法手动设置结果。

无法设置回调

Future.get() 会阻塞调用线程,无法在任务完成后自动执行回调。

无法链式调用

多个异步任务之间的依赖关系难以表达。

异常处理不便

Future.get() 抛出的 ExecutionException 需要手动处理。

import java.util.concurrent.*;

public class FutureLimitation {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(1);

Future<String> future = executor.submit(() -> {
Thread.sleep(1000);
return "Hello";
});

String result = future.get();
System.out.println(result);

executor.shutdown();
}
}

CompletableFuture 解决了这些问题,提供了更强大的异步编程能力。

创建 CompletableFuture

supplyAsync - 有返回值的异步任务

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SupplyAsyncDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(4);

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("异步任务执行中: " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello CompletableFuture";
}, executor);

System.out.println("主线程继续执行");

String result = future.join();
System.out.println("结果: " + result);

executor.shutdown();
}
}

supplyAsync 有两个重载方法:

CompletableFuture.supplyAsync(Supplier<U> supplier);  // 使用 ForkJoinPool.commonPool()
CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor); // 指定线程池

runAsync - 无返回值的异步任务

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("执行无返回值的异步任务");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});

future.join();
System.out.println("任务完成");

completedFuture - 已完成的 Future

CompletableFuture<String> future = CompletableFuture.completedFuture("已完成的值");
System.out.println(future.join());

手动完成

import java.util.concurrent.CompletableFuture;

public class ManualCompleteDemo {
public static void main(String[] args) {
CompletableFuture<String> future = new CompletableFuture<>();

new Thread(() -> {
try {
Thread.sleep(1000);
future.complete("手动设置结果");
} catch (InterruptedException e) {
future.completeExceptionally(e);
}
}).start();

System.out.println(future.join());
}
}

任务链式组合

CompletableFuture 的核心优势在于可以链式组合多个异步任务。

thenApply - 转换结果

import java.util.concurrent.CompletableFuture;

public class ThenApplyDemo {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World")
.thenApply(String::toUpperCase);

System.out.println(future.join());
}
}

thenApply 在前一个任务完成后执行,接收前一个任务的结果作为参数,返回新的结果。

thenAccept - 消费结果

CompletableFuture
.supplyAsync(() -> 100)
.thenAccept(result -> System.out.println("收到结果: " + result));

thenAccept 消费前一个任务的结果,不返回新值。

thenRun - 执行后续动作

CompletableFuture
.supplyAsync(() -> "任务完成")
.thenRun(() -> System.out.println("后续动作执行"));

thenRun 不接收前一个任务的结果,也不返回值。

thenCompose - 扁平化组合

import java.util.concurrent.CompletableFuture;

public class ThenComposeDemo {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));

System.out.println(future.join());
}
}

thenCompose 用于连接两个有依赖关系的异步任务,避免嵌套的 CompletableFuture<CompletableFuture<T>>

thenCombine - 组合两个独立任务

import java.util.concurrent.CompletableFuture;

public class ThenCombineDemo {
public static void main(String[] args) {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
});

CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 20;
});

CompletableFuture<Integer> result = future1.thenCombine(future2, Integer::sum);

System.out.println("结果: " + result.join());
}
}

thenCombine 组合两个独立的异步任务,两个任务并行执行,都完成后合并结果。

thenAcceptBoth 和 runAfterBoth

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);

future1.thenAcceptBoth(future2, (a, b) -> {
System.out.println("两个任务都完成: " + a + " + " + b + " = " + (a + b));
});

future1.runAfterBoth(future2, () -> {
System.out.println("两个任务都完成,执行后续动作");
});

applyToEither - 任一任务完成

import java.util.concurrent.CompletableFuture;

public class ApplyToEitherDemo {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "来自任务1";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "来自任务2";
});

CompletableFuture<String> result = future1.applyToEither(future2, s -> s);

System.out.println("结果: " + result.join());
}
}

applyToEither 在任一任务完成时执行,返回先完成的任务结果。

acceptToEither 和 runAfterEither

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "任务1";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "任务2";
});

future1.acceptToEither(future2, s -> {
System.out.println("先完成的任务: " + s);
});

future1.runAfterEither(future2, () -> {
System.out.println("任一任务完成");
});

多任务组合

allOf - 等待所有任务完成

import java.util.concurrent.CompletableFuture;
import java.util.Arrays;

public class AllOfDemo {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "结果1";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "结果2";
});

CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "结果3";
});

CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);

allOf.join();

System.out.println("所有任务完成");
System.out.println("结果: " + future1.join() + ", " + future2.join() + ", " + future3.join());
}
}

anyOf - 任一任务完成

import java.util.concurrent.CompletableFuture;

public class AnyOfDemo {
public static void main(String[] args) {
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "任务1";
}),
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "任务2";
}),
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "任务3";
})
);

System.out.println("第一个完成的任务: " + anyOf.join());
}
}

异常处理

exceptionally - 异常恢复

import java.util.concurrent.CompletableFuture;

public class ExceptionallyDemo {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("随机异常");
}
return "正常结果";
})
.exceptionally(e -> {
System.out.println("捕获异常: " + e.getMessage());
return "默认值";
});

System.out.println("结果: " + future.join());
}
}

handle - 统一处理成功和失败

CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("随机异常");
}
return "正常结果";
})
.handle((result, ex) -> {
if (ex != null) {
System.out.println("异常: " + ex.getMessage());
return "异常恢复值";
}
return result + " (已处理)";
});

System.out.println("结果: " + future.join());

whenComplete - 完成时回调

CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.whenComplete((result, ex) -> {
if (ex == null) {
System.out.println("任务成功完成: " + result);
} else {
System.out.println("任务失败: " + ex.getMessage());
}
});

System.out.println(future.join());

超时处理

orTimeout - 超时抛异常

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class TimeoutDemo {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "正常结果";
})
.orTimeout(1, TimeUnit.SECONDS)
.exceptionally(e -> "超时了");

System.out.println(future.join());
}
}

completeOnTimeout - 超时返回默认值

CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "正常结果";
})
.completeOnTimeout("超时默认值", 1, TimeUnit.SECONDS);

System.out.println(future.join());

实战示例

并行获取多个服务数据

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelFetchDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(3);

CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(
() -> fetchUser(1L), executor);

CompletableFuture<String> ordersFuture = CompletableFuture.supplyAsync(
() -> fetchOrders(1L), executor);

CompletableFuture<String> friendsFuture = CompletableFuture.supplyAsync(
() -> fetchFriends(1L), executor);

CompletableFuture.allOf(userFuture, ordersFuture, friendsFuture).join();

System.out.println("用户信息: " + userFuture.join());
System.out.println("订单信息: " + ordersFuture.join());
System.out.println("好友信息: " + friendsFuture.join());

executor.shutdown();
}

static String fetchUser(Long id) {
sleep(500);
return "User-" + id;
}

static String fetchOrders(Long id) {
sleep(800);
return "Orders of User-" + id;
}

static String fetchFriends(Long id) {
sleep(600);
return "Friends of User-" + id;
}

static void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

异步任务链

import java.util.concurrent.CompletableFuture;

public class AsyncTaskChainDemo {
public static void main(String[] args) {
CompletableFuture<String> result = CompletableFuture
.supplyAsync(() -> {
System.out.println("1. 查询用户ID");
return 1L;
})
.thenApplyAsync(userId -> {
System.out.println("2. 根据ID查询用户详情");
return "User-" + userId;
})
.thenApplyAsync(user -> {
System.out.println("3. 查询用户订单");
return user + " 的订单列表";
})
.thenApplyAsync(orders -> {
System.out.println("4. 发送订单通知");
return "已发送: " + orders;
});

System.out.println("最终结果: " + result.join());
}
}

重试机制

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryDemo {
public static void main(String[] args) {
AtomicInteger retryCount = new AtomicInteger(0);

CompletableFuture<String> future = retryAsync(
() -> {
int count = retryCount.incrementAndGet();
System.out.println("尝试第 " + count + " 次");
if (count < 3) {
throw new RuntimeException("模拟失败");
}
return "成功";
},
3
);

System.out.println("结果: " + future.join());
}

static <T> CompletableFuture<T> retryAsync(
java.util.function.Supplier<T> supplier, int maxRetries) {
CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier);

for (int i = 0; i < maxRetries - 1; i++) {
future = future.exceptionallyCompose(e ->
CompletableFuture.supplyAsync(supplier));
}

return future;
}
}

线程池选择

CompletableFuture 默认使用 ForkJoinPool.commonPool(),在生产环境中建议指定线程池:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(4);

CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "任务1", executor)
.thenApplyAsync(s -> s + " -> 任务2", executor)
.thenApplyAsync(s -> s + " -> 任务3", executor);

System.out.println(future.join());

executor.shutdown();
}
}

推荐做法

  • 为不同类型的任务使用不同的线程池
  • I/O 密集型任务使用较大的线程池
  • CPU 密集型任务使用与 CPU 核心数相当的线程池
  • 使用虚拟线程(Java 21+)处理 I/O 密集型任务

小结

本章介绍了 CompletableFuture 的核心功能:

  1. 创建方式:supplyAsync、runAsync、completedFuture
  2. 链式组合:thenApply、thenAccept、thenRun、thenCompose、thenCombine
  3. 多任务组合:allOf、anyOf
  4. 异常处理:exceptionally、handle、whenComplete
  5. 超时处理:orTimeout、completeOnTimeout
  6. 实战应用:并行获取、任务链、重试机制

CompletableFuture 是构建异步应用的核心工具,掌握它的使用方法对于开发高性能并发应用至关重要。