跳到主要内容

断路器与容错

在微服务架构中,服务之间的依赖关系复杂,一个服务的故障可能引发连锁反应,导致整个系统崩溃。断路器模式是解决这一问题的核心方案。

为什么需要断路器?

雪崩效应

当服务 A 调用服务 B,服务 B 调用服务 C 时,如果服务 C 出现故障:

┌─────────────────────────────────────────────────────────────┐
│ 雪崩效应示意 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 服务 A ──> 服务 B ──> 服务 C (故障) │
│ │ │ │ │
│ │ │ ▼ │
│ │ │ 响应超时 │
│ │ │ │ │
│ │ ▼ │ │
│ │ 线程阻塞等待 │ │
│ │ │ │ │
│ ▼ │ │ │
│ 线程池耗尽 ◄───────────┘ │
│ │ │
│ ▼ │
│ 服务 A 不可用 │
│ │
└─────────────────────────────────────────────────────────────┘

断路器模式

断路器模式通过监控服务调用的成功率和响应时间,在服务出现故障时快速失败,防止故障蔓延。

断路器状态机:

┌──────────────────────────────┐
│ │
▼ │
┌─────────┐ 失败率超过阈值 ┌─────────┐
│ 关闭 │ ─────────────────> │ 打开 │
│ (Closed)│ │ (Open) │
└─────────┘ └────┬────┘
▲ │
│ │ 超时后
│ │ 进入半开
│ ▼
│ ┌─────────┐
│ 成功率恢复 │ 半开 │
└──────────────────────── │(Half-Open)│
└─────────┘

│ 失败
│ 返回打开

┌─────────┐
│ 打开 │
└─────────┘

状态说明

状态说明
关闭 (Closed)正常状态,请求正常通过
打开 (Open)熔断状态,请求直接返回错误
半开 (Half-Open)探测状态,允许部分请求通过测试服务是否恢复

Sentinel

Sentinel 是阿里巴巴开源的流量控制和熔断降级组件,功能强大且易于使用。

Sentinel 核心概念

┌─────────────────────────────────────────────────────────────┐
│ Sentinel 核心功能 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 流量控制 │ │ 熔断降级 │ │ 系统保护 │ │
│ │ │ │ │ │ │ │
│ │ QPS 限制 │ │ 慢调用比例 │ │ CPU 使用率 │ │
│ │ 并发线程数 │ │ 异常比例 │ │ 系统负载 │ │
│ │ 冷启动 │ │ 异常数 │ │ 入口 QPS │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ 热点参数 │ │ 来源访问 │ │
│ │ 限流 │ │ 控制 │ │
│ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

快速开始

添加依赖

<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>

配置

spring:
cloud:
sentinel:
transport:
dashboard: localhost:8080
port: 8719
eager: true

management:
endpoints:
web:
exposure:
include: '*'

启动 Sentinel Dashboard

java -Dserver.port=8080 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard.jar

访问 http://localhost:8080(默认用户名密码:sentinel/sentinel)

流量控制

QPS 限流

@RestController
public class UserController {

@GetMapping("/user/{id}")
@SentinelResource(value = "getUser", blockHandler = "getUserBlockHandler")
public User getUser(@PathVariable Long id) {
return userService.getById(id);
}

// 限流处理方法
public User getUserBlockHandler(Long id, BlockException ex) {
User user = new User();
user.setId(id);
user.setName("限流降级用户");
return user;
}
}

在 Sentinel Dashboard 中配置流控规则:

  • 资源名:getUser
  • 阈值类型:QPS
  • 单机阈值:10

并发线程数限流

@GetMapping("/order/{id}")
@SentinelResource(value = "getOrder", blockHandler = "getOrderBlockHandler")
public Order getOrder(@PathVariable Long id) {
return orderService.getById(id);
}

配置:

  • 资源名:getOrder
  • 阈值类型:并发线程数
  • 单机阈值:5

熔断降级

慢调用比例熔断

@GetMapping("/product/{id}")
@SentinelResource(value = "getProduct", fallback = "getProductFallback")
public Product getProduct(@PathVariable Long id) {
return productService.getById(id);
}

public Product getProductFallback(Long id, Throwable ex) {
Product product = new Product();
product.setId(id);
product.setName("熔断降级商品");
return product;
}

熔断规则配置:

参数说明
资源名getProduct
熔断策略慢调用比例
最大 RT500ms
比例阈值0.5
熔断时长10s
最小请求数5

异常比例熔断

当异常比例超过阈值时触发熔断。

异常数熔断

当异常数超过阈值时触发熔断。

热点参数限流

针对请求参数进行限流:

@GetMapping("/user/info")
@SentinelResource(value = "getUserInfo", blockHandler = "getUserInfoBlockHandler")
public User getUserInfo(@RequestParam Long id) {
return userService.getById(id);
}

public User getUserInfoBlockHandler(Long id, BlockException ex) {
return new User(id, "热点参数限流");
}

配置热点规则:

@Configuration
public class SentinelConfig {

@PostConstruct
public void init() {
// 热点参数规则
ParamFlowRule rule = new ParamFlowRule();
rule.setResource("getUserInfo");
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule.setCount(5); // 每秒最多5次
rule.setParamIdx(0); // 第一个参数

// 特定参数值例外
ParamFlowItem item = new ParamFlowItem();
item.setObject("1001"); // 用户ID为1001
item.setClassType(Long.class.getName());
item.setCount(100); // 每秒最多100次

rule.setParamFlowItemList(Collections.singletonList(item));

ParamFlowRuleManager.loadRules(Collections.singletonList(rule));
}
}

系统自适应保护

根据系统整体负载进行保护:

@Configuration
public class SystemRuleConfig {

@PostConstruct
public void init() {
List<SystemRule> rules = new ArrayList<>();

// CPU 使用率保护
SystemRule cpuRule = new SystemRule();
cpuRule.setHighestCpuUsage(0.8); // CPU 使用率超过 80%
rules.add(cpuRule);

// 系统负载保护
SystemRule loadRule = new SystemRule();
loadRule.setHighestSystemLoad(10); // 系统负载超过 10
rules.add(loadRule);

// 入口 QPS 保护
SystemRule qpsRule = new SystemRule();
qpsRule.setQps(1000); // 入口总 QPS 超过 1000
rules.add(qpsRule);

SystemRuleManager.loadRules(rules);
}
}

Sentinel 控制台集成

持久化配置到 Nacos

<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
</dependency>
spring:
cloud:
sentinel:
datasource:
flow:
nacos:
server-addr: localhost:8848
data-id: ${spring.application.name}-flow-rules
group-id: SENTINEL_GROUP
rule-type: flow
degrade:
nacos:
server-addr: localhost:8848
data-id: ${spring.application.name}-degrade-rules
group-id: SENTINEL_GROUP
rule-type: degrade

Resilience4j

Resilience4j 是一个轻量级的容错库,是 Hystrix 的替代方案。

核心模块

模块说明
CircuitBreaker断路器
RateLimiter限流器
Retry重试
TimeLimiter超时限制
Bulkhead舱壁隔离
Cache缓存

添加依赖

<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring-boot3</artifactId>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-feign</artifactId>
</dependency>

断路器配置

resilience4j:
circuitbreaker:
configs:
default:
slidingWindowType: COUNT_BASED
slidingWindowSize: 10
failureRateThreshold: 50
waitDurationInOpenState: 10s
permittedNumberOfCallsInHalfOpenState: 3
slowCallDurationThreshold: 2s
slowCallRateThreshold: 50
instances:
userService:
baseConfig: default
orderService:
slidingWindowSize: 20
failureRateThreshold: 60

使用断路器

@Service
public class OrderService {

@Autowired
private UserClient userClient;

@CircuitBreaker(name = "userService", fallbackMethod = "getUserFallback")
public User getUser(Long userId) {
return userClient.getUser(userId);
}

private User getUserFallback(Long userId, Exception ex) {
log.warn("获取用户信息失败,使用降级数据: {}", ex.getMessage());
User user = new User();
user.setId(userId);
user.setName("降级用户");
return user;
}
}

限流器配置

resilience4j:
ratelimiter:
configs:
default:
limitForPeriod: 10
limitRefreshPeriod: 1s
timeoutDuration: 0
instances:
orderApi:
limitForPeriod: 100
limitRefreshPeriod: 1s

使用限流器

@RestController
public class OrderController {

@Autowired
private OrderService orderService;

@RateLimiter(name = "orderApi", fallbackMethod = "createOrderFallback")
@PostMapping("/order")
public Order createOrder(@RequestBody CreateOrderRequest request) {
return orderService.create(request);
}

private Order createOrderFallback(CreateOrderRequest request, Exception ex) {
throw new BusinessException("系统繁忙,请稍后重试");
}
}

重试配置

resilience4j:
retry:
configs:
default:
maxAttempts: 3
waitDuration: 500ms
enableExponentialBackoff: true
exponentialBackoffMultiplier: 2
retryExceptions:
- java.io.IOException
- java.net.SocketTimeoutException
instances:
userService:
maxAttempts: 5
waitDuration: 1s

使用重试

@Service
public class PaymentService {

@Retry(name = "paymentService", fallbackMethod = "processFallback")
public PaymentResult process(PaymentRequest request) {
return paymentClient.process(request);
}

private PaymentResult processFallback(PaymentRequest request, Exception ex) {
log.error("支付处理失败: {}", ex.getMessage());
return PaymentResult.fail("支付处理失败");
}
}

舱壁隔离

resilience4j:
bulkhead:
configs:
default:
maxConcurrentCalls: 10
maxWaitDuration: 0
instances:
userService:
maxConcurrentCalls: 20
maxWaitDuration: 500ms
@Service
public class UserService {

@Bulkhead(name = "userService", fallbackMethod = "getUserFallback")
public User getUser(Long id) {
return userRepository.findById(id);
}

private User getUserFallback(Long id, Exception ex) {
return new User(id, "舱壁隔离降级");
}
}

组合使用

@Service
public class OrderService {

@CircuitBreaker(name = "orderService", fallbackMethod = "fallback")
@RateLimiter(name = "orderService")
@Retry(name = "orderService")
@Bulkhead(name = "orderService")
@TimeLimiter(name = "orderService")
public CompletableFuture<Order> createOrder(OrderRequest request) {
return CompletableFuture.supplyAsync(() -> {
return orderClient.create(request);
});
}

private CompletableFuture<Order> fallback(OrderRequest request, Exception ex) {
return CompletableFuture.completedFuture(
new Order(null, "服务降级订单")
);
}
}

最佳实践

1. 合理设置阈值

# 根据服务重要性设置不同阈值
resilience4j:
circuitbreaker:
instances:
# 核心服务:更宽松的阈值
paymentService:
failureRateThreshold: 30
waitDurationInOpenState: 30s

# 非核心服务:更严格的阈值
recommendService:
failureRateThreshold: 60
waitDurationInOpenState: 5s

2. 区分降级策略

// 核心服务:返回缓存数据或默认值
public Order getOrderFallback(Long id, Exception ex) {
return cacheService.getOrder(id);
}

// 非核心服务:返回空数据
public List<Recommend> getRecommendsFallback(Long userId, Exception ex) {
return Collections.emptyList();
}

3. 监控告警

@Component
public class CircuitBreakerEventListener {

@EventListener
public void onCircuitBreakerEvent(CircuitBreakerOnStateTransitionEvent event) {
log.warn("断路器状态变更: {} -> {}",
event.getStateTransition().getFromState(),
event.getStateTransition().getToState());

// 发送告警
alertService.sendAlert("断路器状态变更: " + event.getCircuitBreakerName());
}
}

4. 日志记录

@Aspect
@Component
public class CircuitBreakerLogAspect {

@Around("@annotation(circuitBreaker)")
public Object logCircuitBreaker(ProceedingJoinPoint joinPoint,
CircuitBreaker circuitBreaker) throws Throwable {
String name = circuitBreaker.name();
long start = System.currentTimeMillis();

try {
Object result = joinPoint.proceed();
log.info("断路器 {} 调用成功,耗时: {}ms",
name, System.currentTimeMillis() - start);
return result;
} catch (Exception e) {
log.error("断路器 {} 调用失败: {}", name, e.getMessage());
throw e;
}
}
}

小结

本章我们学习了:

  1. 断路器原理:防止雪崩效应的核心模式
  2. Sentinel:阿里巴巴的流量控制和熔断降级组件
  3. Resilience4j:轻量级容错库
  4. 最佳实践:合理设置阈值、区分降级策略、监控告警

参考资源