分布式消息 Spring Cloud Stream
在微服务架构中,服务之间的异步通信是一个重要的话题。Spring Cloud Stream 是一个用于构建高度可扩展的事件驱动微服务的框架,它提供了一套统一的编程模型来与消息中间件交互,让开发者可以专注于业务逻辑,而不必关心底层消息系统的细节。
为什么需要消息驱动?
同步调用的问题
在传统的同步调用模式下,服务之间存在强耦合:
同步调用场景:
┌─────────────┐ ┌─────────────┐
│ 订单服务 │ ───────→ │ 库存服务 │
└─────────────┘ 阻塞等待 └─────────────┘
问题:
1. 库存服务宕机,订单服务无法下单
2. 高峰期请求堆积,响应延迟增加
3. 服务间紧密耦合,难以独立扩展
异步消息的优势
异步消息场景:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 订单服务 │ ───────→ │ 消息队列 │ ───────→ │ 库存服务 │
└─────────────┘ 发送消息 └─────────────┘ 异步处理 └─────────────┘
优势:
1. 服务解耦:订单服务不依赖库存服务可用性
2. 削峰填谷:消息队列缓冲高峰请求
3. 可靠性:消息持久化保证不丢失
4. 灵活性:可以添加多个消费者处理同一消息
Spring Cloud Stream 核心概念
架构模型
Spring Cloud Stream 通过 Binder 抽象层屏蔽了不同消息中间件的差异。这种抽象让应用程序可以与消息中间件解耦,开发者只需要关注业务逻辑,而不用关心具体使用的是哪种消息系统。
┌─────────────────────────────────────────────────────────────┐
│ Spring Cloud Stream │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Application │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Input │ │ Output │ │ Processor │ │ │
│ │ │ Binding │ │ Binding │ │ Binding │ │ │
│ │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │
│ └──────────┼────────────────┼────────────────┼────────┘ │
│ │ │ │ │
│ ┌──────────▼────────────────▼────────────────▼────────┐ │
│ │ Binder │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ RabbitMQ│ │ Kafka │ │ 其他 │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
核心组件
| 组件 | 说明 |
|---|---|
| Binder | 与外部消息系统集成的组件,如 Kafka Binder、RabbitMQ Binder。它负责与消息中间件通信,处理连接、消息发送和接收等底层操作。 |
| Binding | 应用程序与消息系统之间的桥梁,分为 Input(消费者)和 Output(生产者)。Binding 将业务逻辑与消息基础设施解耦。 |
| Message | 生产者和消费者之间通信的规范数据结构,包含消息头(Headers)和消息体(Payload)。 |
版本说明
Spring Cloud Stream 从 3.0 版本开始,全面采用函数式编程模型。在 4.0 版本中,基于注解的编程模型(如 @StreamListener)已被完全移除,只保留函数式编程模型。
Spring Cloud Stream 4.0 主要变化:
- 移除注解式编程模型,只支持函数式编程模型
- 新增 Reactive Kafka Binder(基于 Reactor Kafka)
- 重新引入 Schema Registry 支持
- 新增 Test Binder 用于测试
- 改进错误队列命名机制
函数式编程模型
Spring Cloud Stream 4.x 采用函数式编程模型,通过 java.util.function 包中的 Supplier、Function 和 Consumer 接口来定义消息处理逻辑。这种编程模型更简洁,也更容易测试。
Consumer:消息消费者
Consumer<T> 用于接收和处理消息,不返回任何结果。
@SpringBootApplication
public class OrderConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(OrderConsumerApplication.class, args);
}
/**
* 消费者:处理订单消息
*
* 方法名 handleOrder 会自动绑定到 handleOrder-in-0
* -in-0 表示这是输入绑定(消费者)
*/
@Bean
public Consumer<Order> handleOrder() {
return order -> {
log.info("收到订单: {}", order);
// 业务处理逻辑
processOrder(order);
};
}
}
Supplier:消息生产者
Supplier<T> 用于生成消息。与 Consumer 和 Function 不同,Supplier 是数据的源头,它不订阅任何输入目标,而是通过轮询机制触发。
@SpringBootApplication
public class OrderProducerApplication {
public static void main(String[] args) {
SpringApplication.run(OrderProducerApplication.class, args);
}
/**
* 生产者:定时发送订单消息
*
* 默认情况下,Supplier 每秒被调用一次
* 可以通过配置调整轮询间隔
*/
@Bean
public Supplier<Order> generateOrder() {
return () -> {
Order order = new Order();
order.setId(UUID.randomUUID().toString());
order.setProduct("商品-" + RandomStringUtils.randomAlphabetic(5));
order.setCreateTime(LocalDateTime.now());
return order;
};
}
}
轮询配置:
spring:
integration:
poller:
fixed-delay: 2000 # 轮询间隔(毫秒),默认 1000
max-messages-per-poll: 1 # 每次轮询最大消息数,默认 1
# cron: "0 * * * * *" # 也可以使用 cron 表达式
Function:消息处理器
Function<T, R> 接收消息,处理后返回新消息。它同时具有消费者和生产者的角色。
@SpringBootApplication
public class OrderProcessorApplication {
public static void main(String[] args) {
SpringApplication.run(OrderProcessorApplication.class, args);
}
/**
* 处理器:接收订单,处理后发送到另一个目的地
*
* 输入绑定:processOrder-in-0
* 输出绑定:processOrder-out-0
*/
@Bean
public Function<Order, ProcessedOrder> processOrder() {
return order -> {
ProcessedOrder processed = new ProcessedOrder();
processed.setOrderId(order.getId());
processed.setStatus("PROCESSED");
processed.setProcessTime(LocalDateTime.now());
processed.setTotalAmount(calculateTotal(order));
return processed;
};
}
}
绑定命名规则
Spring Cloud Stream 使用约定的命名规则自动创建绑定名称:
| 函数类型 | 绑定名称模式 | 示例 |
|---|---|---|
| Consumer | {functionName}-in-{index} | handleOrder-in-0 |
| Supplier | {functionName}-out-{index} | generateOrder-out-0 |
| Function | {functionName}-in-{index} 和 {functionName}-out-{index} | processOrder-in-0、processOrder-out-0 |
索引从 0 开始,用于支持多输入多输出的函数。
配置绑定
spring:
cloud:
stream:
# 定义要激活的函数
function:
definition: handleOrder;generateOrder
# 绑定配置
bindings:
# 消费者配置
handleOrder-in-0:
destination: orders # 目的地(队列/主题名)
group: order-consumers # 消费者组
consumer:
max-attempts: 3 # 最大重试次数
# 生产者配置
generateOrder-out-0:
destination: orders
producer:
partition-key-expression: headers['orderId']
StreamBridge:灵活的消息发送
StreamBridge 提供了一种灵活的方式来发送消息,特别适合以下场景:
- 从非 Stream 源(如 REST API)发送消息
- 动态决定发送目标
- 在任意位置发送消息
基本用法
@Service
public class OrderService {
@Autowired
private StreamBridge streamBridge;
public String createOrder(CreateOrderRequest request) {
// 创建订单
Order order = new Order();
order.setId(UUID.randomUUID().toString());
// ... 设置其他属性
// 发送订单消息到指定的 binding
streamBridge.send("orderCreated-out-0", order);
return order.getId();
}
}
动态目的地
StreamBridge 支持动态目的地,即在运行时决定消息发送到哪个目标:
@RestController
public class MessageController {
@Autowired
private StreamBridge streamBridge;
@PostMapping("/send")
public void sendMessage(@RequestParam String destination, @RequestBody String message) {
// 动态发送到指定目的地
// 如果目的地不存在,会自动创建
streamBridge.send(destination, message);
}
}
预创建输出绑定
默认情况下,StreamBridge 会在首次使用时创建绑定。如果希望在启动时就创建绑定,可以使用 spring.cloud.stream.output-bindings 属性:
spring:
cloud:
stream:
output-bindings: orderCreated;orderUpdated;orderDeleted
异步发送
StreamBridge 默认使用同步发送(阻塞当前线程)。如果需要异步发送:
@Service
public class AsyncMessageService {
@Autowired
private StreamBridge streamBridge;
@PostConstruct
public void init() {
// 启用异步发送模式
streamBridge.setAsync(true);
}
public void sendAsync(Order order) {
// 异步发送,不阻塞当前线程
streamBridge.send("orders-out-0", order);
}
}
使用异步发送时,如果需要保证追踪上下文的正确传播,需要添加 Micrometer 的 context-propagation 依赖。
多 Binder 支持
当应用程序配置了多个 Binder 时,需要指定使用哪个 Binder:
spring:
cloud:
stream:
binders:
rabbit:
type: rabbit
environment:
spring.rabbitmq.host: localhost
kafka:
type: kafka
environment:
spring.kafka.bootstrap-servers: localhost:9092
// 指定使用 RabbitMQ Binder
streamBridge.send("orders-out-0", "rabbit", order);
// 指定使用 Kafka Binder
streamBridge.send("orders-out-0", "kafka", order);
Reactive 响应式编程支持
Spring Cloud Stream 基于 Spring Cloud Function 和 Project Reactor 构建,天然支持响应式编程。使用响应式 API 可以更优雅地处理流式数据。
响应式 Consumer
@Bean
public Consumer<Flux<Order>> handleOrders() {
return flux -> flux
.filter(order -> order.getAmount() > 100) // 过滤
.map(this::processOrder) // 转换
.doOnNext(order -> log.info("处理订单: {}", order))
.subscribe(); // 必须订阅
}
响应式 Function
响应式 Function 是更推荐的方式,因为它返回一个可以被框架订阅的 Mono:
@Bean
public Function<Flux<Order>, Mono<Void>> processOrders() {
return flux -> flux
.filter(order -> order.getAmount() > 100)
.map(this::enrichOrder)
.flatMap(this::saveOrder)
.then(); // 返回 Mono<Void>
}
响应式 Supplier
响应式 Supplier 有两种模式:
持续流模式(只触发一次):
@Bean
public Supplier<Flux<Order>> continuousOrderStream() {
return () -> Flux.interval(Duration.ofSeconds(1))
.map(i -> generateOrder(i));
// 这个 Supplier 只会被调用一次,持续产生消息
}
轮询模式(定期触发):
@PollableBean // 标记为可轮询
public Supplier<Flux<Order>> batchOrderSupplier() {
return () -> {
// 返回有限流,每次轮询产生一批订单
return Flux.fromIterable(fetchPendingOrders());
};
}
响应式编程注意事项
使用响应式 API 时需要注意以下几点:
-
背压支持:只有使用 Reactive Kafka Binder 时才能完全利用响应式的背压特性。使用普通 Kafka 或 RabbitMQ Binder 时,响应式 API 的便利性可以使用,但无法享受完整的响应式特性。
-
错误处理:框架提供的错误处理、重试等机制只对命令式函数生效。响应式函数需要在流内部自行处理错误。
-
Consumer 订阅:响应式
Consumer<Flux<?>>需要手动订阅,否则消息不会被处理。推荐使用Function<Flux<?>, Mono<Void>>替代。
消费者组与分区
消费者组
消费者组是 Spring Cloud Stream 中实现消息负载均衡和高可用的重要机制。同一组内的消费者共享订阅,每条消息只会被组内的一个消费者处理。
┌─────────────┐
│ 消息队列 │
└──────┬──────┘
│
┌────────────┼────────────┐
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│消费者1 │ │消费者2 │ │消费者3 │
│(组A) │ │(组A) │ │(组B) │
└─────────┘ └─────────┘ └─────────┘
消费者组A:每条消息只被组内一个消费者处理(负载均衡)
消费者组B:独立消费所有消息(广播模式)
配置消费者组:
spring:
cloud:
stream:
bindings:
handleOrder-in-0:
destination: orders
group: order-consumers # 指定消费者组
消费者组的意义:
- 负载均衡:消息在组内消费者之间分配,提高处理吞吐量
- 高可用:一个消费者宕机,其他消费者自动接管其分区
- 消息顺序:同一分区的消息按顺序被组内一个消费者处理
消息分区
分区确保具有相同特征的消息被发送到同一个分区,从而保证消息的顺序性。
订单消息按用户ID分区:
分区0: [用户A订单1, 用户A订单2, 用户D订单1]
分区1: [用户B订单1, 用户B订单2, 用户E订单1]
分区2: [用户C订单1, 用户C订单2, 用户F订单1]
同一用户的订单始终在同一个分区,保证顺序性
生产者分区配置:
spring:
cloud:
stream:
bindings:
orderCreated-out-0:
destination: orders
producer:
# 分区键表达式,决定消息发送到哪个分区
partition-key-expression: headers['userId']
# 分区数量
partition-count: 3
消费者分区配置:
spring:
cloud:
stream:
bindings:
handleOrder-in-0:
destination: orders
group: order-consumers
consumer:
partitioned: true # 启用分区消费
# 消费者实例配置
instance-count: 3 # 总实例数
instance-index: 0 # 当前实例索引(0、1、2)
函数组合
Spring Cloud Stream 支持函数组合,可以将多个简单的函数组合成一个复杂的处理链。这有助于将复杂逻辑分解为可测试、可复用的组件。
基本组合
使用 | 符号组合函数:
spring:
cloud:
function:
definition: validate|process|enrich
@SpringBootApplication
public class ComposedFunctionApplication {
// 第一步:验证
@Bean
public Function<Order, Order> validate() {
return order -> {
if (order.getItems() == null || order.getItems().isEmpty()) {
throw new IllegalArgumentException("订单项不能为空");
}
return order;
};
}
// 第二步:处理
@Bean
public Function<Order, Order> process() {
return order -> {
order.setTotalAmount(calculateTotal(order));
return order;
};
}
// 第三步:丰富
@Bean
public Function<Order, EnrichedOrder> enrich() {
return order -> {
EnrichedOrder enriched = new EnrichedOrder();
BeanUtils.copyProperties(order, enriched);
enriched.setProcessTime(LocalDateTime.now());
enriched.setProcessId(UUID.randomUUID().toString());
return enriched;
};
}
}
简化绑定名称
组合后的函数名称会很长,可以使用 spring.cloud.stream.function.bindings 属性简化:
spring:
cloud:
function:
definition: validate|process|enrich
stream:
function:
bindings:
validate|process|enrich-in-0: orderInput
validate|process|enrich-out-0: orderOutput
bindings:
orderInput:
destination: raw-orders
orderOutput:
destination: processed-orders
横切关注点
函数组合特别适合处理横切关注点,如消息验证、日志记录、消息丰富等:
// 添加追踪信息
@Bean
public Function<Message<Order>, Message<Order>> addTraceInfo() {
return message -> MessageBuilder.fromMessage(message)
.setHeader("traceId", UUID.randomUUID().toString())
.setHeader("timestamp", System.currentTimeMillis())
.build();
}
// 业务处理
@Bean
public Function<Order, Order> businessProcess() {
return order -> {
// 纯粹的业务逻辑,不关心追踪等非功能性需求
return order;
};
}
多输入多输出函数
某些场景下,一个函数需要从多个输入源接收数据,或者输出到多个目标。Spring Cloud Stream 支持使用 Reactor 的 Tuple 类型来实现这种模式。
多输入函数
@Bean
public Function<Tuple2<Flux<Order>, Flux<Payment>>, Flux<CompletedOrder>> aggregate() {
return tuple -> {
Flux<Order> orders = tuple.getT1();
Flux<Payment> payments = tuple.getT2();
// 订单和支付流水合并
return orders.join(
payments,
Duration.ofSeconds(30), // 关联时间窗口
order -> Mono.just(order.getId()),
payment -> Mono.just(payment.getOrderId()),
(order, payment) -> new CompletedOrder(order, payment)
);
};
}
绑定配置:
spring:
cloud:
stream:
bindings:
aggregate-in-0: # 第一个输入(Order)
destination: orders
aggregate-in-1: # 第二个输入(Payment)
destination: payments
aggregate-out-0: # 输出
destination: completed-orders
多输出函数
@Bean
public Function<Order, Tuple2<Invoice, Shipping>> splitOrder() {
return order -> Tuples.of(
createInvoice(order),
createShipping(order)
);
}
绑定配置:
spring:
cloud:
stream:
bindings:
splitOrder-in-0:
destination: orders
splitOrder-out-0: # 第一个输出(Invoice)
destination: invoices
splitOrder-out-1: # 第二个输出(Shipping)
destination: shippings
错误处理
重试机制
Spring Cloud Stream 提供了内置的重试机制,当消息处理失败时可以自动重试:
spring:
cloud:
stream:
bindings:
handleOrder-in-0:
destination: orders
consumer:
max-attempts: 3 # 最大重试次数,默认 3
back-off-initial-interval: 1000 # 初始重试间隔(毫秒)
back-off-multiplier: 2.0 # 重试间隔倍数
back-off-max-interval: 10000 # 最大重试间隔(毫秒)
重试间隔计算:interval = initialInterval * multiplier^(retryCount - 1)
死信队列
当消息重试次数耗尽后,会被发送到死信队列(DLQ):
spring:
cloud:
stream:
bindings:
handleOrder-in-0:
destination: orders
group: order-consumers
rabbit:
bindings:
handleOrder-in-0:
consumer:
auto-bind-dlq: true # 自动创建死信队列
dlq-queue-name: orders.dlq # 死信队列名称
republish-to-dlq: true # 重新发布到死信队列
死信队列名称格式:{destination}.{group}.dlq
自定义错误处理
@Bean
public Consumer<Order> handleOrder() {
return order -> {
try {
processOrder(order);
} catch (BusinessException e) {
// 业务异常,记录日志,不重试
log.error("订单处理失败: {}", order.getId(), e);
throw new ImmediateRetryException("不重试");
} catch (TechnicalException e) {
// 技术异常,抛出以触发重试
throw new RuntimeException(e);
}
};
}
// 死信队列消费者
@Bean
public Consumer<Order> handleOrderDlq() {
return failedOrder -> {
// 处理失败的消息
log.error("订单最终处理失败: {}", failedOrder);
// 发送告警或人工处理
};
}
错误通道
可以订阅错误通道来统一处理错误:
@Service
public class ErrorHandler {
@ServiceActivator(inputChannel = "orders.errors")
public void handleError(ErrorMessage errorMessage) {
log.error("消息处理错误: ", errorMessage.getPayload());
// 可以访问原始消息和异常信息
Message<?> failedMessage = errorMessage.getOriginalMessage();
Throwable cause = errorMessage.getPayload();
}
}
消息头与内容类型
消息头操作
Spring Cloud Stream 使用 Spring Messaging 的 Message 类型来访问和操作消息头:
@Bean
public Function<Message<Order>, Message<Order>> processWithHeaders() {
return message -> {
// 读取消息头
String traceId = (String) message.getHeaders().get("traceId");
String contentType = message.getHeaders().getContentType().toString();
Order order = message.getPayload();
// 创建新消息,添加或修改消息头
return MessageBuilder
.withPayload(order)
.setHeader("processedAt", System.currentTimeMillis())
.setHeader("processor", "order-service")
.build();
};
}
内容类型协商
Spring Cloud Stream 支持自动的内容类型转换。默认支持 JSON、文本、二进制等格式:
spring:
cloud:
stream:
bindings:
handleOrder-in-0:
destination: orders
content-type: application/json # 指定内容类型
常用内容类型:
application/json:JSON 格式(默认)application/xml:XML 格式text/plain:纯文本application/octet-stream:二进制流
自定义序列化
@Configuration
public class SerializationConfig {
@Bean
public MessageConverter avroMessageConverter() {
return new MessageConverter() {
@Override
public Message<?> toMessage(Object payload, MessageHeaders headers) {
// 自定义序列化逻辑
byte[] data = serializeToAvro(payload);
return MessageBuilder.withPayload(data)
.setContentType(MimeTypeUtils.parseMimeType("application/avro"))
.build();
}
@Override
public Object fromMessage(Message<?> message, Class<?> targetClass) {
// 自定义反序列化逻辑
return deserializeFromAvro((byte[]) message.getPayload(), targetClass);
}
};
}
}
Schema Registry 支持
Spring Cloud Stream 4.0 重新引入了 Schema Registry 支持,特别适合需要模式演进的场景,如使用 Avro 进行序列化。
Schema Registry Server
添加依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema-registry-server</artifactId>
</dependency>
启动类:
@SpringBootApplication
@EnableSchemaRegistryServer
public class SchemaRegistryServerApplication {
public static void main(String[] args) {
SpringApplication.run(SchemaRegistryServerApplication.class, args);
}
}
默认使用 H2 内存数据库,可以切换到 PostgreSQL 或 MySQL:
spring:
datasource:
url: jdbc:postgresql://localhost:5432/schema_registry
username: postgres
password: postgres
jpa:
hibernate:
ddl-auto: update
Schema Registry Client
添加依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema-registry-client</artifactId>
</dependency>
配置:
spring:
cloud:
stream:
schema-registry-client:
endpoint: http://localhost:8990
启用 Schema Registry Client:
@SpringBootApplication
@EnableSchemaRegistryClient
public class OrderApplication {
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class, args);
}
}
测试支持
Spring Cloud Stream 4.0 提供了 Test Binder,方便进行单元测试和集成测试。
测试依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-binder</artifactId>
<scope>test</scope>
</dependency>
单元测试示例
@SpringBootTest
public class OrderProcessorTest {
@Autowired
private InputDestination input;
@Autowired
private OutputDestination output;
@Test
void testProcessOrder() {
// 准备测试数据
Order order = new Order();
order.setId("order-001");
order.setAmount(new BigDecimal("100.00"));
// 发送消息到输入绑定
input.send(new GenericMessage<>(order), "processOrder-in-0");
// 从输出绑定接收消息
Message<byte[]> received = output.receive(1000, "processOrder-out-0");
// 验证结果
assertThat(received).isNotNull();
ProcessedOrder processed = new ObjectMapper()
.readValue(received.getPayload(), ProcessedOrder.class);
assertThat(processed.getOrderId()).isEqualTo("order-001");
}
}
使用 TestBinder
@SpringBootTest(properties = {
"spring.cloud.function.definition=handleOrder"
})
public class ConsumerTest {
@Autowired
private InputDestination input;
@Test
void testConsumer() {
Order order = new Order("order-001", "product-001", 10);
// 发送消息
input.send(MessageBuilder.withPayload(order).build());
// 由于 Consumer 没有输出,可以通过 mock 或验证副作用来测试
// 例如验证数据库记录、日志输出等
}
}
RabbitMQ Binder 详解
RabbitMQ 特有配置
spring:
cloud:
stream:
rabbit:
bindings:
handleOrder-in-0:
consumer:
# 预取数量,控制消费速率
prefetch: 10
# 确认模式:AUTO、MANUAL、NONE
acknowledge-mode: auto
# 是否将拒绝的消息重新入队
requeue-rejected: false
# 死信队列配置
auto-bind-dlq: true
dlq-dead-letter-exchange: orders.dlx
dlq-dead-letter-routing-key: orders.failed
# 消费者标签
consumer-tag-prefix: order-consumer
orderCreated-out-0:
producer:
# 消息持久化
delivery-mode: persistent
# 消息过期时间(毫秒)
ttl: 60000
# 优先级
priority: 10
交换机和队列配置
spring:
cloud:
stream:
rabbit:
bindings:
handleOrder-in-0:
consumer:
# 队列配置
queue-name-group-only: false
binding-routing-key: "#"
# 交换机配置
exchange-type: topic
delayed-exchange: false
Kafka Binder 详解
Kafka 特有配置
spring:
cloud:
stream:
kafka:
bindings:
handleOrder-in-0:
consumer:
# 自动提交偏移量
ack-mode: manual
# 从最早的偏移量开始
start-offset: earliest
# 是否自动重置偏移量
reset-offsets: false
# 批量消费模式
batch-mode: false
orderCreated-out-0:
producer:
# 分区数量
partition-count: 3
# 副本数量
replication-factor: 2
# 消息键表达式
message-key-expression: headers['orderId']
# 压缩类型
compression-type: gzip
Kafka 事务支持
spring:
cloud:
stream:
kafka:
binder:
# 事务配置
transaction:
transaction-id-prefix: tx-order-
# 生产者配置
producer-properties:
enable.idempotence: true
acks: all
Reactive Kafka Binder
Spring Cloud Stream 4.0 提供了专门的 Reactive Kafka Binder,支持完整的响应式编程模型:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-reactive</artifactId>
</dependency>
spring:
cloud:
stream:
kafka:
binder:
type: reactive
configuration:
bootstrap.servers: localhost:9092
2025年5月,Spring 团队宣布 Reactor Kafka 项目将停止维护。这意味着:
- Reactor Kafka 将从未来的 Reactor BOM 中移除
- Reactor Kafka 1.3 将是最后一个次要版本
- Spring Cloud Stream Reactive Kafka Binder 已被弃用,将在未来版本中移除
迁移建议:
对于新项目,建议使用标准的 Kafka Binder 配合响应式函数签名:
// 推荐方式:使用标准 Kafka Binder + 响应式函数
@Bean
public Function<Flux<Order>, Mono<Void>> processOrders() {
return flux -> flux
.map(this::processOrder)
.then();
}
对于现有使用 Reactive Kafka Binder 的项目,建议规划迁移到标准 Kafka Binder。虽然响应式 API 的便利性可以使用,但标准 Kafka Binder 无法提供完整的端到端响应式特性(如背压)。
详见 Spring 官方公告。
实战示例:订单处理系统
场景描述
模拟一个完整的订单处理流程:
- 订单服务发送订单消息
- 库存服务消费订单消息,扣减库存
- 支付服务消费订单消息,处理支付
- 通知服务消费处理结果,发送通知
公共消息模型
@Data
public class OrderMessage {
private String orderId;
private String userId;
private List<OrderItem> items;
private BigDecimal totalAmount;
private OrderStatus status;
private LocalDateTime createTime;
}
@Data
public class OrderItem {
private String productId;
private String productName;
private Integer quantity;
private BigDecimal price;
}
public enum OrderStatus {
CREATED, // 已创建
PAID, // 已支付
INVENTORY_OK, // 库存检查通过
COMPLETED, // 已完成
CANCELLED // 已取消
}
订单服务(生产者)
@Service
public class OrderService {
@Autowired
private StreamBridge streamBridge;
public String createOrder(CreateOrderRequest request) {
// 创建订单
OrderMessage order = new OrderMessage();
order.setOrderId(UUID.randomUUID().toString());
order.setUserId(request.getUserId());
order.setItems(request.getItems());
order.setTotalAmount(calculateTotal(request.getItems()));
order.setStatus(OrderStatus.CREATED);
order.setCreateTime(LocalDateTime.now());
// 发送订单消息
streamBridge.send("orderCreated-out-0",
MessageBuilder.withPayload(order)
.setHeader("orderId", order.getOrderId())
.build());
return order.getOrderId();
}
}
库存服务(消费者)
@Service
@Slf4j
public class InventoryService {
@Autowired
private StreamBridge streamBridge;
@Bean
public Consumer<OrderMessage> handleOrderCreated() {
return order -> {
log.info("处理订单库存: {}", order.getOrderId());
try {
// 检查并扣减库存
for (OrderItem item : order.getItems()) {
boolean success = deductStock(item.getProductId(), item.getQuantity());
if (!success) {
throw new InsufficientStockException(item.getProductId());
}
}
// 发送库存扣减成功消息
OrderMessage updated = new OrderMessage();
BeanUtils.copyProperties(order, updated);
updated.setStatus(OrderStatus.INVENTORY_OK);
streamBridge.send("inventoryOk-out-0", updated);
} catch (Exception e) {
log.error("库存处理失败: {}", order.getOrderId(), e);
// 抛出异常触发重试或进入死信队列
throw new RuntimeException(e);
}
};
}
private boolean deductStock(String productId, int quantity) {
// 实际库存扣减逻辑
return true;
}
}
支付服务(消费者)
@Service
@Slf4j
public class PaymentService {
@Autowired
private StreamBridge streamBridge;
@Bean
public Consumer<OrderMessage> handleOrderCreated() {
return order -> {
log.info("处理订单支付: {}", order.getOrderId());
try {
// 调用支付接口
PaymentResult result = processPayment(order);
if (result.isSuccess()) {
OrderMessage paid = new OrderMessage();
BeanUtils.copyProperties(order, paid);
paid.setStatus(OrderStatus.PAID);
streamBridge.send("paymentOk-out-0", paid);
} else {
// 支付失败,发送失败消息
handlePaymentFailure(order, result);
}
} catch (Exception e) {
log.error("支付处理失败: {}", order.getOrderId(), e);
throw new RuntimeException(e);
}
};
}
private PaymentResult processPayment(OrderMessage order) {
// 实际支付逻辑
return new PaymentResult(true, "success");
}
}
通知服务(消费者)
@Service
@Slf4j
public class NotificationService {
@Bean
public Consumer<OrderMessage> handleOrderCompleted() {
return order -> {
log.info("发送订单通知: {}", order.getOrderId());
// 发送通知
String message = buildNotificationMessage(order);
sendNotification(order.getUserId(), message);
};
}
private String buildNotificationMessage(OrderMessage order) {
return String.format("您的订单 %s 已处理完成,状态:%s",
order.getOrderId(), order.getStatus());
}
private void sendNotification(String userId, String message) {
// 实际通知发送逻辑(邮件、短信、推送等)
}
}
最佳实践
1. 消息幂等性
消息可能被重复投递,消费者需要保证幂等性:
@Bean
public Consumer<OrderMessage> handleOrder() {
return order -> {
// 使用业务唯一键检查是否已处理
if (isProcessed(order.getOrderId())) {
log.info("订单已处理,跳过: {}", order.getOrderId());
return;
}
// 处理订单
processOrder(order);
// 标记为已处理
markAsProcessed(order.getOrderId());
};
}
2. 消息顺序性
使用分区保证消息的顺序性:
spring:
cloud:
stream:
bindings:
orderCreated-out-0:
producer:
# 使用订单ID作为分区键,同一订单的消息始终在同一分区
partition-key-expression: headers['orderId']
partition-count: 3
3. 合理配置重试
根据业务场景配置重试策略:
- 瞬时故障:配置多次重试,指数退避
- 业务异常:不重试,直接进入死信队列
- 资源限制:限制最大重试次数,避免无限重试
4. 监控与追踪
spring:
cloud:
stream:
bindings:
handleOrder-in-0:
consumer:
# 启用度量指标
enable-metrics: true
使用 Spring Cloud Sleuth 或 Micrometer Tracing 进行分布式追踪。
5. 合理拆分主题
- 按业务领域拆分主题
- 避免单个主题消息量过大
- 考虑消息的订阅关系
小结
本章我们学习了:
- 消息驱动架构:异步通信的优势和应用场景
- 核心概念:Binder、Binding、Message
- 函数式编程模型:Consumer、Supplier、Function
- StreamBridge:灵活的消息发送机制
- 响应式编程:Flux 和 Mono 的使用
- 消费者组与分区:实现负载均衡和消息顺序
- 函数组合:组合简单函数实现复杂逻辑
- 错误处理:重试机制和死信队列
- 测试支持:Test Binder 的使用
- Binder 配置:RabbitMQ 和 Kafka 的特定配置
版本说明
本教程基于 Spring Cloud Stream 4.x 编写,主要变化如下:
| 版本 | 变化 |
|---|---|
| 4.0 | 完全移除注解式编程模型(@StreamListener),只支持函数式编程模型;新增 Reactive Kafka Binder;重新引入 Schema Registry 支持;新增 Test Binder |
| 4.1+ | 持续优化和 Bug 修复 |
重要变更提醒:
- Reactive Kafka Binder 已弃用:由于 Reactor Kafka 项目停止维护,Spring Cloud Stream Reactive Kafka Binder 已被标记为弃用,将在未来版本中移除。新项目建议使用标准 Kafka Binder 配合响应式函数签名。
- 注解模型已移除:从 4.0 开始,
@StreamListener、@Input、@Output等注解不再可用,必须使用函数式编程模型