异步调用支持
在现代应用中,异步调用越来越常见,比如 CompletableFuture、回调函数、响应式编程等。Sentinel 提供了异步调用链路的统计支持,确保在异步场景下也能正确地进行流量控制和熔断降级。
同步与异步的区别
在同步调用中,请求的处理在同一线程中完成,调用链路清晰:
// 同步调用:线程 A 执行整个过程
try {
Entry entry = SphU.entry("resourceA");
// 业务逻辑
entry.exit();
} catch (BlockException e) {
// 处理限流
}
而在异步调用中,请求的发起和处理可能在不同线程中:
// 异步调用:线程 A 发起请求,线程 B 处理结果
asyncService.execute(() -> {
// 在新线程中执行
});
Sentinel 的 Context 存储在 ThreadLocal 中,默认情况下异步线程无法继承主线程的调用链路上下文。因此需要特殊处理来维持正确的调用链路关系。
AsyncEntry 基本用法
Sentinel 提供了 SphU.asyncEntry() 方法来定义异步资源。
基本示例
import com.alibaba.csp.sentinel.AsyncEntry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;
public void asyncCall() {
try {
// 创建异步 Entry
AsyncEntry entry = SphU.asyncEntry("asyncResource");
// 异步调用
doAsync(result -> {
try {
// 在回调中处理结果
System.out.println("处理结果: " + result);
} finally {
// 必须在回调结束时 exit
entry.exit();
}
});
} catch (BlockException e) {
// 被限流时的处理
System.out.println("请求被限流");
}
}
private void doAsync(Consumer<String> callback) {
new Thread(() -> {
try {
Thread.sleep(100);
callback.accept("success");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
重要说明
SphU.asyncEntry() 不会影响当前线程的 Context。这意味着以下两个 entry 在调用链上是平级关系,而不是嵌套关系:
// 调用链结构:
// -parent
// ---asyncResource
// ---syncResource
AsyncEntry asyncEntry = SphU.asyncEntry("asyncResource");
Entry syncEntry = SphU.entry("syncResource");
这与同步调用的嵌套关系不同:
// 同步嵌套调用链:
// -parent
// ---resourceA
// -----resourceB
Entry entryA = SphU.entry("resourceA");
Entry entryB = SphU.entry("resourceB"); // 嵌套在 resourceA 下
异步上下文切换
如果需要在异步回调中嵌套其他资源调用,需要使用 ContextUtil.runOnContext() 进行上下文切换。
问题场景
public void asyncCallWithNested() {
try {
AsyncEntry entry = SphU.asyncEntry("asyncResource");
doAsync(result -> {
try {
// 这里尝试嵌套调用,但 Context 已经丢失
Entry nestedEntry = SphU.entry("nestedResource"); // 错误!
// ...
nestedEntry.exit();
} finally {
entry.exit();
}
});
} catch (BlockException e) {
// ...
}
}
上述代码中,由于回调在新的线程中执行,无法获取主线程的 Context,导致调用链路关系断裂。
正确做法
使用 ContextUtil.runOnContext() 切换到异步 Context:
import com.alibaba.csp.sentinel.AsyncEntry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.slots.block.BlockException;
public void asyncCallWithNested() {
try {
AsyncEntry entry = SphU.asyncEntry("asyncResource");
doAsync(result -> {
// 通过 runOnContext 切换到异步 Context
ContextUtil.runOnContext(entry.getAsyncContext(), () -> {
try {
// 现在可以正确嵌套调用
Entry nestedEntry = SphU.entry("nestedResource");
try {
// 处理结果
handleResult(result);
} finally {
nestedEntry.exit();
}
} finally {
entry.exit();
}
});
});
} catch (BlockException e) {
System.out.println("请求被限流");
}
}
private void handleResult(String result) {
// 处理异步结果
System.out.println("处理结果: " + result);
}
这样调用链就变成:
-parent
---asyncResource
-----nestedResource
CompletableFuture 示例
CompletableFuture 是 Java 8 引入的异步编程工具,与 Sentinel 结合使用时需要注意上下文传递。
基本用法
import com.alibaba.csp.sentinel.AsyncEntry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import java.util.concurrent.CompletableFuture;
public class AsyncService {
public CompletableFuture<String> asyncOperation(String param) {
AsyncEntry entry = null;
try {
entry = SphU.asyncEntry("asyncOperation");
return CompletableFuture.supplyAsync(() -> {
// 在异步线程中执行业务逻辑
try {
Thread.sleep(100);
return "result: " + param;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "error";
}
}).whenComplete((result, ex) -> {
// 在回调中 exit
entry.exit();
});
} catch (BlockException e) {
return CompletableFuture.completedFuture("blocked");
}
}
}
嵌套资源调用
public CompletableFuture<String> asyncWithNested(String param) {
AsyncEntry entry = null;
try {
entry = SphU.asyncEntry("asyncOperation");
AsyncEntry finalEntry = entry;
return CompletableFuture.supplyAsync(() -> {
// 切换到异步 Context
ContextUtil.runOnContext(finalEntry.getAsyncContext(), () -> {
try {
Entry nested = SphU.entry("nestedOperation");
try {
// 嵌套资源调用
doSomething();
} finally {
nested.exit();
}
} finally {
finalEntry.exit();
}
});
return "done";
});
} catch (BlockException e) {
return CompletableFuture.completedFuture("blocked");
}
}
响应式编程支持
对于 Reactor、RxJava 等响应式编程框架,Sentinel 提供了专门的适配模块。
Reactor 适配
添加依赖:
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-reactor-adapter</artifactId>
<version>1.8.8</version>
</dependency>
使用方式:
import com.alibaba.csp.sentinel.adapter.reactor.SentinelReactorTransformer;
import reactor.core.publisher.Mono;
public class ReactiveService {
public Mono<String> reactiveOperation(String param) {
return Mono.fromCallable(() -> {
// 业务逻辑
return "result: " + param;
})
// 使用 Sentinel 转换器包装
.transform(new SentinelReactorTransformer<>("reactiveOperation"));
}
}
WebFlux 整合
Spring WebFlux 项目可以直接使用 sentinel-spring-webflux-adapter:
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-spring-webflux-adapter</artifactId>
<version>1.8.8</version>
</dependency>
配置过滤器:
import com.alibaba.csp.sentinel.adapter.spring.webflux.SentinelWebFluxFilter;
import com.alibaba.csp.sentinel.adapter.spring.webflux.exception.SentinelBlockExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class WebFluxConfig {
@Bean
public SentinelWebFluxFilter sentinelWebFluxFilter() {
return new SentinelWebFluxFilter();
}
@Bean
public SentinelBlockExceptionHandler sentinelBlockExceptionHandler() {
return new SentinelBlockExceptionHandler();
}
}
线程池场景
在使用线程池执行异步任务时,需要特别注意上下文的传递。
问题代码
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolDemo {
private final ExecutorService executor = Executors.newFixedThreadPool(10);
public void executeAsync() {
Entry entry = null;
try {
entry = SphU.entry("asyncTask");
executor.submit(() -> {
// 错误:这里 Context 已经丢失
doSomething();
});
} catch (BlockException e) {
System.out.println("被限流");
} finally {
if (entry != null) {
entry.exit();
}
}
}
}
正确做法
使用 AsyncEntry 并在任务完成后 exit:
public class ThreadPoolDemo {
private final ExecutorService executor = Executors.newFixedThreadPool(10);
public void executeAsync() {
AsyncEntry entry = null;
try {
entry = SphU.asyncEntry("asyncTask");
executor.submit(() -> {
try {
// 业务逻辑
doSomething();
} finally {
// 在任务完成时 exit
entry.exit();
}
});
} catch (BlockException e) {
System.out.println("被限流");
}
}
private void doSomething() {
System.out.println("执行异步任务");
}
}
完整示例
下面是一个完整的异步调用示例,展示了各种异步场景的处理:
import com.alibaba.csp.sentinel.AsyncEntry;
import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class AsyncDemo {
private static final ExecutorService executor = Executors.newFixedThreadPool(5);
public static void main(String[] args) throws InterruptedException {
// 初始化规则
initRules();
// 示例 1:基本异步调用
basicAsyncCall();
// 示例 2:嵌套资源调用
nestedAsyncCall();
// 示例 3:CompletableFuture
completableFutureDemo();
Thread.sleep(2000);
executor.shutdown();
}
// 基本异步调用
private static void basicAsyncCall() {
System.out.println("=== 基本异步调用 ===");
try {
AsyncEntry entry = SphU.asyncEntry("basicAsync");
executor.submit(() -> {
try {
System.out.println("执行异步任务");
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
entry.exit();
}
});
} catch (BlockException e) {
System.out.println("basicAsync 被限流");
}
}
// 嵌套资源调用
private static void nestedAsyncCall() {
System.out.println("=== 嵌套资源调用 ===");
try {
AsyncEntry asyncEntry = SphU.asyncEntry("parentAsync");
executor.submit(() -> {
ContextUtil.runOnContext(asyncEntry.getAsyncContext(), () -> {
try {
// 嵌套同步资源
Entry syncEntry = SphU.entry("childSync");
try {
System.out.println("执行嵌套任务");
Thread.sleep(100);
} finally {
syncEntry.exit();
}
} catch (BlockException e) {
System.out.println("childSync 被限流");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
asyncEntry.exit();
}
});
});
} catch (BlockException e) {
System.out.println("parentAsync 被限流");
}
}
// CompletableFuture 示例
private static void completableFutureDemo() {
System.out.println("=== CompletableFuture ===");
try {
AsyncEntry entry = SphU.asyncEntry("futureAsync");
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(100);
return "Hello";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "Error";
}
}, executor).thenAccept(result -> {
System.out.println("收到结果: " + result);
}).whenComplete((v, ex) -> {
entry.exit();
});
} catch (BlockException e) {
System.out.println("futureAsync 被限流");
}
}
private static void initRules() {
FlowRule rule = new FlowRule();
rule.setResource("basicAsync");
rule.setCount(10);
rule.setGrade(1);
FlowRuleManager.loadRules(Collections.singletonList(rule));
}
}
注意事项
1. Entry 必须成对
asyncEntry() 和 exit() 必须成对出现,且 exit() 必须在异步操作完成后调用:
// 正确
AsyncEntry entry = SphU.asyncEntry("resource");
asyncOperation(() -> {
try {
// ...
} finally {
entry.exit(); // 在回调中 exit
}
});
// 错误:exit 太早
AsyncEntry entry = SphU.asyncEntry("resource");
entry.exit(); // 异步操作还没完成
asyncOperation(...);
2. 异常处理
异步操作中的异常不会自动传播到主线程,需要单独处理:
try {
AsyncEntry entry = SphU.asyncEntry("resource");
asyncOperation(result -> {
try {
// 处理结果
} catch (Exception e) {
// 记录异常
Tracer.trace(e);
} finally {
entry.exit();
}
});
} catch (BlockException e) {
// 处理限流
}
3. 资源泄漏
如果异步操作没有完成或回调没有执行,会导致 Entry 泄漏。建议:
- 设置合理的超时时间
- 使用 try-finally 确保 exit 被执行
- 监控 Entry 的数量
4. 性能考虑
每次异步调用都需要创建 AsyncEntry,有一定的性能开销。对于高频异步调用场景,需要权衡是否使用 Sentinel 保护。
最佳实践
1. 使用工具类封装
封装异步调用工具类,简化使用:
import com.alibaba.csp.sentinel.AsyncEntry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class SentinelAsyncUtils {
public static <T> void executeAsync(
String resourceName,
Supplier<T> supplier,
Consumer<T> successHandler,
Consumer<Throwable> errorHandler) {
AsyncEntry entry = null;
try {
entry = SphU.asyncEntry(resourceName);
AsyncEntry finalEntry = entry;
CompletableFuture.supplyAsync(supplier)
.whenComplete((result, ex) -> {
if (ex != null) {
errorHandler.accept(ex);
} else {
successHandler.accept(result);
}
finalEntry.exit();
});
} catch (BlockException e) {
errorHandler.accept(e);
}
}
}
2. 结合 Spring @Async
在 Spring 异步方法中使用 Sentinel:
import com.alibaba.csp.sentinel.AsyncEntry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@Service
public class AsyncService {
@Async
public void asyncProcess(String data) {
AsyncEntry entry = null;
try {
entry = SphU.asyncEntry("asyncProcess");
// 业务逻辑
processData(data);
} catch (BlockException e) {
System.out.println("被限流");
} finally {
if (entry != null) {
entry.exit();
}
}
}
private void processData(String data) {
// 处理数据
}
}
3. 监控异步资源
异步资源的监控与同步资源相同,可以通过控制台或 API 查看:
# 查看资源统计
curl http://localhost:8719/cnode?id=asyncResource