消费者
消费者(Consumer)是 RocketMQ 系统中负责接收和处理消息的运行实体。本章将详细介绍消费者的工作原理和使用方法。
消费者概述
消费者职责
消费者的核心职责是从 Broker 获取消息并执行业务处理:
- 消息订阅:订阅感兴趣的 Topic 和 Tag
- 消息拉取:从 Broker 获取消息
- 消息处理:执行业务逻辑处理消息
- 消费确认:向 Broker 提交消费结果
消费者组
消费者组(Consumer Group)是一类消费者的集合,具有以下特点:
- 消费同一类消息,消费逻辑一致
- 组内消费者共同分担消息消费(集群模式)
- 消费进度由组统一维护
消费者类型
RocketMQ 5.0 提供了两种消费者类型:PushConsumer 和 SimpleConsumer。
PushConsumer(推荐)
PushConsumer 是一种被动接收消息的消费模式,服务端主动将消息推送给消费者。
特点:
- 消息实时投递,延迟低
- 自动管理消费进度
- 内置重试机制
- 使用简单,适合大多数场景
代码示例(5.0 SDK):
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.*;
import org.apache.rocketmq.client.apis.message.MessageView;
import java.util.Collections;
public class PushConsumerDemo {
public static void main(String[] args) throws Exception {
// 1. 创建客户端配置
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints("localhost:8081")
.build();
// 2. 创建 PushConsumer
PushConsumer consumer = provider.newPushConsumerBuilder()
.setClientConfiguration(config)
.setConsumerGroup("TestConsumerGroup")
.setSubscriptionExpressions(Collections.singletonMap(
"TestTopic",
new FilterExpression("*", FilterExpressionType.TAG)
))
.setMessageListener(messageView -> {
// 3. 处理消息
System.out.println("收到消息: " + messageView.getMessageId());
try {
// 业务处理
String body = new String(
messageView.getBody().array(),
"UTF-8"
);
System.out.println("消息内容: " + body);
return ConsumeResult.SUCCESS;
} catch (Exception e) {
e.printStackTrace();
return ConsumeResult.FAILURE;
}
})
.build();
// 保持运行
Thread.sleep(Long.MAX_VALUE);
}
}
SimpleConsumer
SimpleConsumer 是一种主动拉取消息的消费模式,消费者主动从服务端拉取消息。
特点:
- 消费者主动控制拉取时机和数量
- 需要手动确认消费结果
- 更灵活的消费控制
- 适合流式处理和批量处理场景
代码示例(5.0 SDK):
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.*;
import org.apache.rocketmq.client.apis.message.MessageView;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
public class SimpleConsumerDemo {
public static void main(String[] args) throws Exception {
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints("localhost:8081")
.build();
// 1. 创建 SimpleConsumer
SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
.setClientConfiguration(config)
.setConsumerGroup("TestConsumerGroup")
.setSubscriptionExpressions(Collections.singletonMap(
"TestTopic",
new FilterExpression("*", FilterExpressionType.TAG)
))
.setAwaitDuration(Duration.ofSeconds(30)) // 长轮询等待时间
.build();
// 2. 主动拉取消息
while (true) {
try {
// 拉取消息,最多 10 条,等待 30 秒
List<MessageView> messages = consumer.receive(10, Duration.ofSeconds(30));
for (MessageView message : messages) {
try {
// 处理消息
System.out.println("收到消息: " + message.getMessageId());
String body = new String(message.getBody().array(), "UTF-8");
System.out.println("消息内容: " + body);
// 3. 确认消费成功
consumer.ack(message);
} catch (Exception e) {
// 确认消费失败,消息会重新投递
consumer.negativeAck(message);
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
两种消费者对比
| 特性 | PushConsumer | SimpleConsumer |
|---|---|---|
| 消息获取 | 服务端推送 | 客户端拉取 |
| 消费确认 | 自动/返回值 | 手动调用 ack |
| 消费进度 | 自动管理 | 手动管理 |
| 实时性 | 高 | 取决于拉取频率 |
| 流量控制 | 较难控制 | 完全控制 |
| 适用场景 | 一般业务消息 | 流式处理、批量处理 |
消费模式
集群消费(Clustering)
集群消费是最常用的消费模式,同一消费者组内的消费者共同分担消息消费。
特点:
- 每条消息只被组内一个消费者消费
- 消费进度由 Broker 维护
- 支持水平扩展
代码示例(4.x SDK):
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class ClusterConsumer {
public static void main(String[] args) throws Exception {
// 创建消费者,指定消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupA");
// 设置 NameServer 地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅 Topic,默认为集群消费
consumer.subscribe("TestTopic", "*");
// 设置消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("收到消息: %s%n",
new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("消费者启动成功");
}
}
广播消费(Broadcasting)
广播消费模式下,每条消息会被消费者组内的所有消费者消费。
特点:
- 每个消费者都消费全部消息
- 消费进度由消费者本地维护
- 适合配置同步、缓存刷新等场景
代码示例(4.x SDK):
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
public class BroadcastConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupB");
consumer.setNamesrvAddr("localhost:9876");
// 设置为广播消费模式
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("ConfigTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("收到配置消息: %s%n",
new String(msg.getBody()));
// 更新本地配置
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("广播消费者启动成功");
}
}
两种模式对比
| 特性 | 集群消费 | 广播消费 |
|---|---|---|
| 消息分发 | 组内分担 | 组内广播 |
| 消费进度 | Broker 维护 | Consumer 维护 |
| 扩展性 | 水平扩展 | 无扩展优势 |
| 适用场景 | 业务处理 | 配置同步、缓存刷新 |
消费监听器
并发消费(MessageListenerConcurrently)
并发消费模式下,消息被多个线程并发处理,提高消费吞吐量。
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// 并发处理消息
System.out.printf("线程 %s 处理消息: %s%n",
Thread.currentThread().getName(),
new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 配置并发线程数
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);
顺序消费(MessageListenerOrderly)
顺序消费模式下,同一队列的消息按顺序依次处理。
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeOrderlyContext context) {
context.setAutoCommit(true); // 自动提交消费进度
for (MessageExt msg : msgs) {
// 顺序处理消息
System.out.printf("顺序处理消息: %s%n",
new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
两种监听器对比
| 特性 | 并发消费 | 顺序消费 |
|---|---|---|
| 处理方式 | 多线程并发 | 单线程串行 |
| 吞吐量 | 高 | 低 |
| 顺序性 | 无保证 | 队列内有序 |
| 适用场景 | 无顺序要求的业务 | 需要顺序处理的业务 |
消费重试
重试机制
当消息消费失败时,RocketMQ 会自动进行消息重试:
重试队列:
- 消费失败的消息会进入重试队列
- 重试队列命名规则:
%RETRY% + ConsumerGroup - 消息会延迟一段时间后重新投递
重试间隔:
| 重试次数 | 延迟时间 | 重试次数 | 延迟时间 |
|---|---|---|---|
| 1 | 10秒 | 9 | 7分钟 |
| 2 | 30秒 | 10 | 8分钟 |
| 3 | 1分钟 | 11 | 9分钟 |
| 4 | 2分钟 | 12 | 10分钟 |
| 5 | 3分钟 | 13 | 20分钟 |
| 6 | 4分钟 | 14 | 30分钟 |
| 7 | 5分钟 | 15 | 1小时 |
| 8 | 6分钟 | 16 | 2小时 |
重试次数配置
// 设置最大重试次数(默认 16 次)
consumer.setMaxReconsumeTimes(5);
// 在监听器中获取重试次数
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// 获取重试次数
int reconsumeTimes = msg.getReconsumeTimes();
System.out.printf("重试次数: %d%n", reconsumeTimes);
// 超过最大重试次数,记录日志,不再重试
if (reconsumeTimes >= 5) {
// 记录到数据库或发送告警
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
try {
// 业务处理
process(msg);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 返回失败,触发重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
死信队列
超过最大重试次数的消息会进入死信队列(Dead Letter Queue):
- 死信队列命名规则:
%DLQ% + ConsumerGroup - 死信队列中的消息需要人工处理
- 可以通过控制台或命令行查看和处理
# 查看死信队列消息
sh bin/mqadmin queryMsgByKey -t %DLQ%ConsumerGroupA -k messageKey
消费进度管理
本地模式
消费进度保存在消费者本地:
// 广播消费模式,消费进度保存在本地
consumer.setMessageModel(MessageModel.BROADCASTING);
// 设置本地消费进度存储路径
consumer.setOffsetStorePath("/path/to/offsets.json");
集群模式
消费进度保存在 Broker:
// 集群消费模式,消费进度保存在 Broker
consumer.setMessageModel(MessageModel.CLUSTERING);
重置消费进度
# 重置消费进度到最新位置
sh bin/mqadmin resetOffsetByTime -n localhost:9876 -g ConsumerGroupA -t TestTopic -s -1
# 重置消费进度到指定时间
sh bin/mqadmin resetOffsetByTime -n localhost:9876 -g ConsumerGroupA -t TestTopic -s "20240101000000"
消费者配置
重要配置参数
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
// 基础配置
consumer.setNamesrvAddr("localhost:9876"); // NameServer 地址
consumer.setConsumerGroup("ConsumerGroupName"); // 消费者组名
consumer.setMessageModel(MessageModel.CLUSTERING); // 消费模式
// 消费线程配置
consumer.setConsumeThreadMin(20); // 最小消费线程数
consumer.setConsumeThreadMax(64); // 最大消费线程数
// 消息批量配置
consumer.setPullBatchSize(32); // 每次拉取消息数量
consumer.setConsumeMessageBatchMaxSize(16); // 每次消费消息数量
// 拉取配置
consumer.setPullInterval(0); // 拉取间隔(毫秒)
consumer.setConsumeTimeout(15); // 消费超时(分钟)
// 重试配置
consumer.setMaxReconsumeTimes(16); // 最大重试次数
consumer.setDelayLevelWhenNextConsume(0); // 重试延迟级别
配置说明
| 参数 | 默认值 | 说明 |
|---|---|---|
consumeThreadMin | 20 | 最小消费线程数 |
consumeThreadMax | 20 | 最大消费线程数 |
pullBatchSize | 32 | 每次拉取的消息数量 |
consumeMessageBatchMaxSize | 1 | 每次消费的消息数量 |
consumeTimeout | 15 | 消费超时时间(分钟) |
maxReconsumeTimes | 16 | 最大重试次数 |
最佳实践
1. 消费者组命名规范
// 推荐命名格式:业务名_功能名_Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Order_Process_Consumer");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Payment_Notify_Consumer");
2. 幂等性处理
消费端必须保证消息处理的幂等性:
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String messageId = msg.getKeys(); // 使用业务唯一标识
// 检查是否已处理
if (isProcessed(messageId)) {
System.out.println("消息已处理,跳过: " + messageId);
continue;
}
// 处理业务
process(msg);
// 标记已处理
markProcessed(messageId);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
3. 异常处理
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
// 业务处理
process(msg);
} catch (BusinessException e) {
// 业务异常,记录日志,不再重试
log.error("业务处理失败", e);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 系统异常,触发重试
log.error("系统异常,等待重试", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
4. 优雅关闭
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
consumer.shutdown();
log.info("Consumer shutdown successfully");
}));
5. 监控消费堆积
// 获取消费堆积
long lag = consumer.getOffsetStore().getOffset(
mq.getTopic(),
mq.getQueueId()
) - brokerOffset;
小结
本章介绍了 RocketMQ 消费者的核心概念和使用方法:
- 消费者类型:PushConsumer 和 SimpleConsumer
- 消费模式:集群消费和广播消费
- 消费监听器:并发消费和顺序消费
- 消费重试:重试机制和死信队列
- 消费进度管理:本地模式和集群模式
- 最佳实践:幂等性、异常处理、优雅关闭
练习
- 实现 PushConsumer 和 SimpleConsumer 两种消费者
- 实现消息消费的幂等性处理
- 处理消费失败的消息,超过重试次数后记录日志
- 实现广播消费模式,用于配置同步