跳到主要内容

异步调用支持

在现代应用中,异步调用越来越常见,比如 CompletableFuture、回调函数、响应式编程等。Sentinel 提供了异步调用链路的统计支持,确保在异步场景下也能正确地进行流量控制和熔断降级。

同步与异步的区别

在同步调用中,请求的处理在同一线程中完成,调用链路清晰:

// 同步调用:线程 A 执行整个过程
try {
Entry entry = SphU.entry("resourceA");
// 业务逻辑
entry.exit();
} catch (BlockException e) {
// 处理限流
}

而在异步调用中,请求的发起和处理可能在不同线程中:

// 异步调用:线程 A 发起请求,线程 B 处理结果
asyncService.execute(() -> {
// 在新线程中执行
});

Sentinel 的 Context 存储在 ThreadLocal 中,默认情况下异步线程无法继承主线程的调用链路上下文。因此需要特殊处理来维持正确的调用链路关系。

AsyncEntry 基本用法

Sentinel 提供了 SphU.asyncEntry() 方法来定义异步资源。

基本示例

import com.alibaba.csp.sentinel.AsyncEntry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;

public void asyncCall() {
try {
// 创建异步 Entry
AsyncEntry entry = SphU.asyncEntry("asyncResource");

// 异步调用
doAsync(result -> {
try {
// 在回调中处理结果
System.out.println("处理结果: " + result);
} finally {
// 必须在回调结束时 exit
entry.exit();
}
});

} catch (BlockException e) {
// 被限流时的处理
System.out.println("请求被限流");
}
}

private void doAsync(Consumer<String> callback) {
new Thread(() -> {
try {
Thread.sleep(100);
callback.accept("success");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}

重要说明

SphU.asyncEntry() 不会影响当前线程的 Context。这意味着以下两个 entry 在调用链上是平级关系,而不是嵌套关系:

// 调用链结构:
// -parent
// ---asyncResource
// ---syncResource

AsyncEntry asyncEntry = SphU.asyncEntry("asyncResource");
Entry syncEntry = SphU.entry("syncResource");

这与同步调用的嵌套关系不同:

// 同步嵌套调用链:
// -parent
// ---resourceA
// -----resourceB

Entry entryA = SphU.entry("resourceA");
Entry entryB = SphU.entry("resourceB"); // 嵌套在 resourceA 下

异步上下文切换

如果需要在异步回调中嵌套其他资源调用,需要使用 ContextUtil.runOnContext() 进行上下文切换。

问题场景

public void asyncCallWithNested() {
try {
AsyncEntry entry = SphU.asyncEntry("asyncResource");

doAsync(result -> {
try {
// 这里尝试嵌套调用,但 Context 已经丢失
Entry nestedEntry = SphU.entry("nestedResource"); // 错误!
// ...
nestedEntry.exit();
} finally {
entry.exit();
}
});

} catch (BlockException e) {
// ...
}
}

上述代码中,由于回调在新的线程中执行,无法获取主线程的 Context,导致调用链路关系断裂。

正确做法

使用 ContextUtil.runOnContext() 切换到异步 Context:

import com.alibaba.csp.sentinel.AsyncEntry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.slots.block.BlockException;

public void asyncCallWithNested() {
try {
AsyncEntry entry = SphU.asyncEntry("asyncResource");

doAsync(result -> {
// 通过 runOnContext 切换到异步 Context
ContextUtil.runOnContext(entry.getAsyncContext(), () -> {
try {
// 现在可以正确嵌套调用
Entry nestedEntry = SphU.entry("nestedResource");
try {
// 处理结果
handleResult(result);
} finally {
nestedEntry.exit();
}
} finally {
entry.exit();
}
});
});

} catch (BlockException e) {
System.out.println("请求被限流");
}
}

private void handleResult(String result) {
// 处理异步结果
System.out.println("处理结果: " + result);
}

这样调用链就变成:

-parent
---asyncResource
-----nestedResource

CompletableFuture 示例

CompletableFuture 是 Java 8 引入的异步编程工具,与 Sentinel 结合使用时需要注意上下文传递。

基本用法

import com.alibaba.csp.sentinel.AsyncEntry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.slots.block.BlockException;

import java.util.concurrent.CompletableFuture;

public class AsyncService {

public CompletableFuture<String> asyncOperation(String param) {
AsyncEntry entry = null;
try {
entry = SphU.asyncEntry("asyncOperation");

return CompletableFuture.supplyAsync(() -> {
// 在异步线程中执行业务逻辑
try {
Thread.sleep(100);
return "result: " + param;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "error";
}
}).whenComplete((result, ex) -> {
// 在回调中 exit
entry.exit();
});

} catch (BlockException e) {
return CompletableFuture.completedFuture("blocked");
}
}
}

嵌套资源调用

public CompletableFuture<String> asyncWithNested(String param) {
AsyncEntry entry = null;
try {
entry = SphU.asyncEntry("asyncOperation");
AsyncEntry finalEntry = entry;

return CompletableFuture.supplyAsync(() -> {
// 切换到异步 Context
ContextUtil.runOnContext(finalEntry.getAsyncContext(), () -> {
try {
Entry nested = SphU.entry("nestedOperation");
try {
// 嵌套资源调用
doSomething();
} finally {
nested.exit();
}
} finally {
finalEntry.exit();
}
});
return "done";
});

} catch (BlockException e) {
return CompletableFuture.completedFuture("blocked");
}
}

响应式编程支持

对于 Reactor、RxJava 等响应式编程框架,Sentinel 提供了专门的适配模块。

Reactor 适配

添加依赖:

<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-reactor-adapter</artifactId>
<version>1.8.8</version>
</dependency>

使用方式:

import com.alibaba.csp.sentinel.adapter.reactor.SentinelReactorTransformer;
import reactor.core.publisher.Mono;

public class ReactiveService {

public Mono<String> reactiveOperation(String param) {
return Mono.fromCallable(() -> {
// 业务逻辑
return "result: " + param;
})
// 使用 Sentinel 转换器包装
.transform(new SentinelReactorTransformer<>("reactiveOperation"));
}
}

WebFlux 整合

Spring WebFlux 项目可以直接使用 sentinel-spring-webflux-adapter:

<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-spring-webflux-adapter</artifactId>
<version>1.8.8</version>
</dependency>

配置过滤器:

import com.alibaba.csp.sentinel.adapter.spring.webflux.SentinelWebFluxFilter;
import com.alibaba.csp.sentinel.adapter.spring.webflux.exception.SentinelBlockExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class WebFluxConfig {

@Bean
public SentinelWebFluxFilter sentinelWebFluxFilter() {
return new SentinelWebFluxFilter();
}

@Bean
public SentinelBlockExceptionHandler sentinelBlockExceptionHandler() {
return new SentinelBlockExceptionHandler();
}
}

线程池场景

在使用线程池执行异步任务时,需要特别注意上下文的传递。

问题代码

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolDemo {

private final ExecutorService executor = Executors.newFixedThreadPool(10);

public void executeAsync() {
Entry entry = null;
try {
entry = SphU.entry("asyncTask");

executor.submit(() -> {
// 错误:这里 Context 已经丢失
doSomething();
});

} catch (BlockException e) {
System.out.println("被限流");
} finally {
if (entry != null) {
entry.exit();
}
}
}
}

正确做法

使用 AsyncEntry 并在任务完成后 exit:

public class ThreadPoolDemo {

private final ExecutorService executor = Executors.newFixedThreadPool(10);

public void executeAsync() {
AsyncEntry entry = null;
try {
entry = SphU.asyncEntry("asyncTask");

executor.submit(() -> {
try {
// 业务逻辑
doSomething();
} finally {
// 在任务完成时 exit
entry.exit();
}
});

} catch (BlockException e) {
System.out.println("被限流");
}
}

private void doSomething() {
System.out.println("执行异步任务");
}
}

完整示例

下面是一个完整的异步调用示例,展示了各种异步场景的处理:

import com.alibaba.csp.sentinel.AsyncEntry;
import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncDemo {

private static final ExecutorService executor = Executors.newFixedThreadPool(5);

public static void main(String[] args) throws InterruptedException {
// 初始化规则
initRules();

// 示例 1:基本异步调用
basicAsyncCall();

// 示例 2:嵌套资源调用
nestedAsyncCall();

// 示例 3:CompletableFuture
completableFutureDemo();

Thread.sleep(2000);
executor.shutdown();
}

// 基本异步调用
private static void basicAsyncCall() {
System.out.println("=== 基本异步调用 ===");

try {
AsyncEntry entry = SphU.asyncEntry("basicAsync");

executor.submit(() -> {
try {
System.out.println("执行异步任务");
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
entry.exit();
}
});

} catch (BlockException e) {
System.out.println("basicAsync 被限流");
}
}

// 嵌套资源调用
private static void nestedAsyncCall() {
System.out.println("=== 嵌套资源调用 ===");

try {
AsyncEntry asyncEntry = SphU.asyncEntry("parentAsync");

executor.submit(() -> {
ContextUtil.runOnContext(asyncEntry.getAsyncContext(), () -> {
try {
// 嵌套同步资源
Entry syncEntry = SphU.entry("childSync");
try {
System.out.println("执行嵌套任务");
Thread.sleep(100);
} finally {
syncEntry.exit();
}
} catch (BlockException e) {
System.out.println("childSync 被限流");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
asyncEntry.exit();
}
});
});

} catch (BlockException e) {
System.out.println("parentAsync 被限流");
}
}

// CompletableFuture 示例
private static void completableFutureDemo() {
System.out.println("=== CompletableFuture ===");

try {
AsyncEntry entry = SphU.asyncEntry("futureAsync");

CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(100);
return "Hello";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "Error";
}
}, executor).thenAccept(result -> {
System.out.println("收到结果: " + result);
}).whenComplete((v, ex) -> {
entry.exit();
});

} catch (BlockException e) {
System.out.println("futureAsync 被限流");
}
}

private static void initRules() {
FlowRule rule = new FlowRule();
rule.setResource("basicAsync");
rule.setCount(10);
rule.setGrade(1);

FlowRuleManager.loadRules(Collections.singletonList(rule));
}
}

注意事项

1. Entry 必须成对

asyncEntry()exit() 必须成对出现,且 exit() 必须在异步操作完成后调用:

// 正确
AsyncEntry entry = SphU.asyncEntry("resource");
asyncOperation(() -> {
try {
// ...
} finally {
entry.exit(); // 在回调中 exit
}
});

// 错误:exit 太早
AsyncEntry entry = SphU.asyncEntry("resource");
entry.exit(); // 异步操作还没完成
asyncOperation(...);

2. 异常处理

异步操作中的异常不会自动传播到主线程,需要单独处理:

try {
AsyncEntry entry = SphU.asyncEntry("resource");

asyncOperation(result -> {
try {
// 处理结果
} catch (Exception e) {
// 记录异常
Tracer.trace(e);
} finally {
entry.exit();
}
});

} catch (BlockException e) {
// 处理限流
}

3. 资源泄漏

如果异步操作没有完成或回调没有执行,会导致 Entry 泄漏。建议:

  • 设置合理的超时时间
  • 使用 try-finally 确保 exit 被执行
  • 监控 Entry 的数量

4. 性能考虑

每次异步调用都需要创建 AsyncEntry,有一定的性能开销。对于高频异步调用场景,需要权衡是否使用 Sentinel 保护。

最佳实践

1. 使用工具类封装

封装异步调用工具类,简化使用:

import com.alibaba.csp.sentinel.AsyncEntry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;

import java.util.function.Consumer;
import java.util.function.Supplier;

public class SentinelAsyncUtils {

public static <T> void executeAsync(
String resourceName,
Supplier<T> supplier,
Consumer<T> successHandler,
Consumer<Throwable> errorHandler) {

AsyncEntry entry = null;
try {
entry = SphU.asyncEntry(resourceName);
AsyncEntry finalEntry = entry;

CompletableFuture.supplyAsync(supplier)
.whenComplete((result, ex) -> {
if (ex != null) {
errorHandler.accept(ex);
} else {
successHandler.accept(result);
}
finalEntry.exit();
});

} catch (BlockException e) {
errorHandler.accept(e);
}
}
}

2. 结合 Spring @Async

在 Spring 异步方法中使用 Sentinel:

import com.alibaba.csp.sentinel.AsyncEntry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
public class AsyncService {

@Async
public void asyncProcess(String data) {
AsyncEntry entry = null;
try {
entry = SphU.asyncEntry("asyncProcess");
// 业务逻辑
processData(data);
} catch (BlockException e) {
System.out.println("被限流");
} finally {
if (entry != null) {
entry.exit();
}
}
}

private void processData(String data) {
// 处理数据
}
}

3. 监控异步资源

异步资源的监控与同步资源相同,可以通过控制台或 API 查看:

# 查看资源统计
curl http://localhost:8719/cnode?id=asyncResource

下一步