跳到主要内容

RocketMQ 速查表

本页面汇总了 RocketMQ 开发和运维中最常用的命令和配置,方便快速查阅。

核心概念速查

概念说明
NameServer路由注册中心,维护 Broker 和 Topic 路由信息
Broker消息存储和转发服务器
Topic消息传输和存储的顶层容器
MessageQueue消息存储和传输的实际容器
Producer消息生产者
Consumer消息消费者
ConsumerGroup消费者分组,同一组内消费者共同消费

默认端口

端口服务说明
9876NameServer路由服务端口
10911BrokerBroker 服务端口
10909BrokerHA 端口(主从同步)
8081Proxy5.x gRPC 端口

常用命令

服务管理

# 启动 NameServer
nohup sh bin/mqnamesrv &

# 启动 Broker(Local 模式,含 Proxy)
nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &

# 启动 Broker(指定配置文件)
nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &

# 关闭 Broker
sh bin/mqshutdown broker

# 关闭 NameServer
sh bin/mqshutdown namesrv

# 查看进程
ps -ef | grep rocketmq

Topic 管理

# 创建 Topic
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t TopicName

# 创建指定类型的 Topic
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t FifoTopic -a +message.type=FIFO

# 删除 Topic
sh bin/mqadmin deleteTopic -n localhost:9876 -c DefaultCluster -t TopicName

# 查看 Topic 列表
sh bin/mqadmin topicList -n localhost:9876

# 查看 Topic 详情
sh bin/mqadmin topicStatus -n localhost:9876 -t TopicName

# 查看 Topic 路由
sh bin/mqadmin topicRoute -n localhost:9876 -t TopicName

消费组管理

# 创建消费组
sh bin/mqadmin updateSubGroup -n localhost:9876 -c DefaultCluster -g GroupName

# 创建顺序消费组
sh bin/mqadmin updateSubGroup -n localhost:9876 -c DefaultCluster -g GroupName -o true

# 查看消费组列表
sh bin/mqadmin consumerList -n localhost:9876

# 查看消费组详情
sh bin/mqadmin consumerProgress -n localhost:9876 -g GroupName

# 查看消费组订阅关系
sh bin/mqadmin consumerConnection -n localhost:9876 -g GroupName

消息查询

# 按 Key 查询消息
sh bin/mqadmin queryMsgByKey -n localhost:9876 -t TopicName -k messageKey

# 按 MessageId 查询消息
sh bin/mqadmin queryMsgById -n localhost:9876 -i messageId

# 查看消息轨迹
sh bin/mqadmin queryMsgTrace -n localhost:9876 -i messageId

消费进度管理

# 查看消费进度
sh bin/mqadmin consumerProgress -n localhost:9876 -g GroupName

# 重置消费进度到最新
sh bin/mqadmin resetOffsetByTime -n localhost:9876 -g GroupName -t TopicName -s -1

# 重置消费进度到指定时间
sh bin/mqadmin resetOffsetByTime -n localhost:9876 -g GroupName -t TopicName -s "20240101000000"

集群管理

# 查看集群信息
sh bin/mqadmin clusterList -n localhost:9876

# 查看 Broker 信息
sh bin/mqadmin brokerStatus -n localhost:9876 -b brokerName

# 查看集群统计
sh bin/mqadmin statsAll -n localhost:9876

代码速查

Maven 依赖

<!-- 5.0 gRPC 协议 SDK(推荐) -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.7</version>
</dependency>

<!-- 4.x Remoting 协议 SDK -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.4</version>
</dependency>

发送消息(4.x SDK)

// 创建生产者
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();

// 创建消息
Message msg = new Message("Topic", "Tag", "Key", "Body".getBytes());

// 同步发送
SendResult result = producer.send(msg);

// 异步发送
producer.send(msg, new SendCallback() {
public void onSuccess(SendResult result) {}
public void onException(Throwable e) {}
});

// 单向发送
producer.sendOneway(msg);

// 关闭生产者
producer.shutdown();

消费消息(4.x SDK)

// 创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");

// 订阅 Topic
consumer.subscribe("Topic", "TagA||TagB");

// 并发消费
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

// 顺序消费
consumer.registerMessageListener(new MessageListenerOrderly() {
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
return ConsumeOrderlyStatus.SUCCESS;
}
});

// 启动消费者
consumer.start();

发送消息(5.0 SDK)

ClientServiceProvider provider = ClientServiceProvider.loadService();

ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints("localhost:8081")
.build();

Producer producer = provider.newProducerBuilder()
.setClientConfiguration(config)
.setTopics("Topic")
.build();

Message message = provider.newMessageBuilder()
.setTopic("Topic")
.setTag("Tag")
.setKeys("Key")
.setBody("Body".getBytes())
.build();

SendReceipt receipt = producer.send(message);

消费消息(5.0 SDK)

ClientServiceProvider provider = ClientServiceProvider.loadService();

ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints("localhost:8081")
.build();

PushConsumer consumer = provider.newPushConsumerBuilder()
.setClientConfiguration(config)
.setConsumerGroup("ConsumerGroup")
.setSubscriptionExpressions(Collections.singletonMap(
"Topic",
new FilterExpression("*", FilterExpressionType.TAG)
))
.setMessageListener(messageView -> {
return ConsumeResult.SUCCESS;
})
.build();

消息类型速查

消息类型标识创建命令
普通消息NORMAL-a +message.type=NORMAL
顺序消息FIFO-a +message.type=FIFO
延时消息DELAY-a +message.type=DELAY
事务消息TRANSACTION-a +message.type=TRANSACTION

延时级别

级别延时级别延时
11秒106分钟
25秒117分钟
310秒128分钟
430秒139分钟
51分钟1410分钟
62分钟1520分钟
73分钟1630分钟
84分钟171小时
95分钟182小时

生产者配置

参数默认值说明
sendMsgTimeout3000发送超时(毫秒)
retryTimesWhenSendFailed2同步发送重试次数
retryTimesWhenSendAsyncFailed2异步发送重试次数
maxMessageSize4MB最大消息大小
compressMsgBodyOverHowmuch4096超过此大小压缩

消费者配置

参数默认值说明
consumeThreadMin20最小消费线程数
consumeThreadMax20最大消费线程数
pullBatchSize32每次拉取消息数
consumeMessageBatchMaxSize1每次消费消息数
consumeTimeout15消费超时(分钟)
maxReconsumeTimes16最大重试次数

Broker 配置

参数默认值说明
brokerNamebroker-aBroker 名称
brokerClusterNameDefaultCluster集群名称
brokerId0Broker ID(0=Master)
namesrvAddr-NameServer 地址
listenPort10911监听端口
storePathRootDir用户目录/store存储根目录
storePathCommitLogstorePathRootDir/commitlogCommitLog 目录
flushDiskTypeASYNC_FLUSH刷盘方式
brokerRoleASYNC_MASTERBroker 角色

刷盘方式

类型说明性能可靠性
ASYNC_FLUSH异步刷盘
SYNC_FLUSH同步刷盘

Broker 角色

角色说明
ASYNC_MASTER异步复制 Master
SYNC_MASTER同步复制 Master
SLAVE从节点

消息发送状态

状态说明
SEND_OK发送成功
FLUSH_DISK_TIMEOUT刷盘超时
FLUSH_SLAVE_TIMEOUT同步到 Slave 超时
SLAVE_NOT_AVAILABLESlave 不可用

消费结果

结果说明
CONSUME_SUCCESS消费成功
RECONSUME_LATER消费失败,稍后重试

死信队列

# 死信队列命名规则
%DLQ% + ConsumerGroupName

# 查看死信队列消息
sh bin/mqadmin queryMsgByKey -t "%DLQ%ConsumerGroup" -k messageKey

常见问题排查

消息发送失败

# 1. 检查 NameServer 连接
sh bin/mqadmin clusterList -n localhost:9876

# 2. 检查 Topic 是否存在
sh bin/mqadmin topicStatus -n localhost:9876 -t TopicName

# 3. 检查 Broker 状态
sh bin/mqadmin brokerStatus -n localhost:9876 -b broker-a

消息堆积

# 1. 查看消费进度
sh bin/mqadmin consumerProgress -n localhost:9876 -g GroupName

# 2. 查看消费者连接
sh bin/mqadmin consumerConnection -n localhost:9876 -g GroupName

# 3. 重置消费进度
sh bin/mqadmin resetOffsetByTime -n localhost:9876 -g GroupName -t TopicName -s -1

Broker 无法启动

# 检查端口占用
netstat -tlnp | grep 10911

# 检查日志
tail -f ~/logs/rocketmqlogs/broker.log

# 检查配置文件
cat conf/broker.conf

ACL 配置

开启 ACL

# broker.conf
aclEnable = true

ACL 配置文件(plain_acl.yml)

# 全局白名单
globalWhiteRemoteAddresses:
- 127.0.0.1
- 192.168.0.*

# 账户配置
accounts:
- accessKey: admin
secretKey: 12345678
admin: true
- accessKey: producer
secretKey: producer_pass
topicPerms:
- TopicA=PUB
- accessKey: consumer
secretKey: consumer_pass
topicPerms:
- TopicA=SUB
groupPerms:
- GroupA=SUB

客户端 ACL 配置

DefaultMQProducer producer = new DefaultMQProducer(
"ProducerGroup",
new AclClientRPCHook(new SessionCredentials("accessKey", "secretKey"))
);

权限定义

权限说明
DENY拒绝
PUB发送权限
SUB订阅权限
PUB|SUB发送+订阅权限

JVM 调优

Broker JVM 配置(runbroker.sh)

# 推荐配置(16GB 内存机器)
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"

# GC 配置
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC"
JAVA_OPT="${JAVA_OPT} -XX:MaxGCPauseMillis=200"

# 直接内存
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=4g"

NameServer JVM 配置(runserver.sh)

JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC"

JVM 参数说明

参数说明
-Xms堆内存初始大小
-Xmx堆内存最大值
-Xmn年轻代大小
-XX:MaxDirectMemorySize直接内存最大值
-XX:+UseG1GC使用 G1 垃圾收集器

操作系统调优

内核参数(/etc/sysctl.conf)

# 网络参数
net.core.rmem_max = 16777216
net.core.wmem_max = 16777216
net.ipv4.tcp_max_syn_backlog = 65536

# 文件描述符
fs.file-max = 1000000

# 内存参数
vm.swappiness = 10

文件描述符限制(/etc/security/limits.conf)

* soft nofile 65536
* hard nofile 65536

消息轨迹

开启消息轨迹

# broker.conf
traceTopicEnable = true

客户端开启轨迹

// 生产者
DefaultMQProducer producer = new DefaultMQProducer("Group", true);

// 消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Group", true);

查询消息轨迹

sh bin/mqadmin QueryMsgTraceById -n localhost:9876 -i <messageId>

监控指标

核心 Broker 指标

指标说明
rocketmq_broker_tps生产 TPS
rocketmq_broker_qps消费 QPS
rocketmq_brokeruntime_commitlog_disk_ratio磁盘使用率
rocketmq_brokeruntime_putmessage_entire_time_max写入最大耗时

核心消费者指标

指标说明
rocketmq_group_diff消息堆积数
rocketmq_group_retrydiff重试队列堆积
rocketmq_client_consume_fail_msg_tps消费失败 TPS

告警规则参考

# 消息堆积告警
- alert: RocketMQMessageAccumulation
expr: rocketmq_group_diff > 10000
for: 5m

# 磁盘使用率告警
- alert: RocketMQDiskUsageHigh
expr: rocketmq_brokeruntime_commitlog_disk_ratio > 0.8
for: 5m

# Broker 宕机告警
- alert: RocketMQBrokerDown
expr: up{job="rocketmq"} == 0
for: 1m

Docker 部署

version: '3'
services:
namesrv:
image: apache/rocketmq:5.3.2
container_name: rmqnamesrv
ports:
- 9876:9876
command: sh mqnamesrv

broker:
image: apache/rocketmq:5.3.2
container_name: rmqbroker
ports:
- 10911:10911
- 8081:8081
environment:
- NAMESRV_ADDR=namesrv:9876
command: sh mqbroker -n namesrv:9876 --enable-proxy
depends_on:
- namesrv

dashboard:
image: apacherocketmq/rocketmq-dashboard:latest
container_name: rmqdashboard
ports:
- 8180:8080
environment:
- rocketmq.config.namesrvAddr=namesrv:9876
depends_on:
- namesrv

exporter:
image: apache/rocketmq-exporter:latest
container_name: rmqexporter
ports:
- 5557:5557
environment:
- ROCKETMQ_CONFIG_NAMESRVADDR=namesrv:9876
depends_on:
- namesrv

小结

本速查表涵盖了 RocketMQ 开发和运维中最常用的命令和配置,建议收藏以备查阅。