跳到主要内容

消费者

消费者(Consumer)是 RocketMQ 系统中负责接收和处理消息的运行实体。本章将详细介绍消费者的工作原理和使用方法。

消费者概述

消费者职责

消费者的核心职责是从 Broker 获取消息并执行业务处理:

  1. 消息订阅:订阅感兴趣的 Topic 和 Tag
  2. 消息拉取:从 Broker 获取消息
  3. 消息处理:执行业务逻辑处理消息
  4. 消费确认:向 Broker 提交消费结果

消费者组

消费者组(Consumer Group)是一类消费者的集合,具有以下特点:

  • 消费同一类消息,消费逻辑一致
  • 组内消费者共同分担消息消费(集群模式)
  • 消费进度由组统一维护

消费者类型

RocketMQ 5.0 提供了两种消费者类型:PushConsumer 和 SimpleConsumer。

PushConsumer(推荐)

PushConsumer 是一种被动接收消息的消费模式,服务端主动将消息推送给消费者。

特点

  • 消息实时投递,延迟低
  • 自动管理消费进度
  • 内置重试机制
  • 使用简单,适合大多数场景

代码示例(5.0 SDK)

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.*;
import org.apache.rocketmq.client.apis.message.MessageView;
import java.util.Collections;

public class PushConsumerDemo {
public static void main(String[] args) throws Exception {
// 1. 创建客户端配置
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints("localhost:8081")
.build();

// 2. 创建 PushConsumer
PushConsumer consumer = provider.newPushConsumerBuilder()
.setClientConfiguration(config)
.setConsumerGroup("TestConsumerGroup")
.setSubscriptionExpressions(Collections.singletonMap(
"TestTopic",
new FilterExpression("*", FilterExpressionType.TAG)
))
.setMessageListener(messageView -> {
// 3. 处理消息
System.out.println("收到消息: " + messageView.getMessageId());
try {
// 业务处理
String body = new String(
messageView.getBody().array(),
"UTF-8"
);
System.out.println("消息内容: " + body);

return ConsumeResult.SUCCESS;
} catch (Exception e) {
e.printStackTrace();
return ConsumeResult.FAILURE;
}
})
.build();

// 保持运行
Thread.sleep(Long.MAX_VALUE);
}
}

SimpleConsumer

SimpleConsumer 是一种主动拉取消息的消费模式,消费者主动从服务端拉取消息。

特点

  • 消费者主动控制拉取时机和数量
  • 需要手动确认消费结果
  • 更灵活的消费控制
  • 适合流式处理和批量处理场景

代码示例(5.0 SDK)

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.*;
import org.apache.rocketmq.client.apis.message.MessageView;
import java.time.Duration;
import java.util.Collections;
import java.util.List;

public class SimpleConsumerDemo {
public static void main(String[] args) throws Exception {
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints("localhost:8081")
.build();

// 1. 创建 SimpleConsumer
SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
.setClientConfiguration(config)
.setConsumerGroup("TestConsumerGroup")
.setSubscriptionExpressions(Collections.singletonMap(
"TestTopic",
new FilterExpression("*", FilterExpressionType.TAG)
))
.setAwaitDuration(Duration.ofSeconds(30)) // 长轮询等待时间
.build();

// 2. 主动拉取消息
while (true) {
try {
// 拉取消息,最多 10 条,等待 30 秒
List<MessageView> messages = consumer.receive(10, Duration.ofSeconds(30));

for (MessageView message : messages) {
try {
// 处理消息
System.out.println("收到消息: " + message.getMessageId());
String body = new String(message.getBody().array(), "UTF-8");
System.out.println("消息内容: " + body);

// 3. 确认消费成功
consumer.ack(message);
} catch (Exception e) {
// 确认消费失败,消息会重新投递
consumer.negativeAck(message);
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

两种消费者对比

特性PushConsumerSimpleConsumer
消息获取服务端推送客户端拉取
消费确认自动/返回值手动调用 ack
消费进度自动管理手动管理
实时性取决于拉取频率
流量控制较难控制完全控制
适用场景一般业务消息流式处理、批量处理

消费模式

集群消费(Clustering)

集群消费是最常用的消费模式,同一消费者组内的消费者共同分担消息消费。

特点

  • 每条消息只被组内一个消费者消费
  • 消费进度由 Broker 维护
  • 支持水平扩展

代码示例(4.x SDK)

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

public class ClusterConsumer {
public static void main(String[] args) throws Exception {
// 创建消费者,指定消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupA");

// 设置 NameServer 地址
consumer.setNamesrvAddr("localhost:9876");

// 订阅 Topic,默认为集群消费
consumer.subscribe("TestTopic", "*");

// 设置消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("收到消息: %s%n",
new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

// 启动消费者
consumer.start();
System.out.println("消费者启动成功");
}
}

广播消费(Broadcasting)

广播消费模式下,每条消息会被消费者组内的所有消费者消费。

特点

  • 每个消费者都消费全部消息
  • 消费进度由消费者本地维护
  • 适合配置同步、缓存刷新等场景

代码示例(4.x SDK)

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;

public class BroadcastConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupB");
consumer.setNamesrvAddr("localhost:9876");

// 设置为广播消费模式
consumer.setMessageModel(MessageModel.BROADCASTING);

consumer.subscribe("ConfigTopic", "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("收到配置消息: %s%n",
new String(msg.getBody()));
// 更新本地配置
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

consumer.start();
System.out.println("广播消费者启动成功");
}
}

两种模式对比

特性集群消费广播消费
消息分发组内分担组内广播
消费进度Broker 维护Consumer 维护
扩展性水平扩展无扩展优势
适用场景业务处理配置同步、缓存刷新

消费监听器

并发消费(MessageListenerConcurrently)

并发消费模式下,消息被多个线程并发处理,提高消费吞吐量。

consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// 并发处理消息
System.out.printf("线程 %s 处理消息: %s%n",
Thread.currentThread().getName(),
new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

// 配置并发线程数
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);

顺序消费(MessageListenerOrderly)

顺序消费模式下,同一队列的消息按顺序依次处理。

consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeOrderlyContext context) {
context.setAutoCommit(true); // 自动提交消费进度

for (MessageExt msg : msgs) {
// 顺序处理消息
System.out.printf("顺序处理消息: %s%n",
new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});

两种监听器对比

特性并发消费顺序消费
处理方式多线程并发单线程串行
吞吐量
顺序性无保证队列内有序
适用场景无顺序要求的业务需要顺序处理的业务

消费重试

重试机制

当消息消费失败时,RocketMQ 会自动进行消息重试:

重试队列

  • 消费失败的消息会进入重试队列
  • 重试队列命名规则:%RETRY% + ConsumerGroup
  • 消息会延迟一段时间后重新投递

重试间隔

重试次数延迟时间重试次数延迟时间
110秒97分钟
230秒108分钟
31分钟119分钟
42分钟1210分钟
53分钟1320分钟
64分钟1430分钟
75分钟151小时
86分钟162小时

重试次数配置

// 设置最大重试次数(默认 16 次)
consumer.setMaxReconsumeTimes(5);

// 在监听器中获取重试次数
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// 获取重试次数
int reconsumeTimes = msg.getReconsumeTimes();
System.out.printf("重试次数: %d%n", reconsumeTimes);

// 超过最大重试次数,记录日志,不再重试
if (reconsumeTimes >= 5) {
// 记录到数据库或发送告警
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

try {
// 业务处理
process(msg);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 返回失败,触发重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

死信队列

超过最大重试次数的消息会进入死信队列(Dead Letter Queue):

  • 死信队列命名规则:%DLQ% + ConsumerGroup
  • 死信队列中的消息需要人工处理
  • 可以通过控制台或命令行查看和处理
# 查看死信队列消息
sh bin/mqadmin queryMsgByKey -t %DLQ%ConsumerGroupA -k messageKey

消费进度管理

本地模式

消费进度保存在消费者本地:

// 广播消费模式,消费进度保存在本地
consumer.setMessageModel(MessageModel.BROADCASTING);

// 设置本地消费进度存储路径
consumer.setOffsetStorePath("/path/to/offsets.json");

集群模式

消费进度保存在 Broker:

// 集群消费模式,消费进度保存在 Broker
consumer.setMessageModel(MessageModel.CLUSTERING);

重置消费进度

# 重置消费进度到最新位置
sh bin/mqadmin resetOffsetByTime -n localhost:9876 -g ConsumerGroupA -t TestTopic -s -1

# 重置消费进度到指定时间
sh bin/mqadmin resetOffsetByTime -n localhost:9876 -g ConsumerGroupA -t TestTopic -s "20240101000000"

消费者配置

重要配置参数

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");

// 基础配置
consumer.setNamesrvAddr("localhost:9876"); // NameServer 地址
consumer.setConsumerGroup("ConsumerGroupName"); // 消费者组名
consumer.setMessageModel(MessageModel.CLUSTERING); // 消费模式

// 消费线程配置
consumer.setConsumeThreadMin(20); // 最小消费线程数
consumer.setConsumeThreadMax(64); // 最大消费线程数

// 消息批量配置
consumer.setPullBatchSize(32); // 每次拉取消息数量
consumer.setConsumeMessageBatchMaxSize(16); // 每次消费消息数量

// 拉取配置
consumer.setPullInterval(0); // 拉取间隔(毫秒)
consumer.setConsumeTimeout(15); // 消费超时(分钟)

// 重试配置
consumer.setMaxReconsumeTimes(16); // 最大重试次数
consumer.setDelayLevelWhenNextConsume(0); // 重试延迟级别

配置说明

参数默认值说明
consumeThreadMin20最小消费线程数
consumeThreadMax20最大消费线程数
pullBatchSize32每次拉取的消息数量
consumeMessageBatchMaxSize1每次消费的消息数量
consumeTimeout15消费超时时间(分钟)
maxReconsumeTimes16最大重试次数

最佳实践

1. 消费者组命名规范

// 推荐命名格式:业务名_功能名_Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Order_Process_Consumer");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Payment_Notify_Consumer");

2. 幂等性处理

消费端必须保证消息处理的幂等性:

consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String messageId = msg.getKeys(); // 使用业务唯一标识

// 检查是否已处理
if (isProcessed(messageId)) {
System.out.println("消息已处理,跳过: " + messageId);
continue;
}

// 处理业务
process(msg);

// 标记已处理
markProcessed(messageId);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

3. 异常处理

consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
// 业务处理
process(msg);
} catch (BusinessException e) {
// 业务异常,记录日志,不再重试
log.error("业务处理失败", e);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 系统异常,触发重试
log.error("系统异常,等待重试", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

4. 优雅关闭

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
consumer.shutdown();
log.info("Consumer shutdown successfully");
}));

5. 监控消费堆积

// 获取消费堆积
long lag = consumer.getOffsetStore().getOffset(
mq.getTopic(),
mq.getQueueId()
) - brokerOffset;

小结

本章介绍了 RocketMQ 消费者的核心概念和使用方法:

  1. 消费者类型:PushConsumer 和 SimpleConsumer
  2. 消费模式:集群消费和广播消费
  3. 消费监听器:并发消费和顺序消费
  4. 消费重试:重试机制和死信队列
  5. 消费进度管理:本地模式和集群模式
  6. 最佳实践:幂等性、异常处理、优雅关闭

练习

  1. 实现 PushConsumer 和 SimpleConsumer 两种消费者
  2. 实现消息消费的幂等性处理
  3. 处理消费失败的消息,超过重试次数后记录日志
  4. 实现广播消费模式,用于配置同步

延伸阅读