跳到主要内容

消息轨迹

消息轨迹是 RocketMQ 提供的消息链路追踪功能,可以记录消息从生产到消费的完整生命周期,帮助开发者定位消息丢失、消费延迟等问题。

轨迹数据概述

消息轨迹记录了消息在三个阶段的关键信息:

各阶段记录的数据

阶段记录的信息
Producer 端生产实例信息、发送时间、发送结果、发送耗时
Broker 端Topic、消息存储位置、消息 Key、消息 Tag
Consumer 端消费实例信息、投递时间、投递轮次、消费结果、消费耗时

开启消息轨迹

Broker 端配置

conf/broker.conf 中添加:

# 开启消息轨迹功能
traceTopicEnable = true

重启 Broker:

sh bin/mqshutdown broker
nohup sh bin/mqbroker -c conf/broker.conf &

开启后,Broker 会自动创建系统级的 TraceTopic:RMQ_SYS_TRACE_TOPIC

生产者开启轨迹

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class TraceProducer {
public static void main(String[] args) throws Exception {
// 第二个参数 true 表示开启消息轨迹
DefaultMQProducer producer = new DefaultMQProducer(
"TraceProducerGroup",
true // enableMsgTrace
);

producer.setNamesrvAddr("localhost:9876");
producer.start();

// 发送消息
Message msg = new Message(
"TopicTest",
"TagA",
"OrderID001",
"Hello RocketMQ".getBytes()
);

SendResult result = producer.send(msg);
System.out.println("发送结果: " + result);

producer.shutdown();
}
}

消费者开启轨迹

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

public class TraceConsumer {
public static void main(String[] args) throws Exception {
// 第二个参数 true 表示开启消息轨迹
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
"TraceConsumerGroup",
true // enableMsgTrace
);

consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("收到消息: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

consumer.start();
System.out.println("消费者启动成功");
}
}

自定义 TraceTopic

默认情况下,轨迹数据存储在系统级的 RMQ_SYS_TRACE_TOPIC 中。也可以自定义 TraceTopic 进行存储隔离。

创建自定义 TraceTopic

# 创建用于存储轨迹数据的 Topic
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t MyTraceTopic

使用自定义 TraceTopic

// 生产者使用自定义 TraceTopic
DefaultMQProducer producer = new DefaultMQProducer(
"TraceProducerGroup",
true, // enableMsgTrace
"MyTraceTopic" // customizedTraceTopic
);

// 消费者使用自定义 TraceTopic
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
"TraceConsumerGroup",
true, // enableMsgTrace
"MyTraceTopic" // customizedTraceTopic
);

查询消息轨迹

命令行查询

# 按消息 ID 查询轨迹
sh bin/mqadmin QueryMsgTraceById -n localhost:9876 -i <messageId>

# 输出示例
# Trace: Pub, time: 2024-01-01 10:00:00, cost: 5ms, status: SUCCESS
# Trace: SubBefore, time: 2024-01-01 10:00:01, group: ConsumerGroup
# Trace: SubAfter, time: 2024-01-01 10:00:01, cost: 2ms, status: SUCCESS

Dashboard 查询

通过 RocketMQ Dashboard 可以可视化查看消息轨迹:

  1. 打开 Dashboard,进入「消息轨迹」页面
  2. 输入消息 ID 或 Key
  3. 查看完整的消息链路

轨迹数据结构

消息轨迹数据以 JSON 格式存储,包含以下字段:

生产者轨迹

{
"traceType": "Pub",
"timeStamp": 1704067200000,
"regionId": "DefaultRegion",
"groupName": "ProducerGroup",
"costTime": 5,
"msgType": "Normal_Msg",
"msgId": "AC11000100002A9F0000000000000000",
"topic": "TopicTest",
"tags": "TagA",
"keys": "OrderID001",
"bornHost": "192.168.1.100:12345",
"storeHost": "192.168.1.101:10911",
"queueId": 0,
"offsetId": 0,
"retryTimes": 0,
"status": "SUCCESS"
}

消费者轨迹(消费前)

{
"traceType": "SubBefore",
"timeStamp": 1704067201000,
"regionId": "DefaultRegion",
"groupName": "ConsumerGroup",
"msgId": "AC11000100002A9F0000000000000000",
"retryTimes": 0,
"requestId": "req_001"
}

消费者轨迹(消费后)

{
"traceType": "SubAfter",
"timeStamp": 1704067201000,
"regionId": "DefaultRegion",
"groupName": "ConsumerGroup",
"msgId": "AC11000100002A9F0000000000000000",
"costTime": 2,
"status": "SUCCESS",
"retryTimes": 0
}

字段说明

字段说明
traceType轨迹类型:Pub(发送)、SubBefore(消费前)、SubAfter(消费后)
timeStamp时间戳
groupName生产者/消费者组名
costTime耗时(毫秒)
msgType消息类型:Normal_Msg、Trans_Msg 等
msgId消息 ID
topicTopic 名称
tagsTag
keys消息 Key
bornHost消息产生的地址
storeHost消息存储的地址
status状态:SUCCESS、FAILED

部署模式

普通模式

所有 Broker 节点都存储轨迹数据,适合小型集群。

IO 隔离模式

专门的 Broker 节点存储轨迹数据,适合高吞吐场景。

配置方式

# 专门用于存储轨迹的 Broker 配置
brokerClusterName = TraceCluster
brokerName = broker-trace
traceTopicEnable = true

性能影响

开启消息轨迹会对性能产生一定影响:

场景TPS 影响延迟影响
关闭轨迹无影响无影响
开启轨迹(默认)下降 10-15%增加 5-10ms
开启轨迹(隔离部署)下降 5-8%增加 2-5ms

优化建议

  • 高吞吐场景使用 IO 隔离模式
  • 仅对关键消息开启轨迹
  • 生产环境评估性能影响后再决定是否开启

问题排查案例

案例 1:消息丢失排查

问题:消息发送成功但消费者未收到

排查步骤

# 1. 查询消息轨迹
sh bin/mqadmin QueryMsgTraceById -n localhost:9876 -i <messageId>

# 2. 分析轨迹数据
# - 检查 Pub 记录:确认消息已发送成功
# - 检查 SubBefore 记录:确认消息已投递
# - 检查 SubAfter 记录:确认消费结果

# 3. 根据轨迹定位问题
# - 如果没有 SubBefore:可能是订阅关系错误
# - 如果有 SubBefore 但没有 SubAfter:可能是消费异常
# - 如果 SubAfter 状态为 FAILED:查看消费失败原因

案例 2:消费延迟排查

问题:消息消费延迟较大

排查步骤

# 1. 查询消息轨迹
sh bin/mqadmin QueryMsgTraceById -n localhost:9876 -i <messageId>

# 2. 分析耗时
# - costTime(发送耗时):正常应 < 10ms
# - costTime(消费耗时):正常应 < 100ms

# 3. 定位瓶颈
# - 发送耗时高:检查网络、Broker 压力
# - 消费耗时高:检查消费逻辑、下游系统

案例 3:重复消费排查

问题:消息被重复消费

排查步骤

# 1. 查询消息轨迹
sh bin/mqadmin QueryMsgTraceById -n localhost:9876 -i <messageId>

# 2. 检查消费记录
# - 多条 SubAfter 记录:说明被多次消费
# - 检查 status 是否为 SUCCESS

# 3. 分析原因
# - 消费成功但未及时提交位点
# - 消费者重启导致位点回滚
# - 网络抖动导致重复投递

小结

本章介绍了 RocketMQ 的消息轨迹功能:

  1. 轨迹数据:记录生产、存储、消费三个阶段的信息
  2. 开启方式:Broker 配置 traceTopicEnable=true,客户端设置 enableMsgTrace=true
  3. 自定义 Topic:可以使用自定义 Topic 存储轨迹数据
  4. 查询方式:命令行或 Dashboard 查询
  5. 性能影响:开启轨迹会降低 10-15% 的 TPS
  6. 问题排查:通过轨迹数据定位消息丢失、延迟、重复消费等问题

消息轨迹是问题排查的重要工具,建议在关键业务场景开启此功能。

延伸阅读