跳到主要内容

生产者

生产者(Producer)是 RocketMQ 系统中负责构建和发送消息的运行实体。本章将详细介绍生产者的工作原理和使用方法。

生产者概述

生产者职责

生产者的核心职责是将业务数据封装为消息,并发送到 RocketMQ 服务端:

  1. 消息构建:将业务数据封装为消息对象
  2. 路由获取:从 NameServer 获取 Topic 的路由信息
  3. 消息发送:将消息发送到对应的 Broker
  4. 结果处理:处理发送成功或失败的响应

生产者组

生产者组(Producer Group)是一类生产者的集合,具有以下特点:

  • 发送同一类消息,发送逻辑一致
  • 用于事务消息的事务回查
  • 同一组内的生产者可以相互替代

生产者类型

RocketMQ 5.0 提供了新的 gRPC 协议 SDK,同时兼容 4.x 版本的 Remoting 协议 SDK。

5.0 gRPC 协议 SDK(推荐)

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.7</version>
</dependency>

4.x Remoting 协议 SDK

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.4</version>
</dependency>

消息发送方式

RocketMQ 支持三种消息发送方式:同步发送、异步发送和单向发送。

同步发送

同步发送是最可靠的方式,发送方发出消息后等待服务端响应,收到响应后才继续发送下一条消息。

适用场景

  • 重要通知消息
  • 需要确保消息发送成功的场景
  • 对实时性要求较高的业务

代码示例(5.0 SDK)

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;

public class SyncProducer {
public static void main(String[] args) throws Exception {
// 1. 创建客户端配置
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints("localhost:8081") // Proxy 地址
.setRequestTimeout(Duration.ofSeconds(30))
.build();

// 2. 创建生产者
Producer producer = provider.newProducerBuilder()
.setClientConfiguration(config)
.setTopics("TestTopic") // 预绑定 Topic
.build();

// 3. 构建消息
Message message = provider.newMessageBuilder()
.setTopic("TestTopic") // 主题
.setTag("TagA") // 标签
.setKeys("order_001") // 消息索引键
.setBody("Hello RocketMQ".getBytes()) // 消息体
.build();

// 4. 同步发送消息
SendReceipt sendReceipt = producer.send(message);
System.out.println("消息发送成功,MessageId: " + sendReceipt.getMessageId());

// 5. 关闭生产者
producer.close();
}
}

代码示例(4.x SDK)

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class SyncProducer {
public static void main(String[] args) throws Exception {
// 1. 创建生产者,指定生产者组
DefaultMQProducer producer = new DefaultMQProducer("OrderProducerGroup");

// 2. 设置 NameServer 地址
producer.setNamesrvAddr("localhost:9876");

// 3. 启动生产者
producer.start();

// 4. 构建消息
Message msg = new Message(
"TestTopic", // 主题
"TagA", // 标签
"order_001", // 消息索引键
"Hello RocketMQ".getBytes() // 消息体
);

// 5. 同步发送消息
SendResult sendResult = producer.send(msg);
System.out.println("发送结果: " + sendResult);

// 6. 关闭生产者
producer.shutdown();
}
}

SendResult 详解

public class SendResult {
private SendStatus sendStatus; // 发送状态
private String msgId; // 消息ID
private MessageQueue messageQueue; // 消息队列
private long queueOffset; // 队列偏移量
}

// 发送状态说明
public enum SendStatus {
SEND_OK, // 发送成功
FLUSH_DISK_TIMEOUT, // 刷盘超时
FLUSH_SLAVE_TIMEOUT, // 同步到 Slave 超时
SLAVE_NOT_AVAILABLE // Slave 不可用
}

异步发送

异步发送不等待服务端响应,通过回调函数处理发送结果。适用于对响应时间敏感的场景。

适用场景

  • 链路耗时较长的业务
  • 高吞吐量场景
  • 对实时性要求不高但需要可靠传输的场景

代码示例(4.x SDK)

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AsyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("AsyncProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();

// 设置异步发送失败重试次数
producer.setRetryTimesWhenSendAsyncFailed(0);

int messageCount = 10;
CountDownLatch latch = new CountDownLatch(messageCount);

for (int i = 0; i < messageCount; i++) {
final int index = i;
Message msg = new Message(
"TestTopic",
"TagA",
("Hello RocketMQ " + i).getBytes()
);

// 异步发送,通过回调处理结果
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("消息 %d 发送成功: %s%n",
index, sendResult.getMsgId());
latch.countDown();
}

@Override
public void onException(Throwable e) {
System.out.printf("消息 %d 发送失败: %s%n",
index, e.getMessage());
e.printStackTrace();
latch.countDown();
}
});
}

// 等待所有异步发送完成
latch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
}

注意事项

  • 异步发送必须等待回调返回后再关闭 Producer
  • 需要合理处理回调中的异常
  • 建议使用 CountDownLatch 等机制确保消息发送完成

单向发送

单向发送只发送消息,不等待响应,也没有回调。适用于对可靠性要求不高、追求极致性能的场景。

适用场景

  • 日志收集
  • 监控数据上报
  • 非关键业务消息

代码示例(4.x SDK)

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class OnewayProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("OnewayProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();

for (int i = 0; i < 100; i++) {
Message msg = new Message(
"LogTopic",
"TagA",
("Log message " + i).getBytes()
);

// 单向发送,不等待响应
producer.sendOneway(msg);
}

producer.shutdown();
}
}

注意事项

  • 单向发送无返回结果,无法确认消息是否发送成功
  • 如果数据不可丢失,请使用同步或异步发送

三种发送方式对比

特性同步发送异步发送单向发送
可靠性最高最低
吞吐量最高
延迟最低
结果确认同步返回回调返回
适用场景重要消息高吞吐日志收集

消息发送流程

内部处理流程

队列选择策略

生产者发送消息时,需要选择一个 MessageQueue:

// 默认策略:轮询选择队列
public MessageQueue selectOneMessageQueue(TopicPublishInfo tpInfo, String lastBrokerName) {
// 轮询选择队列
int index = tpInfo.getSendWhichQueue().getAndIncrement();
return tpInfo.getMessageQueueList().get(index % tpInfo.getMessageQueueList().size());
}

自定义队列选择

producer.setSelectMessageQueueStrategy(new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 根据业务参数选择队列
Long orderId = (Long) arg;
int index = (int) (orderId % mqs.size());
return mqs.get(index);
}
});

// 发送时传入选择参数
producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long orderId = (Long) arg;
int index = (int) (orderId % mqs.size());
return mqs.get(index);
}
}, orderId);

生产者配置

重要配置参数

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

// 基础配置
producer.setNamesrvAddr("localhost:9876"); // NameServer 地址
producer.setInstanceName("ProducerInstance"); // 实例名称
producer.setProducerGroup("ProducerGroupName"); // 生产者组名

// 发送超时配置
producer.setSendMsgTimeout(3000); // 发送超时时间(毫秒)
producer.setRetryTimesWhenSendFailed(2); // 同步发送重试次数
producer.setRetryTimesWhenSendAsyncFailed(2); // 异步发送重试次数

// 消息大小配置
producer.setMaxMessageSize(4 * 1024 * 1024); // 最大消息大小(默认 4MB)

// 压缩配置
producer.setCompressMsgBodyOverHowmuch(4096); // 超过 4KB 压缩
producer.setCompressLevel(5); // 压缩级别 (1-9)

// 重试配置
producer.setRetryAnotherBrokerWhenNotStoreOK(true); // 存储失败时重试其他 Broker

配置说明

参数默认值说明
sendMsgTimeout3000ms发送消息超时时间
retryTimesWhenSendFailed2同步发送失败重试次数
retryTimesWhenSendAsyncFailed2异步发送失败重试次数
maxMessageSize4MB单条消息最大大小
compressMsgBodyOverHowmuch4096超过此大小压缩消息体
defaultTopicQueueNums4自动创建 Topic 的队列数

最佳实践

1. 生产者组命名规范

// 推荐命名格式:业务名_功能名_Producer
DefaultMQProducer producer = new DefaultMQProducer("Order_Create_Producer");
DefaultMQProducer producer = new DefaultMQProducer("Payment_Notify_Producer");

2. 消息 Keys 设置

设置有意义的消息 Keys,便于问题排查和消息查询:

Message msg = new Message("OrderTopic", "Create", 
String.format("order_%d_%d", userId, orderId), // Keys
orderData.getBytes()
);

3. 异常处理

try {
SendResult result = producer.send(msg);
if (result.getSendStatus() != SendStatus.SEND_OK) {
// 发送失败,进行补偿处理
log.warn("消息发送异常: {}", result.getSendStatus());
// 重试或记录日志
}
} catch (Exception e) {
log.error("消息发送失败", e);
// 业务补偿逻辑
}

4. 优雅关闭

// 注册 JVM 钩子,确保生产者正确关闭
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
producer.shutdown();
log.info("Producer shutdown successfully");
}));

5. 消息体序列化

推荐使用 JSON 或 Protobuf 序列化消息体:

// 使用 JSON 序列化
Order order = new Order(1L, "iPhone 15", 999.0);
String json = JSON.toJSONString(order);
Message msg = new Message("OrderTopic", json.getBytes(StandardCharsets.UTF_8));

// 使用 Protobuf 序列化
OrderProto.Order order = OrderProto.Order.newBuilder()
.setOrderId(1L)
.setProductName("iPhone 15")
.setPrice(999.0)
.build();
Message msg = new Message("OrderTopic", order.toByteArray());

小结

本章介绍了 RocketMQ 生产者的核心概念和使用方法:

  1. 生产者职责:消息构建、路由获取、消息发送、结果处理
  2. 生产者组:同类生产者的集合,用于事务回查
  3. 发送方式:同步发送、异步发送、单向发送
  4. 配置参数:超时、重试、消息大小等配置
  5. 最佳实践:命名规范、异常处理、优雅关闭

练习

  1. 实现一个订单消息发送器,包含同步和异步两种发送方式
  2. 自定义队列选择策略,根据订单 ID 选择队列
  3. 实现发送失败的重试和补偿机制
  4. 使用 CountDownLatch 实现异步发送的批量等待

延伸阅读