RocketMQ 速查表
本页面汇总了 RocketMQ 开发和运维中最常用的命令和配置,方便快速查阅。
核心概念速查
| 概念 | 说明 |
|---|---|
| NameServer | 路由注册中心,维护 Broker 和 Topic 路由信息 |
| Broker | 消息存储和转发服务器 |
| Topic | 消息传输和存储的顶层容器 |
| MessageQueue | 消息存储和传输的实际容器 |
| Producer | 消息生产者 |
| Consumer | 消息消费者 |
| ConsumerGroup | 消费者分组,同一组内消费者共同消费 |
默认端口
| 端口 | 服务 | 说明 |
|---|---|---|
| 9876 | NameServer | 路由服务端口 |
| 10911 | Broker | Broker 服务端口 |
| 10909 | Broker | HA 端口(主从同步) |
| 8081 | Proxy | 5.x gRPC 端口 |
常用命令
服务管理
# 启动 NameServer
nohup sh bin/mqnamesrv &
# 启动 Broker(Local 模式,含 Proxy)
nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
# 启动 Broker(指定配置文件)
nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &
# 关闭 Broker
sh bin/mqshutdown broker
# 关闭 NameServer
sh bin/mqshutdown namesrv
# 查看进程
ps -ef | grep rocketmq
Topic 管理
# 创建 Topic
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t TopicName
# 创建指定类型的 Topic
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t FifoTopic -a +message.type=FIFO
# 删除 Topic
sh bin/mqadmin deleteTopic -n localhost:9876 -c DefaultCluster -t TopicName
# 查看 Topic 列表
sh bin/mqadmin topicList -n localhost:9876
# 查看 Topic 详情
sh bin/mqadmin topicStatus -n localhost:9876 -t TopicName
# 查看 Topic 路由
sh bin/mqadmin topicRoute -n localhost:9876 -t TopicName
消费组管理
# 创建消费组
sh bin/mqadmin updateSubGroup -n localhost:9876 -c DefaultCluster -g GroupName
# 创建顺序消费组
sh bin/mqadmin updateSubGroup -n localhost:9876 -c DefaultCluster -g GroupName -o true
# 查看消费组列表
sh bin/mqadmin consumerList -n localhost:9876
# 查看消费组详情
sh bin/mqadmin consumerProgress -n localhost:9876 -g GroupName
# 查看消费组订阅关系
sh bin/mqadmin consumerConnection -n localhost:9876 -g GroupName
消息查询
# 按 Key 查询消息
sh bin/mqadmin queryMsgByKey -n localhost:9876 -t TopicName -k messageKey
# 按 MessageId 查询消息
sh bin/mqadmin queryMsgById -n localhost:9876 -i messageId
# 查看消息轨迹
sh bin/mqadmin queryMsgTrace -n localhost:9876 -i messageId
消费进度管理
# 查看消费进度
sh bin/mqadmin consumerProgress -n localhost:9876 -g GroupName
# 重置消费进度到最新
sh bin/mqadmin resetOffsetByTime -n localhost:9876 -g GroupName -t TopicName -s -1
# 重置消费进度到指定时间
sh bin/mqadmin resetOffsetByTime -n localhost:9876 -g GroupName -t TopicName -s "20240101000000"
集群管理
# 查看集群信息
sh bin/mqadmin clusterList -n localhost:9876
# 查看 Broker 信息
sh bin/mqadmin brokerStatus -n localhost:9876 -b brokerName
# 查看集群统计
sh bin/mqadmin statsAll -n localhost:9876
代码速查
Maven 依赖
<!-- 5.0 gRPC 协议 SDK(推荐) -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.7</version>
</dependency>
<!-- 4.x Remoting 协议 SDK -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.4</version>
</dependency>
发送消息(4.x SDK)
// 创建生产者
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
// 创建消息
Message msg = new Message("Topic", "Tag", "Key", "Body".getBytes());
// 同步发送
SendResult result = producer.send(msg);
// 异步发送
producer.send(msg, new SendCallback() {
public void onSuccess(SendResult result) {}
public void onException(Throwable e) {}
});
// 单向发送
producer.sendOneway(msg);
// 关闭生产者
producer.shutdown();
消费消息(4.x SDK)
// 创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
// 订阅 Topic
consumer.subscribe("Topic", "TagA||TagB");
// 并发消费
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 顺序消费
consumer.registerMessageListener(new MessageListenerOrderly() {
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 启动消费者
consumer.start();
发送消息(5.0 SDK)
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints("localhost:8081")
.build();
Producer producer = provider.newProducerBuilder()
.setClientConfiguration(config)
.setTopics("Topic")
.build();
Message message = provider.newMessageBuilder()
.setTopic("Topic")
.setTag("Tag")
.setKeys("Key")
.setBody("Body".getBytes())
.build();
SendReceipt receipt = producer.send(message);
消费消息(5.0 SDK)
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints("localhost:8081")
.build();
PushConsumer consumer = provider.newPushConsumerBuilder()
.setClientConfiguration(config)
.setConsumerGroup("ConsumerGroup")
.setSubscriptionExpressions(Collections.singletonMap(
"Topic",
new FilterExpression("*", FilterExpressionType.TAG)
))
.setMessageListener(messageView -> {
return ConsumeResult.SUCCESS;
})
.build();
消息类型速查
| 消息类型 | 标识 | 创建命令 |
|---|---|---|
| 普通消息 | NORMAL | -a +message.type=NORMAL |
| 顺序消息 | FIFO | -a +message.type=FIFO |
| 延时消息 | DELAY | -a +message.type=DELAY |
| 事务消息 | TRANSACTION | -a +message.type=TRANSACTION |
延时级别
| 级别 | 延时 | 级别 | 延时 |
|---|---|---|---|
| 1 | 1秒 | 10 | 6分钟 |
| 2 | 5秒 | 11 | 7分钟 |
| 3 | 10秒 | 12 | 8分钟 |
| 4 | 30秒 | 13 | 9分钟 |
| 5 | 1分钟 | 14 | 10分钟 |
| 6 | 2分钟 | 15 | 20分钟 |
| 7 | 3分钟 | 16 | 30分钟 |
| 8 | 4分钟 | 17 | 1小时 |
| 9 | 5分钟 | 18 | 2小时 |
生产者配置
| 参数 | 默认值 | 说明 |
|---|---|---|
sendMsgTimeout | 3000 | 发送超时(毫秒) |
retryTimesWhenSendFailed | 2 | 同步发送重试次数 |
retryTimesWhenSendAsyncFailed | 2 | 异步发送重试次数 |
maxMessageSize | 4MB | 最大消息大小 |
compressMsgBodyOverHowmuch | 4096 | 超过此大小压缩 |
消费者配置
| 参数 | 默认值 | 说明 |
|---|---|---|
consumeThreadMin | 20 | 最小消费线程数 |
consumeThreadMax | 20 | 最大消费线程数 |
pullBatchSize | 32 | 每次拉取消息数 |
consumeMessageBatchMaxSize | 1 | 每次消费消息数 |
consumeTimeout | 15 | 消费超时(分钟) |
maxReconsumeTimes | 16 | 最大重试次数 |
Broker 配置
| 参数 | 默认值 | 说明 |
|---|---|---|
brokerName | broker-a | Broker 名称 |
brokerClusterName | DefaultCluster | 集群名称 |
brokerId | 0 | Broker ID(0=Master) |
namesrvAddr | - | NameServer 地址 |
listenPort | 10911 | 监听端口 |
storePathRootDir | 用户目录/store | 存储根目录 |
storePathCommitLog | storePathRootDir/commitlog | CommitLog 目录 |
flushDiskType | ASYNC_FLUSH | 刷盘方式 |
brokerRole | ASYNC_MASTER | Broker 角色 |
刷盘方式
| 类型 | 说明 | 性能 | 可靠性 |
|---|---|---|---|
ASYNC_FLUSH | 异步刷盘 | 高 | 低 |
SYNC_FLUSH | 同步刷盘 | 低 | 高 |
Broker 角色
| 角色 | 说明 |
|---|---|
ASYNC_MASTER | 异步复制 Master |
SYNC_MASTER | 同步复制 Master |
SLAVE | 从节点 |
消息发送状态
| 状态 | 说明 |
|---|---|
SEND_OK | 发送成功 |
FLUSH_DISK_TIMEOUT | 刷盘超时 |
FLUSH_SLAVE_TIMEOUT | 同步到 Slave 超时 |
SLAVE_NOT_AVAILABLE | Slave 不可用 |
消费结果
| 结果 | 说明 |
|---|---|
CONSUME_SUCCESS | 消费成功 |
RECONSUME_LATER | 消费失败,稍后重试 |
死信队列
# 死信队列命名规则
%DLQ% + ConsumerGroupName
# 查看死信队列消息
sh bin/mqadmin queryMsgByKey -t "%DLQ%ConsumerGroup" -k messageKey
常见问题排查
消息发送失败
# 1. 检查 NameServer 连接
sh bin/mqadmin clusterList -n localhost:9876
# 2. 检查 Topic 是否存在
sh bin/mqadmin topicStatus -n localhost:9876 -t TopicName
# 3. 检查 Broker 状态
sh bin/mqadmin brokerStatus -n localhost:9876 -b broker-a
消息堆积
# 1. 查看消费进度
sh bin/mqadmin consumerProgress -n localhost:9876 -g GroupName
# 2. 查看消费者连接
sh bin/mqadmin consumerConnection -n localhost:9876 -g GroupName
# 3. 重置消费进度
sh bin/mqadmin resetOffsetByTime -n localhost:9876 -g GroupName -t TopicName -s -1
Broker 无法启动
# 检查端口占用
netstat -tlnp | grep 10911
# 检查日志
tail -f ~/logs/rocketmqlogs/broker.log
# 检查配置文件
cat conf/broker.conf
ACL 配置
开启 ACL
# broker.conf
aclEnable = true
ACL 配置文件(plain_acl.yml)
# 全局白名单
globalWhiteRemoteAddresses:
- 127.0.0.1
- 192.168.0.*
# 账户配置
accounts:
- accessKey: admin
secretKey: 12345678
admin: true
- accessKey: producer
secretKey: producer_pass
topicPerms:
- TopicA=PUB
- accessKey: consumer
secretKey: consumer_pass
topicPerms:
- TopicA=SUB
groupPerms:
- GroupA=SUB
客户端 ACL 配置
DefaultMQProducer producer = new DefaultMQProducer(
"ProducerGroup",
new AclClientRPCHook(new SessionCredentials("accessKey", "secretKey"))
);
权限定义
| 权限 | 说明 |
|---|---|
| DENY | 拒绝 |
| PUB | 发送权限 |
| SUB | 订阅权限 |
| PUB|SUB | 发送+订阅权限 |
JVM 调优
Broker JVM 配置(runbroker.sh)
# 推荐配置(16GB 内存机器)
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
# GC 配置
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC"
JAVA_OPT="${JAVA_OPT} -XX:MaxGCPauseMillis=200"
# 直接内存
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=4g"
NameServer JVM 配置(runserver.sh)
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC"
JVM 参数说明
| 参数 | 说明 |
|---|---|
-Xms | 堆内存初始大小 |
-Xmx | 堆内存最大值 |
-Xmn | 年轻代大小 |
-XX:MaxDirectMemorySize | 直接内存最大值 |
-XX:+UseG1GC | 使用 G1 垃圾收集器 |
操作系统调优
内核参数(/etc/sysctl.conf)
# 网络参数
net.core.rmem_max = 16777216
net.core.wmem_max = 16777216
net.ipv4.tcp_max_syn_backlog = 65536
# 文件描述符
fs.file-max = 1000000
# 内存参数
vm.swappiness = 10
文件描述符限制(/etc/security/limits.conf)
* soft nofile 65536
* hard nofile 65536
消息轨迹
开启消息轨迹
# broker.conf
traceTopicEnable = true
客户端开启轨迹
// 生产者
DefaultMQProducer producer = new DefaultMQProducer("Group", true);
// 消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Group", true);
查询消息轨迹
sh bin/mqadmin QueryMsgTraceById -n localhost:9876 -i <messageId>
监控指标
核心 Broker 指标
| 指标 | 说明 |
|---|---|
rocketmq_broker_tps | 生产 TPS |
rocketmq_broker_qps | 消费 QPS |
rocketmq_brokeruntime_commitlog_disk_ratio | 磁盘使用率 |
rocketmq_brokeruntime_putmessage_entire_time_max | 写入最大耗时 |
核心消费者指标
| 指标 | 说明 |
|---|---|
rocketmq_group_diff | 消息堆积数 |
rocketmq_group_retrydiff | 重试队列堆积 |
rocketmq_client_consume_fail_msg_tps | 消费失败 TPS |
告警规则参考
# 消息堆积告警
- alert: RocketMQMessageAccumulation
expr: rocketmq_group_diff > 10000
for: 5m
# 磁盘使用率告警
- alert: RocketMQDiskUsageHigh
expr: rocketmq_brokeruntime_commitlog_disk_ratio > 0.8
for: 5m
# Broker 宕机告警
- alert: RocketMQBrokerDown
expr: up{job="rocketmq"} == 0
for: 1m
Docker 部署
version: '3'
services:
namesrv:
image: apache/rocketmq:5.3.2
container_name: rmqnamesrv
ports:
- 9876:9876
command: sh mqnamesrv
broker:
image: apache/rocketmq:5.3.2
container_name: rmqbroker
ports:
- 10911:10911
- 8081:8081
environment:
- NAMESRV_ADDR=namesrv:9876
command: sh mqbroker -n namesrv:9876 --enable-proxy
depends_on:
- namesrv
dashboard:
image: apacherocketmq/rocketmq-dashboard:latest
container_name: rmqdashboard
ports:
- 8180:8080
environment:
- rocketmq.config.namesrvAddr=namesrv:9876
depends_on:
- namesrv
exporter:
image: apache/rocketmq-exporter:latest
container_name: rmqexporter
ports:
- 5557:5557
environment:
- ROCKETMQ_CONFIG_NAMESRVADDR=namesrv:9876
depends_on:
- namesrv
小结
本速查表涵盖了 RocketMQ 开发和运维中最常用的命令和配置,建议收藏以备查阅。