跳到主要内容

消息类型

RocketMQ 支持多种消息类型,满足不同的业务场景需求。本章将详细介绍普通消息、顺序消息、延时消息和事务消息的使用方法。

消息类型概述

RocketMQ 5.0 开始支持消息类型强制校验,每个 Topic 只允许发送一种类型的消息:

消息类型标识说明适用场景
普通消息NORMAL最基础的消息类型异步解耦、流量削峰
顺序消息FIFO保证消息的顺序性订单处理、数据同步
延时消息DELAY延迟一定时间后消费定时任务、延时触发
事务消息TRANSACTION保证分布式事务一致性支付、订单

普通消息

普通消息是最基础的消息类型,适用于大多数异步通信场景。

应用场景

  • 异步解耦:将非核心业务异步化处理
  • 流量削峰:应对突发流量,保护下游系统
  • 事件通知:发送通知、短信等

发送普通消息

5.0 SDK 示例

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;

public class NormalMessageProducer {
public static void main(String[] args) throws Exception {
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints("localhost:8081")
.build();

Producer producer = provider.newProducerBuilder()
.setClientConfiguration(config)
.setTopics("NormalTopic")
.build();

// 构建普通消息
Message message = provider.newMessageBuilder()
.setTopic("NormalTopic")
.setTag("TagA")
.setKeys("order_001")
.setBody("订单创建消息".getBytes())
.build();

// 发送消息
SendReceipt receipt = producer.send(message);
System.out.println("消息发送成功: " + receipt.getMessageId());

producer.close();
}
}

4.x SDK 示例

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class NormalMessageProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("NormalProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();

// 创建普通消息
Message msg = new Message(
"NormalTopic", // Topic
"TagA", // Tag
"order_001", // Keys
"订单创建消息".getBytes() // Body
);

// 发送消息
SendResult result = producer.send(msg);
System.out.println("发送结果: " + result);

producer.shutdown();
}
}

创建普通消息 Topic

# 创建普通消息类型的 Topic
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t NormalTopic -a +message.type=NORMAL

顺序消息

顺序消息保证消息按照发送顺序被消费,适用于需要严格顺序处理的业务场景。

应用场景

  • 订单处理:订单创建、支付、发货需要按顺序处理
  • 数据同步:数据库 binlog 同步需要按顺序执行
  • 交易撮合:相同价格的交易单按先后顺序处理

顺序消息原理

RocketMQ 通过**消息组(MessageGroup)**来保证消息的顺序性:

关键点

  • 相同 MessageGroup 的消息会被发送到同一个队列
  • 同一队列的消息严格按顺序消费
  • 不同 MessageGroup 的消息可以并行处理

发送顺序消息

5.0 SDK 示例

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;

public class FifoMessageProducer {
public static void main(String[] args) throws Exception {
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints("localhost:8081")
.build();

Producer producer = provider.newProducerBuilder()
.setClientConfiguration(config)
.setTopics("FifoTopic")
.build();

// 模拟订单状态变更
String orderId = "order_12345";
String[] statuses = {"CREATED", "PAID", "SHIPPED", "DELIVERED"};

for (String status : statuses) {
// 构建顺序消息,设置 MessageGroup
Message message = provider.newMessageBuilder()
.setTopic("FifoTopic")
.setTag("OrderStatus")
.setKeys(orderId)
.setMessageGroup(orderId) // 设置消息组,相同订单的消息进入同一队列
.setBody(("订单 " + orderId + " 状态变更为 " + status).getBytes())
.build();

SendReceipt receipt = producer.send(message);
System.out.println("发送成功: " + receipt.getMessageId() + ", 状态: " + status);
}

producer.close();
}
}

4.x SDK 示例

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List;

public class FifoMessageProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("FifoProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();

String orderId = "order_12345";
String[] statuses = {"CREATED", "PAID", "SHIPPED", "DELIVERED"};

for (String status : statuses) {
Message msg = new Message(
"FifoTopic",
"OrderStatus",
orderId,
("订单 " + orderId + " 状态变更为 " + status).getBytes()
);

// 使用 MessageQueueSelector 选择队列
SendResult result = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
String orderId = (String) arg;
// 根据订单 ID 选择队列,相同订单进入同一队列
int index = Math.abs(orderId.hashCode()) % mqs.size();
return mqs.get(index);
}
}, orderId);

System.out.println("发送结果: " + result);
}

producer.shutdown();
}
}

消费顺序消息

4.x SDK 示例

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

public class FifoMessageConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("FifoConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("FifoTopic", "*");

// 使用 MessageListenerOrderly 保证顺序消费
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("顺序消费消息: %s%n", new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});

consumer.start();
System.out.println("顺序消费者启动成功");
}
}

创建顺序消息 Topic

# 创建顺序消息类型的 Topic
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t FifoTopic -a +message.type=FIFO

# 创建顺序消费组
sh bin/mqadmin updateSubGroup -n localhost:9876 -c DefaultCluster -g FifoConsumerGroup -o true

顺序消息注意事项

  1. 单生产者:顺序消息的生产顺序性仅支持单一生产者
  2. 串行发送:多线程并行发送无法保证顺序
  3. MessageGroup 设计:尽量细粒度设计,避免热点

延时消息

延时消息在发送到服务端后,需要等待指定时间后才能被消费。

应用场景

  • 订单超时取消:订单创建后 30 分钟未支付自动取消
  • 定时提醒:预约提醒、生日提醒
  • 延时任务:延时执行某些任务

延时级别

RocketMQ 预设了 18 个延时级别:

级别延时时间级别延时时间
11秒106分钟
25秒117分钟
310秒128分钟
430秒139分钟
51分钟1410分钟
62分钟1520分钟
73分钟1630分钟
84分钟171小时
95分钟182小时

发送延时消息

5.0 SDK 示例

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import java.time.Duration;

public class DelayMessageProducer {
public static void main(String[] args) throws Exception {
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints("localhost:8081")
.build();

Producer producer = provider.newProducerBuilder()
.setClientConfiguration(config)
.setTopics("DelayTopic")
.build();

// 构建延时消息,设置延时时间
Message message = provider.newMessageBuilder()
.setTopic("DelayTopic")
.setTag("OrderTimeout")
.setKeys("order_001")
.setDelayTime(Duration.ofMinutes(30)) // 延时 30 分钟
.setBody("订单超时检查".getBytes())
.build();

SendReceipt receipt = producer.send(message);
System.out.println("延时消息发送成功: " + receipt.getMessageId());

producer.close();
}
}

4.x SDK 示例

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class DelayMessageProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("DelayProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();

Message msg = new Message(
"DelayTopic",
"OrderTimeout",
"order_001",
"订单超时检查".getBytes()
);

// 设置延时级别 16(30分钟)
msg.setDelayTimeLevel(16);

SendResult result = producer.send(msg);
System.out.println("延时消息发送成功: " + result);

producer.shutdown();
}
}

订单超时取消示例

// 创建订单时发送延时消息
public void createOrder(Order order) {
// 1. 保存订单到数据库
orderMapper.insert(order);

// 2. 发送延时消息(30分钟后检查订单状态)
Message msg = new Message(
"OrderTimeoutTopic",
"TimeoutCheck",
order.getOrderId(),
order.getOrderId().getBytes()
);
msg.setDelayTimeLevel(16); // 30分钟

producer.send(msg);
}

// 消费延时消息,检查订单状态
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String orderId = new String(msg.getBody());

// 查询订单状态
Order order = orderMapper.selectById(orderId);

// 如果订单未支付,取消订单
if ("UNPAID".equals(order.getStatus())) {
order.setStatus("CANCELLED");
orderMapper.updateById(order);
System.out.println("订单超时取消: " + orderId);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

创建延时消息 Topic

# 创建延时消息类型的 Topic
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t DelayTopic -a +message.type=DELAY

事务消息

事务消息用于保证分布式场景下消息发送和本地事务的最终一致性。

应用场景

  • 支付场景:支付成功后通知订单服务
  • 订单场景:订单创建成功后通知库存服务
  • 分布式事务:跨服务的业务操作需要保证一致性

事务消息原理

发送事务消息

4.x SDK 示例

import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.concurrent.*;

public class TransactionMessageProducer {
public static void main(String[] args) throws Exception {
// 创建事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("TransactionProducerGroup");
producer.setNamesrvAddr("localhost:9876");

// 设置事务监听器
producer.setTransactionListener(new TransactionListener() {
// 执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 解析消息
String orderId = new String(msg.getBody());

// 执行本地事务:创建订单、扣减库存等
createOrder(orderId);

// 本地事务成功,提交消息
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
// 本地事务失败,回滚消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}

// 事务回查
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String orderId = new String(msg.getBody());

// 查询本地事务状态
Order order = orderMapper.selectById(orderId);
if (order != null) {
// 订单存在,说明本地事务成功
return LocalTransactionState.COMMIT_MESSAGE;
} else {
// 订单不存在,说明本地事务失败
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
});

// 设置事务回查线程池
producer.setCheckThreadPoolMinSize(2);
producer.setCheckThreadPoolMaxSize(5);
producer.setCheckRequestHoldMax(2000);

producer.start();

// 发送事务消息
Message msg = new Message(
"TransactionTopic",
"OrderCreate",
"order_001",
"order_001".getBytes()
);

TransactionSendResult result = producer.sendMessageInTransaction(msg, null);
System.out.println("事务消息发送结果: " + result.getLocalTransactionState());

// 不要立即关闭,等待事务回查
// producer.shutdown();
}

private static void createOrder(String orderId) {
// 模拟创建订单的业务逻辑
System.out.println("创建订单: " + orderId);
}
}

事务消息流程详解

  1. 发送半消息:消息发送到 Broker,但对消费者不可见
  2. 执行本地事务:执行业务逻辑,如创建订单
  3. 提交或回滚:根据本地事务结果提交或回滚消息
  4. 事务回查:如果 Broker 未收到确认,会定期回查事务状态

创建事务消息 Topic

# 创建事务消息类型的 Topic
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t TransactionTopic -a +message.type=TRANSACTION

事务消息注意事项

  1. 事务回查:必须实现 checkLocalTransaction 方法,处理事务状态回查
  2. 幂等性:本地事务需要保证幂等性,因为可能会被多次调用
  3. 超时时间:合理设置事务超时时间,避免长时间等待
  4. 事务状态:事务状态有三种:COMMIT、ROLLBACK、UNKNOWN
// 事务状态说明
public enum LocalTransactionState {
COMMIT_MESSAGE, // 提交消息,消费者可见
ROLLBACK_MESSAGE, // 回滚消息,消费者不可见
UNKNOW // 未知状态,等待事务回查
}

消息类型对比总结

特性普通消息顺序消息延时消息事务消息
顺序保证队列内有序
延时特性支持
事务特性支持
实现复杂度
典型场景异步解耦订单处理定时任务分布式事务

小结

本章介绍了 RocketMQ 的四种消息类型:

  1. 普通消息:最基础的消息类型,适用于大多数异步通信场景
  2. 顺序消息:保证消息按顺序消费,适用于订单处理等场景
  3. 延时消息:延时一定时间后消费,适用于定时任务场景
  4. 事务消息:保证分布式事务一致性,适用于跨服务事务场景

每种消息类型都有其特定的应用场景,选择合适的消息类型可以有效解决业务问题。

练习

  1. 实现一个订单系统,使用顺序消息保证订单状态变更的顺序性
  2. 使用延时消息实现订单超时自动取消功能
  3. 使用事务消息实现支付成功后通知订单服务的场景
  4. 比较四种消息类型的适用场景和注意事项

延伸阅读