生产者
生产者(Producer)是 RocketMQ 系统中负责构建和发送消息的运行实体。本章将详细介绍生产者的工作原理和使用方法。
生产者概述
生产者职责
生产者的核心职责是将业务数据封装为消息,并发送到 RocketMQ 服务端:
- 消息构建:将业务数据封装为消息对象
- 路由获取:从 NameServer 获取 Topic 的路由信息
- 消息发送:将消息发送到对应的 Broker
- 结果处理:处理发送成功或失败的响应
生产者组
生产者组(Producer Group)是一类生产者的集合,具有以下特点:
- 发送同一类消息,发送逻辑一致
- 用于事务消息的事务回查
- 同一组内的生产者可以相互替代
生产者类型
RocketMQ 5.0 提供了新的 gRPC 协议 SDK,同时兼容 4.x 版本的 Remoting 协议 SDK。
5.0 gRPC 协议 SDK(推荐)
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.7</version>
</dependency>
4.x Remoting 协议 SDK
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.4</version>
</dependency>
消息发送方式
RocketMQ 支持三种消息发送方式:同步发送、异步发送和单向发送。
同步发送
同步发送是最可靠的方式,发送方发出消息后等待服务端响应,收到响应后才继续发送下一条消息。
适用场景:
- 重要通知消息
- 需要确保消息发送成功的场景
- 对实时性要求较高的业务
代码示例(5.0 SDK):
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 1. 创建客户端配置
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints("localhost:8081") // Proxy 地址
.setRequestTimeout(Duration.ofSeconds(30))
.build();
// 2. 创建生产者
Producer producer = provider.newProducerBuilder()
.setClientConfiguration(config)
.setTopics("TestTopic") // 预绑定 Topic
.build();
// 3. 构建消息
Message message = provider.newMessageBuilder()
.setTopic("TestTopic") // 主题
.setTag("TagA") // 标签
.setKeys("order_001") // 消息索引键
.setBody("Hello RocketMQ".getBytes()) // 消息体
.build();
// 4. 同步发送消息
SendReceipt sendReceipt = producer.send(message);
System.out.println("消息发送成功,MessageId: " + sendReceipt.getMessageId());
// 5. 关闭生产者
producer.close();
}
}
代码示例(4.x SDK):
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 1. 创建生产者,指定生产者组
DefaultMQProducer producer = new DefaultMQProducer("OrderProducerGroup");
// 2. 设置 NameServer 地址
producer.setNamesrvAddr("localhost:9876");
// 3. 启动生产者
producer.start();
// 4. 构建消息
Message msg = new Message(
"TestTopic", // 主题
"TagA", // 标签
"order_001", // 消息索引键
"Hello RocketMQ".getBytes() // 消息体
);
// 5. 同步发送消息
SendResult sendResult = producer.send(msg);
System.out.println("发送结果: " + sendResult);
// 6. 关闭生产者
producer.shutdown();
}
}
SendResult 详解:
public class SendResult {
private SendStatus sendStatus; // 发送状态
private String msgId; // 消息ID
private MessageQueue messageQueue; // 消息队列
private long queueOffset; // 队列偏移量
}
// 发送状态说明
public enum SendStatus {
SEND_OK, // 发送成功
FLUSH_DISK_TIMEOUT, // 刷盘超时
FLUSH_SLAVE_TIMEOUT, // 同步到 Slave 超时
SLAVE_NOT_AVAILABLE // Slave 不可用
}
异步发送
异步发送不等待服务端响应,通过回调函数处理发送结果。适用于对响应时间敏感的场景。
适用场景:
- 链路耗时较长的业务
- 高吞吐量场景
- 对实时性要求不高但需要可靠传输的场景
代码示例(4.x SDK):
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class AsyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("AsyncProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
// 设置异步发送失败重试次数
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 10;
CountDownLatch latch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
final int index = i;
Message msg = new Message(
"TestTopic",
"TagA",
("Hello RocketMQ " + i).getBytes()
);
// 异步发送,通过回调处理结果
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("消息 %d 发送成功: %s%n",
index, sendResult.getMsgId());
latch.countDown();
}
@Override
public void onException(Throwable e) {
System.out.printf("消息 %d 发送失败: %s%n",
index, e.getMessage());
e.printStackTrace();
latch.countDown();
}
});
}
// 等待所有异步发送完成
latch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
}
注意事项:
- 异步发送必须等待回调返回后再关闭 Producer
- 需要合理处理回调中的异常
- 建议使用 CountDownLatch 等机制确保消息发送完成
单向发送
单向发送只发送消息,不等待响应,也没有回调。适用于对可靠性要求不高、追求极致性能的场景。
适用场景:
- 日志收集
- 监控数据上报
- 非关键业务消息
代码示例(4.x SDK):
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class OnewayProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("OnewayProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 100; i++) {
Message msg = new Message(
"LogTopic",
"TagA",
("Log message " + i).getBytes()
);
// 单向发送,不等待响应
producer.sendOneway(msg);
}
producer.shutdown();
}
}
注意事项:
- 单向发送无返回结果,无法确认消息是否发送成功
- 如果数据不可丢失,请使用同步或异步发送
三种发送方式对比
| 特性 | 同步发送 | 异步发送 | 单向发送 |
|---|---|---|---|
| 可靠性 | 最高 | 高 | 最低 |
| 吞吐量 | 低 | 中 | 最高 |
| 延迟 | 高 | 低 | 最低 |
| 结果确认 | 同步返回 | 回调返回 | 无 |
| 适用场景 | 重要消息 | 高吞吐 | 日志收集 |
消息发送流程
内部处理流程
队列选择策略
生产者发送消息时,需要选择一个 MessageQueue:
// 默认策略:轮询选择队列
public MessageQueue selectOneMessageQueue(TopicPublishInfo tpInfo, String lastBrokerName) {
// 轮询选择队列
int index = tpInfo.getSendWhichQueue().getAndIncrement();
return tpInfo.getMessageQueueList().get(index % tpInfo.getMessageQueueList().size());
}
自定义队列选择:
producer.setSelectMessageQueueStrategy(new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 根据业务参数选择队列
Long orderId = (Long) arg;
int index = (int) (orderId % mqs.size());
return mqs.get(index);
}
});
// 发送时传入选择参数
producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long orderId = (Long) arg;
int index = (int) (orderId % mqs.size());
return mqs.get(index);
}
}, orderId);
生产者配置
重要配置参数
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 基础配置
producer.setNamesrvAddr("localhost:9876"); // NameServer 地址
producer.setInstanceName("ProducerInstance"); // 实例名称
producer.setProducerGroup("ProducerGroupName"); // 生产者组名
// 发送超时配置
producer.setSendMsgTimeout(3000); // 发送超时时间(毫秒)
producer.setRetryTimesWhenSendFailed(2); // 同步发送重试次数
producer.setRetryTimesWhenSendAsyncFailed(2); // 异步发送重试次数
// 消息大小配置
producer.setMaxMessageSize(4 * 1024 * 1024); // 最大消息大小(默认 4MB)
// 压缩配置
producer.setCompressMsgBodyOverHowmuch(4096); // 超过 4KB 压缩
producer.setCompressLevel(5); // 压缩级别 (1-9)
// 重试配置
producer.setRetryAnotherBrokerWhenNotStoreOK(true); // 存储失败时重试其他 Broker
配置说明
| 参数 | 默认值 | 说明 |
|---|---|---|
sendMsgTimeout | 3000ms | 发送消息超时时间 |
retryTimesWhenSendFailed | 2 | 同步发送失败重试次数 |
retryTimesWhenSendAsyncFailed | 2 | 异步发送失败重试次数 |
maxMessageSize | 4MB | 单条消息最大大小 |
compressMsgBodyOverHowmuch | 4096 | 超过此大小压缩消息体 |
defaultTopicQueueNums | 4 | 自动创建 Topic 的队列数 |
最佳实践
1. 生产者组命名规范
// 推荐命名格式:业务名_功能名_Producer
DefaultMQProducer producer = new DefaultMQProducer("Order_Create_Producer");
DefaultMQProducer producer = new DefaultMQProducer("Payment_Notify_Producer");
2. 消息 Keys 设置
设置有意义的消息 Keys,便于问题排查和消息查询:
Message msg = new Message("OrderTopic", "Create",
String.format("order_%d_%d", userId, orderId), // Keys
orderData.getBytes()
);
3. 异常处理
try {
SendResult result = producer.send(msg);
if (result.getSendStatus() != SendStatus.SEND_OK) {
// 发送失败,进行补偿处理
log.warn("消息发送异常: {}", result.getSendStatus());
// 重试或记录日志
}
} catch (Exception e) {
log.error("消息发送失败", e);
// 业务补偿逻辑
}
4. 优雅关闭
// 注册 JVM 钩子,确保生产者正确关闭
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
producer.shutdown();
log.info("Producer shutdown successfully");
}));
5. 消息体序列化
推荐使用 JSON 或 Protobuf 序列化消息体:
// 使用 JSON 序列化
Order order = new Order(1L, "iPhone 15", 999.0);
String json = JSON.toJSONString(order);
Message msg = new Message("OrderTopic", json.getBytes(StandardCharsets.UTF_8));
// 使用 Protobuf 序列化
OrderProto.Order order = OrderProto.Order.newBuilder()
.setOrderId(1L)
.setProductName("iPhone 15")
.setPrice(999.0)
.build();
Message msg = new Message("OrderTopic", order.toByteArray());
小结
本章介绍了 RocketMQ 生产者的核心概念和使用方法:
- 生产者职责:消息构建、路由获取、消息发送、结果处理
- 生产者组:同类生产者的集合,用于事务回查
- 发送方式:同步发送、异步发送、单向发送
- 配置参数:超时、重试、消息大小等配置
- 最佳实践:命名规范、异常处理、优雅关闭
练习
- 实现一个订单消息发送器,包含同步和异步两种发送方式
- 自定义队列选择策略,根据订单 ID 选择队列
- 实现发送失败的重试和补偿机制
- 使用 CountDownLatch 实现异步发送的批量等待