消息类型
RocketMQ 支持多种消息类型,满足不同的业务场景需求。本章将详细介绍普通消息、顺序消息、延时消息和事务消息的使用方法。
消息类型概述
RocketMQ 5.0 开始支持消息类型强制校验,每个 Topic 只允许发送一种类型的消息:
| 消息类型 | 标识 | 说明 | 适用场景 |
|---|---|---|---|
| 普通消息 | NORMAL | 最基础的消息类型 | 异步解耦、流量削峰 |
| 顺序消息 | FIFO | 保证消息的顺序性 | 订单处理、数据同步 |
| 延时消息 | DELAY | 延迟一定时间后消费 | 定时任务、延时触发 |
| 事务消息 | TRANSACTION | 保证分布式事务一致性 | 支付、订单 |
普通消息
普通消息是最基础的消息类型,适用于大多数异步通信场景。
应用场景
- 异步解耦:将非核心业务异步化处理
- 流量削峰:应对突发流量,保护下游系统
- 事件通知:发送通知、短信等
发送普通消息
5.0 SDK 示例:
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
public class NormalMessageProducer {
public static void main(String[] args) throws Exception {
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints("localhost:8081")
.build();
Producer producer = provider.newProducerBuilder()
.setClientConfiguration(config)
.setTopics("NormalTopic")
.build();
// 构建普通消息
Message message = provider.newMessageBuilder()
.setTopic("NormalTopic")
.setTag("TagA")
.setKeys("order_001")
.setBody("订单创建消息".getBytes())
.build();
// 发送消息
SendReceipt receipt = producer.send(message);
System.out.println("消息发送成功: " + receipt.getMessageId());
producer.close();
}
}
4.x SDK 示例:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class NormalMessageProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("NormalProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
// 创建普通消息
Message msg = new Message(
"NormalTopic", // Topic
"TagA", // Tag
"order_001", // Keys
"订单创建消息".getBytes() // Body
);
// 发送消息
SendResult result = producer.send(msg);
System.out.println("发送结果: " + result);
producer.shutdown();
}
}
创建普通消息 Topic
# 创建普通消息类型的 Topic
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t NormalTopic -a +message.type=NORMAL
顺序消息
顺序消息保证消息按照发送顺序被消费,适用于需要严格顺序处理的业务场景。
应用场景
- 订单处理:订单创建、支付、发货需要按顺序处理
- 数据同步:数据库 binlog 同步需要按顺序执行
- 交易撮合:相同价格的交易单按先后顺序处理
顺序消息原理
RocketMQ 通过**消息组(MessageGroup)**来保证消息的顺序性:
关键点:
- 相同 MessageGroup 的消息会被发送到同一个队列
- 同一队列的消息严格按顺序消费
- 不同 MessageGroup 的消息可以并行处理
发送顺序消息
5.0 SDK 示例:
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
public class FifoMessageProducer {
public static void main(String[] args) throws Exception {
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints("localhost:8081")
.build();
Producer producer = provider.newProducerBuilder()
.setClientConfiguration(config)
.setTopics("FifoTopic")
.build();
// 模拟订单状态变更
String orderId = "order_12345";
String[] statuses = {"CREATED", "PAID", "SHIPPED", "DELIVERED"};
for (String status : statuses) {
// 构建顺序消息,设置 MessageGroup
Message message = provider.newMessageBuilder()
.setTopic("FifoTopic")
.setTag("OrderStatus")
.setKeys(orderId)
.setMessageGroup(orderId) // 设置消息组,相同订单的消息进入同一队列
.setBody(("订单 " + orderId + " 状态变更为 " + status).getBytes())
.build();
SendReceipt receipt = producer.send(message);
System.out.println("发送成功: " + receipt.getMessageId() + ", 状态: " + status);
}
producer.close();
}
}
4.x SDK 示例:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List;
public class FifoMessageProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("FifoProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
String orderId = "order_12345";
String[] statuses = {"CREATED", "PAID", "SHIPPED", "DELIVERED"};
for (String status : statuses) {
Message msg = new Message(
"FifoTopic",
"OrderStatus",
orderId,
("订单 " + orderId + " 状态变更为 " + status).getBytes()
);
// 使用 MessageQueueSelector 选择队列
SendResult result = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
String orderId = (String) arg;
// 根据订单 ID 选择队列,相同订单进入同一队列
int index = Math.abs(orderId.hashCode()) % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.println("发送结果: " + result);
}
producer.shutdown();
}
}
消费顺序消息
4.x SDK 示例:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class FifoMessageConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("FifoConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("FifoTopic", "*");
// 使用 MessageListenerOrderly 保证顺序消费
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("顺序消费消息: %s%n", new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("顺序消费者启动成功");
}
}
创建顺序消息 Topic
# 创建顺序消息类型的 Topic
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t FifoTopic -a +message.type=FIFO
# 创建顺序消费组
sh bin/mqadmin updateSubGroup -n localhost:9876 -c DefaultCluster -g FifoConsumerGroup -o true
顺序消息注意事项
- 单生产者:顺序消息的生产顺序性仅支持单一生产者
- 串行发送:多线程并行发送无法保证顺序
- MessageGroup 设计:尽量细粒度设计,避免热点
延时消息
延时消息在发送到服务端后,需要等待指定时间后才能被消费。
应用场景
- 订单超时取消:订单创建后 30 分钟未支付自动取消
- 定时提醒:预约提醒、生日提醒
- 延时任务:延时执行某些任务
延时级别
RocketMQ 预设了 18 个延时级别:
| 级别 | 延时时间 | 级别 | 延时时间 |
|---|---|---|---|
| 1 | 1秒 | 10 | 6分钟 |
| 2 | 5秒 | 11 | 7分钟 |
| 3 | 10秒 | 12 | 8分钟 |
| 4 | 30秒 | 13 | 9分钟 |
| 5 | 1分钟 | 14 | 10分钟 |
| 6 | 2分钟 | 15 | 20分钟 |
| 7 | 3分钟 | 16 | 30分钟 |
| 8 | 4分钟 | 17 | 1小时 |
| 9 | 5分钟 | 18 | 2小时 |
发送延时消息
5.0 SDK 示例:
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import java.time.Duration;
public class DelayMessageProducer {
public static void main(String[] args) throws Exception {
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints("localhost:8081")
.build();
Producer producer = provider.newProducerBuilder()
.setClientConfiguration(config)
.setTopics("DelayTopic")
.build();
// 构建延时消息,设置延时时间
Message message = provider.newMessageBuilder()
.setTopic("DelayTopic")
.setTag("OrderTimeout")
.setKeys("order_001")
.setDelayTime(Duration.ofMinutes(30)) // 延时 30 分钟
.setBody("订单超时检查".getBytes())
.build();
SendReceipt receipt = producer.send(message);
System.out.println("延时消息发送成功: " + receipt.getMessageId());
producer.close();
}
}
4.x SDK 示例:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class DelayMessageProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("DelayProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message(
"DelayTopic",
"OrderTimeout",
"order_001",
"订单超时检查".getBytes()
);
// 设置延时级别 16(30分钟)
msg.setDelayTimeLevel(16);
SendResult result = producer.send(msg);
System.out.println("延时消息发送成功: " + result);
producer.shutdown();
}
}
订单超时取消示例
// 创建订单时发送延时消息
public void createOrder(Order order) {
// 1. 保存订单到数据库
orderMapper.insert(order);
// 2. 发送延时消息(30分钟后检查订单状态)
Message msg = new Message(
"OrderTimeoutTopic",
"TimeoutCheck",
order.getOrderId(),
order.getOrderId().getBytes()
);
msg.setDelayTimeLevel(16); // 30分钟
producer.send(msg);
}
// 消费延时消息,检查订单状态
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String orderId = new String(msg.getBody());
// 查询订单状态
Order order = orderMapper.selectById(orderId);
// 如果订单未支付,取消订单
if ("UNPAID".equals(order.getStatus())) {
order.setStatus("CANCELLED");
orderMapper.updateById(order);
System.out.println("订单超时取消: " + orderId);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
创建延时消息 Topic
# 创建延时消息类型的 Topic
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t DelayTopic -a +message.type=DELAY
事务消息
事务消息用于保证分布式场景下消息发送和本地事务的最终一致性。
应用场景
- 支付场景:支付成功后通知订单服务
- 订单场景:订单创建成功后通知库存服务
- 分布式事务:跨服务的业务操作需要保证一致性
事务消息原理
发送事务消息
4.x SDK 示例:
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.concurrent.*;
public class TransactionMessageProducer {
public static void main(String[] args) throws Exception {
// 创建事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("TransactionProducerGroup");
producer.setNamesrvAddr("localhost:9876");
// 设置事务监听器
producer.setTransactionListener(new TransactionListener() {
// 执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 解析消息
String orderId = new String(msg.getBody());
// 执行本地事务:创建订单、扣减库存等
createOrder(orderId);
// 本地事务成功,提交消息
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
// 本地事务失败,回滚消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 事务回查
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String orderId = new String(msg.getBody());
// 查询本地事务状态
Order order = orderMapper.selectById(orderId);
if (order != null) {
// 订单存在,说明本地事务成功
return LocalTransactionState.COMMIT_MESSAGE;
} else {
// 订单不存在,说明本地事务失败
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
});
// 设置事务回查线程池
producer.setCheckThreadPoolMinSize(2);
producer.setCheckThreadPoolMaxSize(5);
producer.setCheckRequestHoldMax(2000);
producer.start();
// 发送事务消息
Message msg = new Message(
"TransactionTopic",
"OrderCreate",
"order_001",
"order_001".getBytes()
);
TransactionSendResult result = producer.sendMessageInTransaction(msg, null);
System.out.println("事务消息发送结果: " + result.getLocalTransactionState());
// 不要立即关闭,等待事务回查
// producer.shutdown();
}
private static void createOrder(String orderId) {
// 模拟创建订单的业务逻辑
System.out.println("创建订单: " + orderId);
}
}
事务消息流程详解
- 发送半消息:消息发送到 Broker,但对消费者不可见
- 执行本地事务:执行业务逻辑,如创建订单
- 提交或回滚:根据本地事务结果提交或回滚消息
- 事务回查:如果 Broker 未收到确认,会定期回查事务状态
创建事务消息 Topic
# 创建事务消息类型的 Topic
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t TransactionTopic -a +message.type=TRANSACTION
事务消息注意事项
- 事务回查:必须实现
checkLocalTransaction方法,处理事务状态回查 - 幂等性:本地事务需要保证幂等性,因为可能会被多次调用
- 超时时间:合理设置事务超时时间,避免长时间等待
- 事务状态:事务状态有三种:COMMIT、ROLLBACK、UNKNOWN
// 事务状态说明
public enum LocalTransactionState {
COMMIT_MESSAGE, // 提交消息,消费者可见
ROLLBACK_MESSAGE, // 回滚消息,消费者不可见
UNKNOW // 未知状态,等待事务回查
}
消息类型对比总结
| 特性 | 普通消息 | 顺序消息 | 延时消息 | 事务消息 |
|---|---|---|---|---|
| 顺序保证 | 无 | 队列内有序 | 无 | 无 |
| 延时特性 | 无 | 无 | 支持 | 无 |
| 事务特性 | 无 | 无 | 无 | 支持 |
| 实现复杂度 | 低 | 中 | 低 | 高 |
| 典型场景 | 异步解耦 | 订单处理 | 定时任务 | 分布式事务 |
小结
本章介绍了 RocketMQ 的四种消息类型:
- 普通消息:最基础的消息类型,适用于大多数异步通信场景
- 顺序消息:保证消息按顺序消费,适用于订单处理等场景
- 延时消息:延时一定时间后消费,适用于定时任务场景
- 事务消息:保证分布式事务一致性,适用于跨服务事务场景
每种消息类型都有其特定的应用场景,选择合适的消息类型可以有效解决业务问题。
练习
- 实现一个订单系统,使用顺序消息保证订单状态变更的顺序性
- 使用延时消息实现订单超时自动取消功能
- 使用事务消息实现支付成功后通知订单服务的场景
- 比较四种消息类型的适用场景和注意事项