跳到主要内容

分布式消息 Spring Cloud Stream

在微服务架构中,服务之间的异步通信是一个重要的话题。Spring Cloud Stream 是一个用于构建高度可扩展的事件驱动微服务的框架,它提供了一套统一的编程模型来与消息中间件交互,让开发者可以专注于业务逻辑,而不必关心底层消息系统的细节。

为什么需要消息驱动?

同步调用的问题

在传统的同步调用模式下,服务之间存在强耦合:

同步调用场景:
┌─────────────┐ ┌─────────────┐
│ 订单服务 │ ───────→ │ 库存服务 │
└─────────────┘ 阻塞等待 └─────────────┘

问题:
1. 库存服务宕机,订单服务无法下单
2. 高峰期请求堆积,响应延迟增加
3. 服务间紧密耦合,难以独立扩展

异步消息的优势

异步消息场景:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 订单服务 │ ───────→ │ 消息队列 │ ───────→ │ 库存服务 │
└─────────────┘ 发送消息 └─────────────┘ 异步处理 └─────────────┘

优势:
1. 服务解耦:订单服务不依赖库存服务可用性
2. 削峰填谷:消息队列缓冲高峰请求
3. 可靠性:消息持久化保证不丢失
4. 灵活性:可以添加多个消费者处理同一消息

Spring Cloud Stream 核心概念

架构模型

Spring Cloud Stream 通过 Binder 抽象层屏蔽了不同消息中间件的差异。这种抽象让应用程序可以与消息中间件解耦,开发者只需要关注业务逻辑,而不用关心具体使用的是哪种消息系统。

┌─────────────────────────────────────────────────────────────┐
│ Spring Cloud Stream │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Application │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Input │ │ Output │ │ Processor │ │ │
│ │ │ Binding │ │ Binding │ │ Binding │ │ │
│ │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │
│ └──────────┼────────────────┼────────────────┼────────┘ │
│ │ │ │ │
│ ┌──────────▼────────────────▼────────────────▼────────┐ │
│ │ Binder │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ RabbitMQ│ │ Kafka │ │ 其他 │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

核心组件

组件说明
Binder与外部消息系统集成的组件,如 Kafka Binder、RabbitMQ Binder。它负责与消息中间件通信,处理连接、消息发送和接收等底层操作。
Binding应用程序与消息系统之间的桥梁,分为 Input(消费者)和 Output(生产者)。Binding 将业务逻辑与消息基础设施解耦。
Message生产者和消费者之间通信的规范数据结构,包含消息头(Headers)和消息体(Payload)。

版本说明

Spring Cloud Stream 从 3.0 版本开始,全面采用函数式编程模型。在 4.0 版本中,基于注解的编程模型(如 @StreamListener)已被完全移除,只保留函数式编程模型。

Spring Cloud Stream 4.0 主要变化

  • 移除注解式编程模型,只支持函数式编程模型
  • 新增 Reactive Kafka Binder(基于 Reactor Kafka)
  • 重新引入 Schema Registry 支持
  • 新增 Test Binder 用于测试
  • 改进错误队列命名机制

函数式编程模型

Spring Cloud Stream 4.x 采用函数式编程模型,通过 java.util.function 包中的 SupplierFunctionConsumer 接口来定义消息处理逻辑。这种编程模型更简洁,也更容易测试。

Consumer:消息消费者

Consumer<T> 用于接收和处理消息,不返回任何结果。

@SpringBootApplication
public class OrderConsumerApplication {

public static void main(String[] args) {
SpringApplication.run(OrderConsumerApplication.class, args);
}

/**
* 消费者:处理订单消息
*
* 方法名 handleOrder 会自动绑定到 handleOrder-in-0
* -in-0 表示这是输入绑定(消费者)
*/
@Bean
public Consumer<Order> handleOrder() {
return order -> {
log.info("收到订单: {}", order);
// 业务处理逻辑
processOrder(order);
};
}
}

Supplier:消息生产者

Supplier<T> 用于生成消息。与 Consumer 和 Function 不同,Supplier 是数据的源头,它不订阅任何输入目标,而是通过轮询机制触发。

@SpringBootApplication
public class OrderProducerApplication {

public static void main(String[] args) {
SpringApplication.run(OrderProducerApplication.class, args);
}

/**
* 生产者:定时发送订单消息
*
* 默认情况下,Supplier 每秒被调用一次
* 可以通过配置调整轮询间隔
*/
@Bean
public Supplier<Order> generateOrder() {
return () -> {
Order order = new Order();
order.setId(UUID.randomUUID().toString());
order.setProduct("商品-" + RandomStringUtils.randomAlphabetic(5));
order.setCreateTime(LocalDateTime.now());
return order;
};
}
}

轮询配置

spring:
integration:
poller:
fixed-delay: 2000 # 轮询间隔(毫秒),默认 1000
max-messages-per-poll: 1 # 每次轮询最大消息数,默认 1
# cron: "0 * * * * *" # 也可以使用 cron 表达式

Function:消息处理器

Function<T, R> 接收消息,处理后返回新消息。它同时具有消费者和生产者的角色。

@SpringBootApplication
public class OrderProcessorApplication {

public static void main(String[] args) {
SpringApplication.run(OrderProcessorApplication.class, args);
}

/**
* 处理器:接收订单,处理后发送到另一个目的地
*
* 输入绑定:processOrder-in-0
* 输出绑定:processOrder-out-0
*/
@Bean
public Function<Order, ProcessedOrder> processOrder() {
return order -> {
ProcessedOrder processed = new ProcessedOrder();
processed.setOrderId(order.getId());
processed.setStatus("PROCESSED");
processed.setProcessTime(LocalDateTime.now());
processed.setTotalAmount(calculateTotal(order));
return processed;
};
}
}

绑定命名规则

Spring Cloud Stream 使用约定的命名规则自动创建绑定名称:

函数类型绑定名称模式示例
Consumer{functionName}-in-{index}handleOrder-in-0
Supplier{functionName}-out-{index}generateOrder-out-0
Function{functionName}-in-{index}{functionName}-out-{index}processOrder-in-0processOrder-out-0

索引从 0 开始,用于支持多输入多输出的函数。

配置绑定

spring:
cloud:
stream:
# 定义要激活的函数
function:
definition: handleOrder;generateOrder

# 绑定配置
bindings:
# 消费者配置
handleOrder-in-0:
destination: orders # 目的地(队列/主题名)
group: order-consumers # 消费者组
consumer:
max-attempts: 3 # 最大重试次数

# 生产者配置
generateOrder-out-0:
destination: orders
producer:
partition-key-expression: headers['orderId']

StreamBridge:灵活的消息发送

StreamBridge 提供了一种灵活的方式来发送消息,特别适合以下场景:

  • 从非 Stream 源(如 REST API)发送消息
  • 动态决定发送目标
  • 在任意位置发送消息

基本用法

@Service
public class OrderService {

@Autowired
private StreamBridge streamBridge;

public String createOrder(CreateOrderRequest request) {
// 创建订单
Order order = new Order();
order.setId(UUID.randomUUID().toString());
// ... 设置其他属性

// 发送订单消息到指定的 binding
streamBridge.send("orderCreated-out-0", order);

return order.getId();
}
}

动态目的地

StreamBridge 支持动态目的地,即在运行时决定消息发送到哪个目标:

@RestController
public class MessageController {

@Autowired
private StreamBridge streamBridge;

@PostMapping("/send")
public void sendMessage(@RequestParam String destination, @RequestBody String message) {
// 动态发送到指定目的地
// 如果目的地不存在,会自动创建
streamBridge.send(destination, message);
}
}

预创建输出绑定

默认情况下,StreamBridge 会在首次使用时创建绑定。如果希望在启动时就创建绑定,可以使用 spring.cloud.stream.output-bindings 属性:

spring:
cloud:
stream:
output-bindings: orderCreated;orderUpdated;orderDeleted

异步发送

StreamBridge 默认使用同步发送(阻塞当前线程)。如果需要异步发送:

@Service
public class AsyncMessageService {

@Autowired
private StreamBridge streamBridge;

@PostConstruct
public void init() {
// 启用异步发送模式
streamBridge.setAsync(true);
}

public void sendAsync(Order order) {
// 异步发送,不阻塞当前线程
streamBridge.send("orders-out-0", order);
}
}
异步发送注意事项

使用异步发送时,如果需要保证追踪上下文的正确传播,需要添加 Micrometer 的 context-propagation 依赖。

多 Binder 支持

当应用程序配置了多个 Binder 时,需要指定使用哪个 Binder:

spring:
cloud:
stream:
binders:
rabbit:
type: rabbit
environment:
spring.rabbitmq.host: localhost
kafka:
type: kafka
environment:
spring.kafka.bootstrap-servers: localhost:9092
// 指定使用 RabbitMQ Binder
streamBridge.send("orders-out-0", "rabbit", order);

// 指定使用 Kafka Binder
streamBridge.send("orders-out-0", "kafka", order);

Reactive 响应式编程支持

Spring Cloud Stream 基于 Spring Cloud Function 和 Project Reactor 构建,天然支持响应式编程。使用响应式 API 可以更优雅地处理流式数据。

响应式 Consumer

@Bean
public Consumer<Flux<Order>> handleOrders() {
return flux -> flux
.filter(order -> order.getAmount() > 100) // 过滤
.map(this::processOrder) // 转换
.doOnNext(order -> log.info("处理订单: {}", order))
.subscribe(); // 必须订阅
}

响应式 Function

响应式 Function 是更推荐的方式,因为它返回一个可以被框架订阅的 Mono:

@Bean
public Function<Flux<Order>, Mono<Void>> processOrders() {
return flux -> flux
.filter(order -> order.getAmount() > 100)
.map(this::enrichOrder)
.flatMap(this::saveOrder)
.then(); // 返回 Mono<Void>
}

响应式 Supplier

响应式 Supplier 有两种模式:

持续流模式(只触发一次):

@Bean
public Supplier<Flux<Order>> continuousOrderStream() {
return () -> Flux.interval(Duration.ofSeconds(1))
.map(i -> generateOrder(i));
// 这个 Supplier 只会被调用一次,持续产生消息
}

轮询模式(定期触发):

@PollableBean  // 标记为可轮询
public Supplier<Flux<Order>> batchOrderSupplier() {
return () -> {
// 返回有限流,每次轮询产生一批订单
return Flux.fromIterable(fetchPendingOrders());
};
}

响应式编程注意事项

使用响应式 API 时需要注意以下几点:

  1. 背压支持:只有使用 Reactive Kafka Binder 时才能完全利用响应式的背压特性。使用普通 Kafka 或 RabbitMQ Binder 时,响应式 API 的便利性可以使用,但无法享受完整的响应式特性。

  2. 错误处理:框架提供的错误处理、重试等机制只对命令式函数生效。响应式函数需要在流内部自行处理错误。

  3. Consumer 订阅:响应式 Consumer<Flux<?>> 需要手动订阅,否则消息不会被处理。推荐使用 Function<Flux<?>, Mono<Void>> 替代。

消费者组与分区

消费者组

消费者组是 Spring Cloud Stream 中实现消息负载均衡和高可用的重要机制。同一组内的消费者共享订阅,每条消息只会被组内的一个消费者处理。

                    ┌─────────────┐
│ 消息队列 │
└──────┬──────┘

┌────────────┼────────────┐
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│消费者1 │ │消费者2 │ │消费者3 │
│(组A) │ │(组A) │ │(组B) │
└─────────┘ └─────────┘ └─────────┘

消费者组A:每条消息只被组内一个消费者处理(负载均衡)
消费者组B:独立消费所有消息(广播模式)

配置消费者组

spring:
cloud:
stream:
bindings:
handleOrder-in-0:
destination: orders
group: order-consumers # 指定消费者组

消费者组的意义

  • 负载均衡:消息在组内消费者之间分配,提高处理吞吐量
  • 高可用:一个消费者宕机,其他消费者自动接管其分区
  • 消息顺序:同一分区的消息按顺序被组内一个消费者处理

消息分区

分区确保具有相同特征的消息被发送到同一个分区,从而保证消息的顺序性。

订单消息按用户ID分区:

分区0: [用户A订单1, 用户A订单2, 用户D订单1]
分区1: [用户B订单1, 用户B订单2, 用户E订单1]
分区2: [用户C订单1, 用户C订单2, 用户F订单1]

同一用户的订单始终在同一个分区,保证顺序性

生产者分区配置

spring:
cloud:
stream:
bindings:
orderCreated-out-0:
destination: orders
producer:
# 分区键表达式,决定消息发送到哪个分区
partition-key-expression: headers['userId']
# 分区数量
partition-count: 3

消费者分区配置

spring:
cloud:
stream:
bindings:
handleOrder-in-0:
destination: orders
group: order-consumers
consumer:
partitioned: true # 启用分区消费
# 消费者实例配置
instance-count: 3 # 总实例数
instance-index: 0 # 当前实例索引(0、1、2)

函数组合

Spring Cloud Stream 支持函数组合,可以将多个简单的函数组合成一个复杂的处理链。这有助于将复杂逻辑分解为可测试、可复用的组件。

基本组合

使用 | 符号组合函数:

spring:
cloud:
function:
definition: validate|process|enrich
@SpringBootApplication
public class ComposedFunctionApplication {

// 第一步:验证
@Bean
public Function<Order, Order> validate() {
return order -> {
if (order.getItems() == null || order.getItems().isEmpty()) {
throw new IllegalArgumentException("订单项不能为空");
}
return order;
};
}

// 第二步:处理
@Bean
public Function<Order, Order> process() {
return order -> {
order.setTotalAmount(calculateTotal(order));
return order;
};
}

// 第三步:丰富
@Bean
public Function<Order, EnrichedOrder> enrich() {
return order -> {
EnrichedOrder enriched = new EnrichedOrder();
BeanUtils.copyProperties(order, enriched);
enriched.setProcessTime(LocalDateTime.now());
enriched.setProcessId(UUID.randomUUID().toString());
return enriched;
};
}
}

简化绑定名称

组合后的函数名称会很长,可以使用 spring.cloud.stream.function.bindings 属性简化:

spring:
cloud:
function:
definition: validate|process|enrich
stream:
function:
bindings:
validate|process|enrich-in-0: orderInput
validate|process|enrich-out-0: orderOutput
bindings:
orderInput:
destination: raw-orders
orderOutput:
destination: processed-orders

横切关注点

函数组合特别适合处理横切关注点,如消息验证、日志记录、消息丰富等:

// 添加追踪信息
@Bean
public Function<Message<Order>, Message<Order>> addTraceInfo() {
return message -> MessageBuilder.fromMessage(message)
.setHeader("traceId", UUID.randomUUID().toString())
.setHeader("timestamp", System.currentTimeMillis())
.build();
}

// 业务处理
@Bean
public Function<Order, Order> businessProcess() {
return order -> {
// 纯粹的业务逻辑,不关心追踪等非功能性需求
return order;
};
}

多输入多输出函数

某些场景下,一个函数需要从多个输入源接收数据,或者输出到多个目标。Spring Cloud Stream 支持使用 Reactor 的 Tuple 类型来实现这种模式。

多输入函数

@Bean
public Function<Tuple2<Flux<Order>, Flux<Payment>>, Flux<CompletedOrder>> aggregate() {
return tuple -> {
Flux<Order> orders = tuple.getT1();
Flux<Payment> payments = tuple.getT2();

// 订单和支付流水合并
return orders.join(
payments,
Duration.ofSeconds(30), // 关联时间窗口
order -> Mono.just(order.getId()),
payment -> Mono.just(payment.getOrderId()),
(order, payment) -> new CompletedOrder(order, payment)
);
};
}

绑定配置

spring:
cloud:
stream:
bindings:
aggregate-in-0: # 第一个输入(Order)
destination: orders
aggregate-in-1: # 第二个输入(Payment)
destination: payments
aggregate-out-0: # 输出
destination: completed-orders

多输出函数

@Bean
public Function<Order, Tuple2<Invoice, Shipping>> splitOrder() {
return order -> Tuples.of(
createInvoice(order),
createShipping(order)
);
}

绑定配置

spring:
cloud:
stream:
bindings:
splitOrder-in-0:
destination: orders
splitOrder-out-0: # 第一个输出(Invoice)
destination: invoices
splitOrder-out-1: # 第二个输出(Shipping)
destination: shippings

错误处理

重试机制

Spring Cloud Stream 提供了内置的重试机制,当消息处理失败时可以自动重试:

spring:
cloud:
stream:
bindings:
handleOrder-in-0:
destination: orders
consumer:
max-attempts: 3 # 最大重试次数,默认 3
back-off-initial-interval: 1000 # 初始重试间隔(毫秒)
back-off-multiplier: 2.0 # 重试间隔倍数
back-off-max-interval: 10000 # 最大重试间隔(毫秒)

重试间隔计算:interval = initialInterval * multiplier^(retryCount - 1)

死信队列

当消息重试次数耗尽后,会被发送到死信队列(DLQ):

spring:
cloud:
stream:
bindings:
handleOrder-in-0:
destination: orders
group: order-consumers

rabbit:
bindings:
handleOrder-in-0:
consumer:
auto-bind-dlq: true # 自动创建死信队列
dlq-queue-name: orders.dlq # 死信队列名称
republish-to-dlq: true # 重新发布到死信队列

死信队列名称格式:{destination}.{group}.dlq

自定义错误处理

@Bean
public Consumer<Order> handleOrder() {
return order -> {
try {
processOrder(order);
} catch (BusinessException e) {
// 业务异常,记录日志,不重试
log.error("订单处理失败: {}", order.getId(), e);
throw new ImmediateRetryException("不重试");
} catch (TechnicalException e) {
// 技术异常,抛出以触发重试
throw new RuntimeException(e);
}
};
}

// 死信队列消费者
@Bean
public Consumer<Order> handleOrderDlq() {
return failedOrder -> {
// 处理失败的消息
log.error("订单最终处理失败: {}", failedOrder);
// 发送告警或人工处理
};
}

错误通道

可以订阅错误通道来统一处理错误:

@Service
public class ErrorHandler {

@ServiceActivator(inputChannel = "orders.errors")
public void handleError(ErrorMessage errorMessage) {
log.error("消息处理错误: ", errorMessage.getPayload());
// 可以访问原始消息和异常信息
Message<?> failedMessage = errorMessage.getOriginalMessage();
Throwable cause = errorMessage.getPayload();
}
}

消息头与内容类型

消息头操作

Spring Cloud Stream 使用 Spring Messaging 的 Message 类型来访问和操作消息头:

@Bean
public Function<Message<Order>, Message<Order>> processWithHeaders() {
return message -> {
// 读取消息头
String traceId = (String) message.getHeaders().get("traceId");
String contentType = message.getHeaders().getContentType().toString();

Order order = message.getPayload();

// 创建新消息,添加或修改消息头
return MessageBuilder
.withPayload(order)
.setHeader("processedAt", System.currentTimeMillis())
.setHeader("processor", "order-service")
.build();
};
}

内容类型协商

Spring Cloud Stream 支持自动的内容类型转换。默认支持 JSON、文本、二进制等格式:

spring:
cloud:
stream:
bindings:
handleOrder-in-0:
destination: orders
content-type: application/json # 指定内容类型

常用内容类型

  • application/json:JSON 格式(默认)
  • application/xml:XML 格式
  • text/plain:纯文本
  • application/octet-stream:二进制流

自定义序列化

@Configuration
public class SerializationConfig {

@Bean
public MessageConverter avroMessageConverter() {
return new MessageConverter() {
@Override
public Message<?> toMessage(Object payload, MessageHeaders headers) {
// 自定义序列化逻辑
byte[] data = serializeToAvro(payload);
return MessageBuilder.withPayload(data)
.setContentType(MimeTypeUtils.parseMimeType("application/avro"))
.build();
}

@Override
public Object fromMessage(Message<?> message, Class<?> targetClass) {
// 自定义反序列化逻辑
return deserializeFromAvro((byte[]) message.getPayload(), targetClass);
}
};
}
}

Schema Registry 支持

Spring Cloud Stream 4.0 重新引入了 Schema Registry 支持,特别适合需要模式演进的场景,如使用 Avro 进行序列化。

Schema Registry Server

添加依赖:

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema-registry-server</artifactId>
</dependency>

启动类:

@SpringBootApplication
@EnableSchemaRegistryServer
public class SchemaRegistryServerApplication {
public static void main(String[] args) {
SpringApplication.run(SchemaRegistryServerApplication.class, args);
}
}

默认使用 H2 内存数据库,可以切换到 PostgreSQL 或 MySQL:

spring:
datasource:
url: jdbc:postgresql://localhost:5432/schema_registry
username: postgres
password: postgres
jpa:
hibernate:
ddl-auto: update

Schema Registry Client

添加依赖:

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema-registry-client</artifactId>
</dependency>

配置:

spring:
cloud:
stream:
schema-registry-client:
endpoint: http://localhost:8990

启用 Schema Registry Client:

@SpringBootApplication
@EnableSchemaRegistryClient
public class OrderApplication {
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class, args);
}
}

测试支持

Spring Cloud Stream 4.0 提供了 Test Binder,方便进行单元测试和集成测试。

测试依赖

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-binder</artifactId>
<scope>test</scope>
</dependency>

单元测试示例

@SpringBootTest
public class OrderProcessorTest {

@Autowired
private InputDestination input;

@Autowired
private OutputDestination output;

@Test
void testProcessOrder() {
// 准备测试数据
Order order = new Order();
order.setId("order-001");
order.setAmount(new BigDecimal("100.00"));

// 发送消息到输入绑定
input.send(new GenericMessage<>(order), "processOrder-in-0");

// 从输出绑定接收消息
Message<byte[]> received = output.receive(1000, "processOrder-out-0");

// 验证结果
assertThat(received).isNotNull();
ProcessedOrder processed = new ObjectMapper()
.readValue(received.getPayload(), ProcessedOrder.class);
assertThat(processed.getOrderId()).isEqualTo("order-001");
}
}

使用 TestBinder

@SpringBootTest(properties = {
"spring.cloud.function.definition=handleOrder"
})
public class ConsumerTest {

@Autowired
private InputDestination input;

@Test
void testConsumer() {
Order order = new Order("order-001", "product-001", 10);

// 发送消息
input.send(MessageBuilder.withPayload(order).build());

// 由于 Consumer 没有输出,可以通过 mock 或验证副作用来测试
// 例如验证数据库记录、日志输出等
}
}

RabbitMQ Binder 详解

RabbitMQ 特有配置

spring:
cloud:
stream:
rabbit:
bindings:
handleOrder-in-0:
consumer:
# 预取数量,控制消费速率
prefetch: 10
# 确认模式:AUTO、MANUAL、NONE
acknowledge-mode: auto
# 是否将拒绝的消息重新入队
requeue-rejected: false
# 死信队列配置
auto-bind-dlq: true
dlq-dead-letter-exchange: orders.dlx
dlq-dead-letter-routing-key: orders.failed
# 消费者标签
consumer-tag-prefix: order-consumer

orderCreated-out-0:
producer:
# 消息持久化
delivery-mode: persistent
# 消息过期时间(毫秒)
ttl: 60000
# 优先级
priority: 10

交换机和队列配置

spring:
cloud:
stream:
rabbit:
bindings:
handleOrder-in-0:
consumer:
# 队列配置
queue-name-group-only: false
binding-routing-key: "#"
# 交换机配置
exchange-type: topic
delayed-exchange: false

Kafka Binder 详解

Kafka 特有配置

spring:
cloud:
stream:
kafka:
bindings:
handleOrder-in-0:
consumer:
# 自动提交偏移量
ack-mode: manual
# 从最早的偏移量开始
start-offset: earliest
# 是否自动重置偏移量
reset-offsets: false
# 批量消费模式
batch-mode: false

orderCreated-out-0:
producer:
# 分区数量
partition-count: 3
# 副本数量
replication-factor: 2
# 消息键表达式
message-key-expression: headers['orderId']
# 压缩类型
compression-type: gzip

Kafka 事务支持

spring:
cloud:
stream:
kafka:
binder:
# 事务配置
transaction:
transaction-id-prefix: tx-order-
# 生产者配置
producer-properties:
enable.idempotence: true
acks: all

Reactive Kafka Binder

Spring Cloud Stream 4.0 提供了专门的 Reactive Kafka Binder,支持完整的响应式编程模型:

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-reactive</artifactId>
</dependency>
spring:
cloud:
stream:
kafka:
binder:
type: reactive
configuration:
bootstrap.servers: localhost:9092
重要:Reactor Kafka 项目已停止维护

2025年5月,Spring 团队宣布 Reactor Kafka 项目将停止维护。这意味着:

  • Reactor Kafka 将从未来的 Reactor BOM 中移除
  • Reactor Kafka 1.3 将是最后一个次要版本
  • Spring Cloud Stream Reactive Kafka Binder 已被弃用,将在未来版本中移除

迁移建议

对于新项目,建议使用标准的 Kafka Binder 配合响应式函数签名:

// 推荐方式:使用标准 Kafka Binder + 响应式函数
@Bean
public Function<Flux<Order>, Mono<Void>> processOrders() {
return flux -> flux
.map(this::processOrder)
.then();
}

对于现有使用 Reactive Kafka Binder 的项目,建议规划迁移到标准 Kafka Binder。虽然响应式 API 的便利性可以使用,但标准 Kafka Binder 无法提供完整的端到端响应式特性(如背压)。

详见 Spring 官方公告

实战示例:订单处理系统

场景描述

模拟一个完整的订单处理流程:

  1. 订单服务发送订单消息
  2. 库存服务消费订单消息,扣减库存
  3. 支付服务消费订单消息,处理支付
  4. 通知服务消费处理结果,发送通知

公共消息模型

@Data
public class OrderMessage {
private String orderId;
private String userId;
private List<OrderItem> items;
private BigDecimal totalAmount;
private OrderStatus status;
private LocalDateTime createTime;
}

@Data
public class OrderItem {
private String productId;
private String productName;
private Integer quantity;
private BigDecimal price;
}

public enum OrderStatus {
CREATED, // 已创建
PAID, // 已支付
INVENTORY_OK, // 库存检查通过
COMPLETED, // 已完成
CANCELLED // 已取消
}

订单服务(生产者)

@Service
public class OrderService {

@Autowired
private StreamBridge streamBridge;

public String createOrder(CreateOrderRequest request) {
// 创建订单
OrderMessage order = new OrderMessage();
order.setOrderId(UUID.randomUUID().toString());
order.setUserId(request.getUserId());
order.setItems(request.getItems());
order.setTotalAmount(calculateTotal(request.getItems()));
order.setStatus(OrderStatus.CREATED);
order.setCreateTime(LocalDateTime.now());

// 发送订单消息
streamBridge.send("orderCreated-out-0",
MessageBuilder.withPayload(order)
.setHeader("orderId", order.getOrderId())
.build());

return order.getOrderId();
}
}

库存服务(消费者)

@Service
@Slf4j
public class InventoryService {

@Autowired
private StreamBridge streamBridge;

@Bean
public Consumer<OrderMessage> handleOrderCreated() {
return order -> {
log.info("处理订单库存: {}", order.getOrderId());

try {
// 检查并扣减库存
for (OrderItem item : order.getItems()) {
boolean success = deductStock(item.getProductId(), item.getQuantity());
if (!success) {
throw new InsufficientStockException(item.getProductId());
}
}

// 发送库存扣减成功消息
OrderMessage updated = new OrderMessage();
BeanUtils.copyProperties(order, updated);
updated.setStatus(OrderStatus.INVENTORY_OK);

streamBridge.send("inventoryOk-out-0", updated);

} catch (Exception e) {
log.error("库存处理失败: {}", order.getOrderId(), e);
// 抛出异常触发重试或进入死信队列
throw new RuntimeException(e);
}
};
}

private boolean deductStock(String productId, int quantity) {
// 实际库存扣减逻辑
return true;
}
}

支付服务(消费者)

@Service
@Slf4j
public class PaymentService {

@Autowired
private StreamBridge streamBridge;

@Bean
public Consumer<OrderMessage> handleOrderCreated() {
return order -> {
log.info("处理订单支付: {}", order.getOrderId());

try {
// 调用支付接口
PaymentResult result = processPayment(order);

if (result.isSuccess()) {
OrderMessage paid = new OrderMessage();
BeanUtils.copyProperties(order, paid);
paid.setStatus(OrderStatus.PAID);

streamBridge.send("paymentOk-out-0", paid);
} else {
// 支付失败,发送失败消息
handlePaymentFailure(order, result);
}
} catch (Exception e) {
log.error("支付处理失败: {}", order.getOrderId(), e);
throw new RuntimeException(e);
}
};
}

private PaymentResult processPayment(OrderMessage order) {
// 实际支付逻辑
return new PaymentResult(true, "success");
}
}

通知服务(消费者)

@Service
@Slf4j
public class NotificationService {

@Bean
public Consumer<OrderMessage> handleOrderCompleted() {
return order -> {
log.info("发送订单通知: {}", order.getOrderId());

// 发送通知
String message = buildNotificationMessage(order);
sendNotification(order.getUserId(), message);
};
}

private String buildNotificationMessage(OrderMessage order) {
return String.format("您的订单 %s 已处理完成,状态:%s",
order.getOrderId(), order.getStatus());
}

private void sendNotification(String userId, String message) {
// 实际通知发送逻辑(邮件、短信、推送等)
}
}

最佳实践

1. 消息幂等性

消息可能被重复投递,消费者需要保证幂等性:

@Bean
public Consumer<OrderMessage> handleOrder() {
return order -> {
// 使用业务唯一键检查是否已处理
if (isProcessed(order.getOrderId())) {
log.info("订单已处理,跳过: {}", order.getOrderId());
return;
}

// 处理订单
processOrder(order);

// 标记为已处理
markAsProcessed(order.getOrderId());
};
}

2. 消息顺序性

使用分区保证消息的顺序性:

spring:
cloud:
stream:
bindings:
orderCreated-out-0:
producer:
# 使用订单ID作为分区键,同一订单的消息始终在同一分区
partition-key-expression: headers['orderId']
partition-count: 3

3. 合理配置重试

根据业务场景配置重试策略:

  • 瞬时故障:配置多次重试,指数退避
  • 业务异常:不重试,直接进入死信队列
  • 资源限制:限制最大重试次数,避免无限重试

4. 监控与追踪

spring:
cloud:
stream:
bindings:
handleOrder-in-0:
consumer:
# 启用度量指标
enable-metrics: true

使用 Spring Cloud Sleuth 或 Micrometer Tracing 进行分布式追踪。

5. 合理拆分主题

  • 按业务领域拆分主题
  • 避免单个主题消息量过大
  • 考虑消息的订阅关系

小结

本章我们学习了:

  1. 消息驱动架构:异步通信的优势和应用场景
  2. 核心概念:Binder、Binding、Message
  3. 函数式编程模型:Consumer、Supplier、Function
  4. StreamBridge:灵活的消息发送机制
  5. 响应式编程:Flux 和 Mono 的使用
  6. 消费者组与分区:实现负载均衡和消息顺序
  7. 函数组合:组合简单函数实现复杂逻辑
  8. 错误处理:重试机制和死信队列
  9. 测试支持:Test Binder 的使用
  10. Binder 配置:RabbitMQ 和 Kafka 的特定配置

版本说明

本教程基于 Spring Cloud Stream 4.x 编写,主要变化如下:

版本变化
4.0完全移除注解式编程模型(@StreamListener),只支持函数式编程模型;新增 Reactive Kafka Binder;重新引入 Schema Registry 支持;新增 Test Binder
4.1+持续优化和 Bug 修复

重要变更提醒

  • Reactive Kafka Binder 已弃用:由于 Reactor Kafka 项目停止维护,Spring Cloud Stream Reactive Kafka Binder 已被标记为弃用,将在未来版本中移除。新项目建议使用标准 Kafka Binder 配合响应式函数签名。
  • 注解模型已移除:从 4.0 开始,@StreamListener@Input@Output 等注解不再可用,必须使用函数式编程模型

参考资源