分布式消息队列
消息队列是分布式系统中实现异步通信和解耦的核心组件。它允许应用程序通过发送消息的方式进行通信,而不需要直接调用彼此。这种松耦合的设计使得系统更加灵活、可扩展,同时能够应对流量高峰和系统故障。
为什么需要消息队列
同步通信的问题
在传统的同步调用模式中,服务 A 调用服务 B 时,A 必须等待 B 返回响应。这种紧密耦合存在几个明显的问题:
可用性耦合:如果服务 B 不可用,服务 A 的请求就会失败。A 必须处理 B 的故障情况,增加了复杂性。
性能瓶颈:A 的响应时间取决于 B 的处理时间。如果 B 处理缓慢,A 的吞吐量就会下降。在高并发场景下,慢服务会成为整个系统的瓶颈。
流量冲击:当请求量突然激增时,下游服务可能无法承受。没有缓冲机制的情况下,请求会直接冲击下游,导致服务过载甚至崩溃。
扩展困难:要提升整体处理能力,A 和 B 必须同步扩容。无法独立地扩展某个服务,增加了运维复杂度。
消息队列的解耦作用
消息队列通过引入一个中间层,将生产者和消费者解耦。生产者只需要将消息发送到队列,不需要关心谁来消费、什么时候消费。消费者从队列中获取消息进行处理,不需要知道消息来自哪里。
这种解耦带来了几个重要的好处:
时间解耦:生产者发送消息后不需要等待,可以立即返回。消费者可以按照自己的节奏处理消息,生产者和消费者的处理时间不再相互影响。
空间解耦:生产者不需要知道消费者的地址和数量。新增消费者不需要修改生产者的代码,系统更容易扩展。
流量削峰:当请求量激增时,消息队列可以暂存消息,下游服务按照自己的处理能力消费,避免被压垮。这是一种"以空间换时间"的策略。
可靠传输:消息队列通常会持久化消息,即使消费者暂时不可用,消息也不会丢失。消费者恢复后可以继续处理。
消息队列的核心概念
基本模型
消息队列的基本模型涉及三个角色:生产者、队列、消费者。
生产者(Producer):负责创建和发送消息的应用。生产者将消息发送到队列,然后继续自己的工作,不需要等待响应。
队列(Queue):消息的存储容器,遵循先进先出(FIFO)原则。队列通常持久化存储消息,确保消息不会因为系统故障而丢失。
消费者(Consumer):从队列中获取消息并处理的应用。消费者可以是一个或多个,多个消费者可以并行处理消息,提高吞吐量。
生产者 队列 消费者
│ │ │
│──── 发送消息 ────────>│ │
│ │──── 投递消息 ─────────>│
│ │ │── 处理消息
│ │<──── 确认(ACK)────────│
│ │ │
消息模型
消息队列主要有两种消息模型:点对点(Point-to-Point)和发布订阅(Pub/Sub)。
点对点模型
点对点模型中,一条消息只能被一个消费者消费。多个消费者可以监听同一个队列,但每条消息只会被投递给其中一个消费者。这是一种竞争消费的模式,适合任务分发场景。
// 点对点模型示例:任务队列
public class TaskQueue {
private final MessageQueue mq;
// 生产者:提交任务
public void submitTask(Task task) {
Message message = new Message(
"task-queue", // 队列名称
serialize(task)
);
mq.send(message);
}
// 消费者:处理任务
public void startConsumer() {
mq.consume("task-queue", (message) -> {
Task task = deserialize(message.getBody());
processTask(task);
return ConsumeResult.SUCCESS; // 返回成功,消息确认
});
}
}
点对点模型的特点:
- 每条消息只被消费一次
- 消费者之间是竞争关系
- 适合任务分发、工作队列场景
发布订阅模型
发布订阅模型中,消息被发布到主题(Topic),所有订阅了该主题的消费者都会收到消息的副本。这是一种广播模式,适合事件通知、数据分发场景。
// 发布订阅模型示例:订单事件
public class OrderEventPublisher {
private final MessageQueue mq;
// 发布者:发布订单创建事件
public void publishOrderCreated(Order order) {
Message message = new Message(
"order-created-topic", // 主题名称
serialize(order)
);
mq.publish(message);
}
}
// 订阅者1:库存服务
public class InventorySubscriber {
public void subscribe() {
mq.subscribe("order-created-topic", "inventory-group", (message) -> {
Order order = deserialize(message.getBody());
reserveInventory(order);
return ConsumeResult.SUCCESS;
});
}
}
// 订阅者2:通知服务
public class NotificationSubscriber {
public void subscribe() {
mq.subscribe("order-created-topic", "notification-group", (message) -> {
Order order = deserialize(message.getBody());
sendNotification(order);
return ConsumeResult.SUCCESS;
});
}
}
发布订阅模型的特点:
- 一条消息可被多个消费者消费
- 消费者之间互不影响
- 适合事件驱动架构、数据复制场景
消息确认机制
消息确认(Acknowledgment)是保证消息可靠传递的关键机制。消费者处理完消息后,需要向队列发送确认。如果消费者在处理过程中崩溃,队列没有收到确认,就会重新投递消息。
消息确认流程:
消费者 队列
│ │
│<──── 投递消息 ─────────│
│ │
│── 处理消息 │
│ │
│──── 发送 ACK ─────────>│ 消息从队列删除
│ │
处理失败场景:
消费者 队列
│ │
│<──── 投递消息 ─────────│
│ │
│── 处理消息(崩溃) │
│ │
│ (超时) │
│<──── 重新投递 ─────────│ 消息重新变为可消费
确认机制有三种模式:
自动确认:消息投递后立即确认,不等待消费者处理完成。性能最高,但可能丢失消息。
手动确认:消费者处理完成后手动发送确认。最安全,但需要开发者正确处理。
批量确认:消费者可以批量确认多条消息,减少网络开销。
// 手动确认示例
public void consumeWithManualAck() {
mq.consume("queue", (message) -> {
try {
process(message);
message.ack(); // 手动确认
} catch (Exception e) {
message.nack(); // 确认失败,消息重新入队
}
});
}
消息持久化
消息持久化确保消息在队列重启后仍然存在。持久化通常涉及三个层面:
队列持久化:队列本身的元数据被持久化,重启后队列仍然存在。
消息持久化:消息内容被写入磁盘,不会因为队列重启而丢失。
交换器持久化:在发布订阅模型中,交换器的元数据也需要持久化。
// 持久化配置示例
public void setupDurableQueue() {
// 创建持久化队列
Queue queue = new Queue("task-queue", true); // true 表示持久化
// 发送持久化消息
Message message = new Message("task-queue", data);
message.setDeliveryMode(DeliveryMode.PERSISTENT); // 消息持久化
mq.send(message);
}
持久化与性能的权衡:
- 持久化会降低消息吞吐量,因为需要写入磁盘
- 可以通过批量写入、异步刷盘等优化性能
- 对可靠性要求高的场景必须开启持久化
消息队列的高级特性
消息顺序性
在某些场景下,消息的顺序至关重要。比如订单状态变更(创建→支付→发货),如果顺序错乱,业务逻辑就会出错。
全局顺序:队列中所有消息严格按先进先出顺序消费。实现简单,但并发度受限。
分区顺序:将消息按某个键(如订单ID)分区,同一分区的消息严格有序。平衡了顺序性和并发度。
// 分区顺序消息示例
public void sendOrderedMessage(OrderEvent event) {
Message message = new Message("order-topic", serialize(event));
// 使用订单ID作为分区键,保证同一订单的消息有序
message.setPartitionKey(event.getOrderId());
mq.send(message);
}
// 消费者侧:同一分区的消息串行处理
public void consumeOrdered() {
mq.subscribe("order-topic", (message) -> {
// 同一分区的消息按顺序到达
processEvent(deserialize(message));
});
}
消息去重
网络抖动、消费者重试等原因可能导致消息重复。消息队列通常提供"至少一次"投递语义,消费者需要自行处理重复消息。
幂等性设计:消费者处理逻辑天生支持重复执行。如 UPDATE account SET balance = 100 WHERE id = 1,执行多次结果相同。
去重表:记录已处理消息的ID,消费前检查是否已处理过。
唯一键约束:数据库层面的唯一键约束可以防止重复插入。
// 去重表方案
public void consumeWithDeduplication(Message message) {
String messageId = message.getId();
// 检查是否已处理
if (deduplicationTable.exists(messageId)) {
message.ack(); // 已处理,直接确认
return;
}
try {
// 处理消息
process(message);
// 记录已处理(需要原子性)
deduplicationTable.insert(messageId);
message.ack();
} catch (Exception e) {
message.nack();
}
}
延迟消息
延迟消息允许消息在指定时间后才可被消费。常见应用场景包括:
- 订单超时取消:订单创建30分钟后检查是否支付
- 定时提醒:用户设置的未来提醒
- 重试退避:失败任务延迟重试
// 延迟消息示例
public void scheduleOrderTimeoutCheck(Order order) {
Message message = new Message("order-timeout-topic", serialize(order));
// 设置延迟时间:30分钟
message.setDelayTime(30 * 60 * 1000);
mq.send(message);
}
// 消费者:30分钟后执行
public void checkOrderTimeout() {
mq.consume("order-timeout-topic", (message) -> {
Order order = deserialize(message.getBody());
if (!order.isPaid()) {
cancelOrder(order);
}
return ConsumeResult.SUCCESS;
});
}
延迟消息的实现方式:
- 时间轮:将延迟任务放入时间轮,定时扫描到期任务
- 延迟队列:消息按到期时间排序,消费者只能消费到期的消息
- 定时任务:使用定时任务框架扫描到期的消息
死信队列
当消息消费失败超过一定次数,或者消息过期,这些"无法处理"的消息会被转发到死信队列(Dead Letter Queue)。死信队列允许后续人工处理或分析。
死信产生的场景:
- 消费者拒绝消息且不重新入队
- 消息消费失败超过最大重试次数
- 消息过期(TTL)
// 死信队列配置
public void setupDeadLetterQueue() {
// 主队列配置死信队列
Queue mainQueue = QueueBuilder
.durable("main-queue")
.deadLetterExchange("dlx-exchange") // 死信交换器
.deadLetterRoutingKey("dlq") // 死信路由键
.build();
// 死信队列
Queue deadLetterQueue = new Queue("dead-letter-queue", true);
// 绑定死信队列到死信交换器
Binding binding = BindingBuilder
.bind(deadLetterQueue)
.to(new DirectExchange("dlx-exchange"))
.with("dlq");
}
// 处理死信
public void processDeadLetters() {
mq.consume("dead-letter-queue", (message) -> {
log.error("死信消息: {}, 原因: {}",
message.getId(),
message.getHeader("x-death-reason"));
// 人工处理或告警
alertDeadLetter(message);
return ConsumeResult.SUCCESS;
});
}
消息事务
消息事务确保消息发送和本地数据库操作的一致性。常见场景:用户支付成功后,数据库更新余额,同时发送消息通知库存服务。这两者必须同时成功或同时失败。
本地事务方案:在本地事务中写入消息表,定时任务扫描消息表并发送消息。
// 本地事务方案
@Transactional
public void payAndNotify(PaymentRequest request) {
// 1. 更新账户余额
accountMapper.updateBalance(request.getUserId(), -request.getAmount());
// 2. 写入消息表(与业务数据在同一事务)
OutboxMessage message = new OutboxMessage();
message.setId(UUID.randomUUID().toString());
message.setTopic("payment-topic");
message.setPayload(serialize(request));
message.setStatus(MessageStatus.PENDING);
outboxMapper.insert(message);
// 事务提交后,消息表和余额更新一起持久化
}
// 定时任务:扫描消息表并发送
@Scheduled(fixedRate = 1000)
public void sendPendingMessages() {
List<OutboxMessage> messages = outboxMapper.findByStatus(MessageStatus.PENDING);
for (OutboxMessage message : messages) {
try {
mq.send(message.getTopic(), message.getPayload());
// 发送成功,更新状态
outboxMapper.updateStatus(message.getId(), MessageStatus.SENT);
} catch (Exception e) {
log.error("发送消息失败: {}", message.getId(), e);
}
}
}
事务消息方案:RocketMQ 提供的事务消息机制,通过半消息和事务状态回查实现一致性。
// RocketMQ 事务消息
public void sendTransactionMessage(PaymentRequest request) {
TransactionSendResult result = mq.sendMessageInTransaction(
"payment-topic",
serialize(request),
null // 本地事务参数
);
if (result.getSendStatus() != SendStatus.SEND_OK) {
throw new RuntimeException("发送事务消息失败");
}
}
// 本地事务执行器
public class PaymentTransactionListener implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 执行本地事务
PaymentRequest request = deserialize(msg.getBody());
accountMapper.updateBalance(request.getUserId(), -request.getAmount());
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 事务状态回查:检查本地事务是否执行成功
PaymentRequest request = deserialize(msg.getBody());
boolean exists = accountMapper.checkTransactionExists(request.getTransactionId());
return exists ? LocalTransactionState.COMMIT_MESSAGE
: LocalTransactionState.ROLLBACK_MESSAGE;
}
}
主流消息队列对比
市场上有多款成熟的消息队列产品,各有特点。选择时需要根据业务场景权衡。
RabbitMQ
RabbitMQ 是基于 AMQP 协议的消息队列,由 Erlang 开发,以可靠性著称。
核心概念:
- Exchange(交换器):接收生产者发送的消息,根据路由规则分发到队列
- Queue(队列):存储消息,等待消费者消费
- Binding(绑定):队列与交换器之间的路由关系
- Routing Key(路由键):交换器根据路由键将消息路由到队列
交换器类型:
- Direct:精确匹配路由键
- Topic:支持通配符匹配(
*匹配一个词,#匹配多个词) - Fanout:广播到所有绑定队列
- Headers:根据消息头匹配
// RabbitMQ 发送消息示例
public void sendMessage() {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明交换器
channel.exchangeDeclare("order-exchange", "topic", true);
// 声明队列
channel.queueDeclare("order-queue", true, false, false, null);
// 绑定队列到交换器
channel.queueBind("order-queue", "order-exchange", "order.*");
// 发送消息
String message = "Order created";
channel.basicPublish(
"order-exchange", // 交换器
"order.created", // 路由键
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes()
);
}
}
RabbitMQ 的优势:
- 消息可靠性高,支持消息确认、持久化、事务
- 路由灵活,支持多种交换器类型
- 管理界面完善,便于监控和运维
- 社区成熟,文档丰富
RabbitMQ 的局限:
- 吞吐量相对较低(相比 Kafka)
- 集群扩展能力有限
- Erlang 语言开发,二次开发困难
Apache Kafka
Kafka 是为大数据场景设计的分布式消息系统,以高吞吐量著称。核心概念与传统消息队列有所不同。
核心概念:
- Producer(生产者):发送消息的应用
- Consumer(消费者):消费消息的应用
- Broker:Kafka 服务节点
- Topic(主题):消息的逻辑分类
- Partition(分区):Topic 的物理分片,分布在多个 Broker
- Consumer Group(消费者组):消费者组成组,组内消费者分摊消费分区
分区的意义:
- 数据分散存储,突破单机限制
- 并行消费,提高吞吐量
- 分区内有序,保证顺序性
消费者组的工作方式:
- 一个分区只能被组内一个消费者消费
- 一个消费者可以消费多个分区
- 消费者数量不应超过分区数(否则有消费者空闲)
// Kafka 生产者示例
public class KafkaProducerDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
props.put("acks", "all"); // 确认模式:所有副本确认
Producer<String, String> producer = new KafkaProducer<>(props);
// 发送消息
ProducerRecord<String, String> record = new ProducerRecord<>(
"order-topic", // Topic
"order-123", // Key(决定分区)
"Order content" // Value
);
// 异步发送,带回调
producer.send(record, (metadata, exception) -> {
if (exception != null) {
log.error("发送失败", exception);
} else {
log.info("发送成功: partition={}, offset={}",
metadata.partition(), metadata.offset());
}
});
producer.close();
}
}
// Kafka 消费者示例
public class KafkaConsumerDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-group"); // 消费者组
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("auto.offset.reset", "earliest"); // 从最早开始消费
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅 Topic
consumer.subscribe(Arrays.asList("order-topic"));
// 消费循环
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
process(record.value());
}
// 手动提交偏移量
consumer.commitSync();
}
}
}
Kafka 的优势:
- 吞吐量极高,适合大数据场景
- 消息持久化,支持消息回溯
- 水平扩展能力强,支持大规模集群
- 与大数据生态集成良好(Spark、Flink 等)
Kafka 的局限:
- 功能相对简单,不支持复杂的消息路由
- 消费者逻辑较复杂,需要处理偏移量
- 不适合实时性要求极高的场景
RocketMQ
RocketMQ 是阿里巴巴开源的消息队列,针对金融级场景设计,兼顾可靠性和性能。
核心概念:
- NameServer:路由注册中心,类似注册中心
- Broker:消息服务器,负责存储和转发消息
- Producer:消息生产者
- Consumer:消息消费者
- Topic:消息主题
- Queue:Topic 下的消息队列
RocketMQ 特有特性:
事务消息:通过半消息机制,保证本地事务和消息发送的一致性。
延迟消息:内置延迟级别,支持 1s、5s、10s、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h。
消息过滤:支持 Tag 和 SQL 表达式过滤。
// RocketMQ 发送延迟消息
public void sendDelayedMessage() {
DefaultMQProducer producer = new DefaultMQProducer("delay-group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message message = new Message("delay-topic", "Hello".getBytes());
// 设置延迟级别:16 对应 30 分钟
message.setDelayTimeLevel(16);
producer.send(message);
producer.shutdown();
}
// RocketMQ 事务消息
public void sendTransactionMessage() {
TransactionMQProducer producer = new TransactionMQProducer("tx-group");
producer.setNamesrvAddr("localhost:9876");
producer.setTransactionListener(new OrderTransactionListener());
producer.start();
Message message = new Message("order-topic", orderData.getBytes());
// 发送事务消息
TransactionSendResult result = producer.sendMessageInTransaction(message, null);
}
RocketMQ 的优势:
- 事务消息支持完善
- 延迟消息开箱即用
- 适合金融等高可靠场景
- 国产化支持好,中文社区活跃
RocketMQ 的局限:
- 社区规模小于 Kafka 和 RabbitMQ
- 与大数据生态集成较少
- 运维工具相对简单
对比总结
| 特性 | RabbitMQ | Kafka | RocketMQ |
|---|---|---|---|
| 吞吐量 | 中(万级/秒) | 高(百万级/秒) | 高(十万级/秒) |
| 延迟 | 低(微秒级) | 中(毫秒级) | 低(毫秒级) |
| 消息可靠性 | 高 | 高 | 极高 |
| 功能丰富度 | 高 | 中 | 高 |
| 事务支持 | 支持 | 不支持 | 完善(事务消息) |
| 延迟消息 | 支持(插件) | 不支持 | 原生支持 |
| 消息回溯 | 不支持 | 支持 | 支持 |
| 运维复杂度 | 中 | 高 | 中 |
| 适用场景 | 传统业务系统 | 大数据、日志 | 金融、电商 |
选择建议:
- 业务系统、需要复杂路由:选择 RabbitMQ
- 大数据处理、日志收集:选择 Kafka
- 金融交易、电商订单:选择 RocketMQ
消息队列的最佳实践
生产者最佳实践
消息幂等设计:每条消息携带唯一ID,消费者根据ID去重。
public void sendMessage(String orderId, String event) {
Message message = new Message(topic, event.getBytes());
// 设置消息ID(业务ID)
message.setKeys(orderId);
// 设置消息唯一ID(用于去重)
message.putUserProperty("messageId", UUID.randomUUID().toString());
producer.send(message);
}
发送失败处理:捕获异常,实现重试机制,但避免无限重试。
public void sendWithRetry(Message message, int maxRetries) {
int retries = 0;
while (retries < maxRetries) {
try {
producer.send(message);
return;
} catch (Exception e) {
retries++;
log.warn("发送失败,第{}次重试", retries, e);
if (retries >= maxRetries) {
// 重试失败,写入本地表,后台重发
saveToLocalTable(message);
throw new RuntimeException("消息发送失败");
}
// 指数退避
Thread.sleep((long) Math.pow(2, retries) * 1000);
}
}
}
异步发送:高吞吐场景使用异步发送,避免阻塞业务线程。
public void sendAsync(Message message) {
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult result) {
log.info("发送成功: {}", result.getMsgId());
}
@Override
public void onException(Throwable e) {
log.error("发送失败", e);
// 失败处理
}
});
}
消费者最佳实践
消费幂等:根据业务ID或消息ID实现去重。
public ConsumeResult consume(Message message) {
String messageId = message.getUserProperty("messageId");
// 使用 Redis 实现幂等
Boolean success = redis.setnx("msg:" + messageId, "1", 300);
if (!success) {
return ConsumeResult.SUCCESS; // 重复消息,直接确认
}
try {
process(message);
return ConsumeResult.SUCCESS;
} catch (Exception e) {
redis.del("msg:" + messageId); // 处理失败,删除标记
return ConsumeResult.FAILED;
}
}
批量消费:提高吞吐量,减少网络开销。
public void consumeBatch() {
List<Message> messages = mq.batchReceive("queue", 100);
for (Message message : messages) {
try {
process(message);
} catch (Exception e) {
// 记录失败消息,不影响其他消息
log.error("处理失败: {}", message.getId(), e);
}
}
// 批量确认
mq.batchAck(messages);
}
消费失败处理:合理的重试和死信机制。
public ConsumeResult consumeWithRetry(Message message) {
int retryCount = message.getRetryCount();
if (retryCount > 3) {
// 超过最大重试,发送到死信队列
sendToDeadLetterQueue(message);
return ConsumeResult.SUCCESS;
}
try {
process(message);
return ConsumeResult.SUCCESS;
} catch (Exception e) {
// 重新投递,增加重试计数
message.setRetryCount(retryCount + 1);
return ConsumeResult.FAILED_RETRY;
}
}
集群和高可用
生产者连接多节点:避免单点故障。
// RabbitMQ 多节点连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("node1");
factory.setPort(5672);
factory.setAutomaticRecoveryEnabled(true); // 自动重连
// Kafka 多节点
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
消费者负载均衡:多个消费者组成消费者组,自动分摊负载。
// Kafka 消费者组
props.put("group.id", "order-consumer-group");
// 同一 group.id 的多个消费者实例自动分摊分区
监控告警:监控队列积压、消费延迟等关键指标。
// 监控消息积压
public void monitorLag() {
long lag = consumer.getLag("order-topic", "order-group");
if (lag > THRESHOLD) {
alert("消息积压告警: lag=" + lag);
}
}
小结
本章我们深入学习了分布式消息队列的核心知识:
消息队列的作用:解耦、异步、削峰、可靠传输,是分布式系统的基础设施。
核心概念:生产者、消费者、队列、主题、消息确认、消息持久化等基本概念构成了消息队列的理论基础。
消息模型:点对点模型适合任务分发,发布订阅模型适合事件驱动架构。
高级特性:消息顺序性、消息去重、延迟消息、死信队列、消息事务等特性解决了复杂业务场景的需求。
主流产品对比:RabbitMQ 功能丰富适合传统业务,Kafka 高吞吐适合大数据,RocketMQ 高可靠适合金融场景。
最佳实践:生产者幂等设计、发送失败处理,消费者幂等消费、批量消费、失败处理,集群高可用部署和监控告警。
消息队列是分布式系统架构师必须掌握的核心技能。合理使用消息队列可以显著提升系统的可扩展性、可靠性和性能。但也要注意,消息队列引入了异步复杂性,需要仔细处理消息丢失、重复、顺序等问题。
如果你想深入学习消息队列,推荐阅读:
- 《Kafka: The Definitive Guide》—— Kafka 官方指南
- RabbitMQ 官方文档:rabbitmq.com/documentation.html
- RocketMQ 官方文档:rocketmq.apache.org/docs/quick-start/