断路器与容错
在微服务架构中,服务之间的依赖关系复杂,一个服务的故障可能引发连锁反应,导致整个系统崩溃。断路器模式是解决这一问题的核心方案。
为什么需要断路器?
雪崩效应
当服务 A 调用服务 B,服务 B 调用服务 C 时,如果服务 C 出现故障:
┌─────────────────────────────────────────────────────────────┐
│ 雪崩效应示意 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 服务 A ──> 服务 B ──> 服务 C (故障) │
│ │ │ │ │
│ │ │ ▼ │
│ │ │ 响应超时 │
│ │ │ │ │
│ │ ▼ │ │
│ │ 线程阻塞等待 │ │
│ │ │ │ │
│ ▼ │ │ │
│ 线程池耗尽 ◄───────────┘ │
│ │ │
│ ▼ │
│ 服务 A 不可用 │
│ │
└─────────────────────────────────────────────────────────────┘
断路器模式
断路器模式通过监控服务调用的成功率和响应时间,在服务出现故障时快速失败,防止故障蔓延。
断路器状态机:
┌──────────────────────────────┐
│ │
▼ │
┌─────────┐ 失败率超过阈值 ┌─────────┐
│ 关闭 │ ─────────────────> │ 打开 │
│ (Closed)│ │ (Open) │
└─────────┘ └────┬────┘
▲ │
│ │ 超时后
│ │ 进入半开
│ ▼
│ ┌─────────┐
│ 成功率恢复 │ 半开 │
└──────────────────────── │(Half-Open)│
└─────────┘
│
│ 失败
│ 返回打开
▼
┌─────────┐
│ 打开 │
└─────────┘
状态说明:
| 状态 | 说明 |
|---|---|
| 关闭 (Closed) | 正常状态,请求正常通过 |
| 打开 (Open) | 熔断状态,请求直接返回错误 |
| 半开 (Half-Open) | 探测状态,允许部分请求通过测试服务是否恢复 |
Sentinel
Sentinel 是阿里巴巴开源的流量控制和熔断降级组件,功能强大且易于使用。
Sentinel 核心概念
┌─────────────────────────────────────────────────────────────┐
│ Sentinel 核心功能 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 流量控制 │ │ 熔断降级 │ │ 系统保护 │ │
│ │ │ │ │ │ │ │
│ │ QPS 限制 │ │ 慢调用比例 │ │ CPU 使用率 │ │
│ │ 并发线程数 │ │ 异常比例 │ │ 系统负载 │ │
│ │ 冷启动 │ │ 异常数 │ │ 入口 QPS │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ 热点参数 │ │ 来源访问 │ │
│ │ 限流 │ │ 控制 │ │
│ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
快速开始
添加依赖:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
配置:
spring:
cloud:
sentinel:
transport:
dashboard: localhost:8080
port: 8719
eager: true
management:
endpoints:
web:
exposure:
include: '*'
启动 Sentinel Dashboard:
java -Dserver.port=8080 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard.jar
访问 http://localhost:8080(默认用户名密码:sentinel/sentinel)
流量控制
QPS 限流:
@RestController
public class UserController {
@GetMapping("/user/{id}")
@SentinelResource(value = "getUser", blockHandler = "getUserBlockHandler")
public User getUser(@PathVariable Long id) {
return userService.getById(id);
}
// 限流处理方法
public User getUserBlockHandler(Long id, BlockException ex) {
User user = new User();
user.setId(id);
user.setName("限流降级用户");
return user;
}
}
在 Sentinel Dashboard 中配置流控规则:
- 资源名:getUser
- 阈值类型:QPS
- 单机阈值:10
并发线程数限流:
@GetMapping("/order/{id}")
@SentinelResource(value = "getOrder", blockHandler = "getOrderBlockHandler")
public Order getOrder(@PathVariable Long id) {
return orderService.getById(id);
}
配置:
- 资源名:getOrder
- 阈值类型:并发线程数
- 单机阈值:5
熔断降级
慢调用比例熔断:
@GetMapping("/product/{id}")
@SentinelResource(value = "getProduct", fallback = "getProductFallback")
public Product getProduct(@PathVariable Long id) {
return productService.getById(id);
}
public Product getProductFallback(Long id, Throwable ex) {
Product product = new Product();
product.setId(id);
product.setName("熔断降级商品");
return product;
}
熔断规则配置:
| 参数 | 说明 |
|---|---|
| 资源名 | getProduct |
| 熔断策略 | 慢调用比例 |
| 最大 RT | 500ms |
| 比例阈值 | 0.5 |
| 熔断时长 | 10s |
| 最小请求数 | 5 |
异常比例熔断:
当异常比例超过阈值时触发熔断。
异常数熔断:
当异常数超过阈值时触发熔断。
热点参数限流
针对请求参数进行限流:
@GetMapping("/user/info")
@SentinelResource(value = "getUserInfo", blockHandler = "getUserInfoBlockHandler")
public User getUserInfo(@RequestParam Long id) {
return userService.getById(id);
}
public User getUserInfoBlockHandler(Long id, BlockException ex) {
return new User(id, "热点参数限流");
}
配置热点规则:
@Configuration
public class SentinelConfig {
@PostConstruct
public void init() {
// 热点参数规则
ParamFlowRule rule = new ParamFlowRule();
rule.setResource("getUserInfo");
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule.setCount(5); // 每秒最多5次
rule.setParamIdx(0); // 第一个参数
// 特定参数值例外
ParamFlowItem item = new ParamFlowItem();
item.setObject("1001"); // 用户ID为1001
item.setClassType(Long.class.getName());
item.setCount(100); // 每秒最多100次
rule.setParamFlowItemList(Collections.singletonList(item));
ParamFlowRuleManager.loadRules(Collections.singletonList(rule));
}
}
系统自适应保护
根据系统整体负载进行保护:
@Configuration
public class SystemRuleConfig {
@PostConstruct
public void init() {
List<SystemRule> rules = new ArrayList<>();
// CPU 使用率保护
SystemRule cpuRule = new SystemRule();
cpuRule.setHighestCpuUsage(0.8); // CPU 使用率超过 80%
rules.add(cpuRule);
// 系统负载保护
SystemRule loadRule = new SystemRule();
loadRule.setHighestSystemLoad(10); // 系统负载超过 10
rules.add(loadRule);
// 入口 QPS 保护
SystemRule qpsRule = new SystemRule();
qpsRule.setQps(1000); // 入口总 QPS 超过 1000
rules.add(qpsRule);
SystemRuleManager.loadRules(rules);
}
}
Sentinel 控制台集成
持久化配置到 Nacos:
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
</dependency>
spring:
cloud:
sentinel:
datasource:
flow:
nacos:
server-addr: localhost:8848
data-id: ${spring.application.name}-flow-rules
group-id: SENTINEL_GROUP
rule-type: flow
degrade:
nacos:
server-addr: localhost:8848
data-id: ${spring.application.name}-degrade-rules
group-id: SENTINEL_GROUP
rule-type: degrade
Resilience4j
Resilience4j 是一个轻量级的容错库,是 Hystrix 的替代方案。
核心模块
| 模块 | 说明 |
|---|---|
| CircuitBreaker | 断路器 |
| RateLimiter | 限流器 |
| Retry | 重试 |
| TimeLimiter | 超时限制 |
| Bulkhead | 舱壁隔离 |
| Cache | 缓存 |
添加依赖
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring-boot3</artifactId>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-feign</artifactId>
</dependency>
断路器配置
resilience4j:
circuitbreaker:
configs:
default:
slidingWindowType: COUNT_BASED
slidingWindowSize: 10
failureRateThreshold: 50
waitDurationInOpenState: 10s
permittedNumberOfCallsInHalfOpenState: 3
slowCallDurationThreshold: 2s
slowCallRateThreshold: 50
instances:
userService:
baseConfig: default
orderService:
slidingWindowSize: 20
failureRateThreshold: 60
使用断路器
@Service
public class OrderService {
@Autowired
private UserClient userClient;
@CircuitBreaker(name = "userService", fallbackMethod = "getUserFallback")
public User getUser(Long userId) {
return userClient.getUser(userId);
}
private User getUserFallback(Long userId, Exception ex) {
log.warn("获取用户信息失败,使用降级数据: {}", ex.getMessage());
User user = new User();
user.setId(userId);
user.setName("降级用户");
return user;
}
}
限流器配置
resilience4j:
ratelimiter:
configs:
default:
limitForPeriod: 10
limitRefreshPeriod: 1s
timeoutDuration: 0
instances:
orderApi:
limitForPeriod: 100
limitRefreshPeriod: 1s
使用限流器
@RestController
public class OrderController {
@Autowired
private OrderService orderService;
@RateLimiter(name = "orderApi", fallbackMethod = "createOrderFallback")
@PostMapping("/order")
public Order createOrder(@RequestBody CreateOrderRequest request) {
return orderService.create(request);
}
private Order createOrderFallback(CreateOrderRequest request, Exception ex) {
throw new BusinessException("系统繁忙,请稍后重试");
}
}
重试配置
resilience4j:
retry:
configs:
default:
maxAttempts: 3
waitDuration: 500ms
enableExponentialBackoff: true
exponentialBackoffMultiplier: 2
retryExceptions:
- java.io.IOException
- java.net.SocketTimeoutException
instances:
userService:
maxAttempts: 5
waitDuration: 1s
使用重试
@Service
public class PaymentService {
@Retry(name = "paymentService", fallbackMethod = "processFallback")
public PaymentResult process(PaymentRequest request) {
return paymentClient.process(request);
}
private PaymentResult processFallback(PaymentRequest request, Exception ex) {
log.error("支付处理失败: {}", ex.getMessage());
return PaymentResult.fail("支付处理失败");
}
}
舱壁隔离
resilience4j:
bulkhead:
configs:
default:
maxConcurrentCalls: 10
maxWaitDuration: 0
instances:
userService:
maxConcurrentCalls: 20
maxWaitDuration: 500ms
@Service
public class UserService {
@Bulkhead(name = "userService", fallbackMethod = "getUserFallback")
public User getUser(Long id) {
return userRepository.findById(id);
}
private User getUserFallback(Long id, Exception ex) {
return new User(id, "舱壁隔离降级");
}
}
组合使用
@Service
public class OrderService {
@CircuitBreaker(name = "orderService", fallbackMethod = "fallback")
@RateLimiter(name = "orderService")
@Retry(name = "orderService")
@Bulkhead(name = "orderService")
@TimeLimiter(name = "orderService")
public CompletableFuture<Order> createOrder(OrderRequest request) {
return CompletableFuture.supplyAsync(() -> {
return orderClient.create(request);
});
}
private CompletableFuture<Order> fallback(OrderRequest request, Exception ex) {
return CompletableFuture.completedFuture(
new Order(null, "服务降级订单")
);
}
}
最佳实践
1. 合理设置阈值
# 根据服务重要性设置不同阈值
resilience4j:
circuitbreaker:
instances:
# 核心服务:更宽松的阈值
paymentService:
failureRateThreshold: 30
waitDurationInOpenState: 30s
# 非核心服务:更严格的阈值
recommendService:
failureRateThreshold: 60
waitDurationInOpenState: 5s
2. 区分降级策略
// 核心服务:返回缓存数据或默认值
public Order getOrderFallback(Long id, Exception ex) {
return cacheService.getOrder(id);
}
// 非核心服务:返回空数据
public List<Recommend> getRecommendsFallback(Long userId, Exception ex) {
return Collections.emptyList();
}
3. 监控告警
@Component
public class CircuitBreakerEventListener {
@EventListener
public void onCircuitBreakerEvent(CircuitBreakerOnStateTransitionEvent event) {
log.warn("断路器状态变更: {} -> {}",
event.getStateTransition().getFromState(),
event.getStateTransition().getToState());
// 发送告警
alertService.sendAlert("断路器状态变更: " + event.getCircuitBreakerName());
}
}
4. 日志记录
@Aspect
@Component
public class CircuitBreakerLogAspect {
@Around("@annotation(circuitBreaker)")
public Object logCircuitBreaker(ProceedingJoinPoint joinPoint,
CircuitBreaker circuitBreaker) throws Throwable {
String name = circuitBreaker.name();
long start = System.currentTimeMillis();
try {
Object result = joinPoint.proceed();
log.info("断路器 {} 调用成功,耗时: {}ms",
name, System.currentTimeMillis() - start);
return result;
} catch (Exception e) {
log.error("断路器 {} 调用失败: {}", name, e.getMessage());
throw e;
}
}
}
小结
本章我们学习了:
- 断路器原理:防止雪崩效应的核心模式
- Sentinel:阿里巴巴的流量控制和熔断降级组件
- Resilience4j:轻量级容错库
- 最佳实践:合理设置阈值、区分降级策略、监控告警