消息过滤
消息过滤是 RocketMQ 的重要特性之一,允许消费者根据条件只消费感兴趣的消息,减少不必要的网络传输和处理开销。
过滤机制概述
RocketMQ 提供了两种消息过滤方式:
| 过滤方式 | 说明 | 性能 | 灵活性 |
|---|---|---|---|
| Tag 过滤 | 基于消息标签过滤 | 高 | 低 |
| SQL92 过滤 | 基于 SQL 表达式过滤 | 中 | 高 |
Tag 过滤
Tag 是消息的标签,用于对消息进行分类。消费者可以通过指定 Tag 来过滤消息。
设置消息 Tag
生产者设置 Tag:
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
public class TagMessageProducer {
public static void main(String[] args) throws Exception {
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints("localhost:8081")
.build();
Producer producer = provider.newProducerBuilder()
.setClientConfiguration(config)
.setTopics("OrderTopic")
.build();
// 发送不同 Tag 的消息
String[] tags = {"Create", "Pay", "Ship", "Complete"};
for (String tag : tags) {
Message message = provider.newMessageBuilder()
.setTopic("OrderTopic")
.setTag(tag) // 设置 Tag
.setBody(("订单消息-" + tag).getBytes())
.build();
producer.send(message);
System.out.println("发送消息,Tag: " + tag);
}
producer.close();
}
}
订阅指定 Tag
5.0 SDK 订阅方式:
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.*;
import org.apache.rocketmq.client.apis.message.MessageView;
import java.util.Collections;
public class TagFilterConsumer {
public static void main(String[] args) throws Exception {
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints("localhost:8081")
.build();
// 方式1:订阅单个 Tag
PushConsumer consumer1 = provider.newPushConsumerBuilder()
.setClientConfiguration(config)
.setConsumerGroup("OrderCreateGroup")
.setSubscriptionExpressions(Collections.singletonMap(
"OrderTopic",
new FilterExpression("Create", FilterExpressionType.TAG)
))
.setMessageListener(messageView -> {
System.out.println("收到创建订单消息: " + messageView.getMessageId());
return ConsumeResult.SUCCESS;
})
.build();
// 方式2:订阅多个 Tag(使用 || 分隔)
PushConsumer consumer2 = provider.newPushConsumerBuilder()
.setClientConfiguration(config)
.setConsumerGroup("OrderProcessGroup")
.setSubscriptionExpressions(Collections.singletonMap(
"OrderTopic",
new FilterExpression("Create||Pay||Ship", FilterExpressionType.TAG)
))
.setMessageListener(messageView -> {
System.out.println("收到订单处理消息: " + messageView.getMessageId());
return ConsumeResult.SUCCESS;
})
.build();
// 方式3:订阅所有 Tag
PushConsumer consumer3 = provider.newPushConsumerBuilder()
.setClientConfiguration(config)
.setConsumerGroup("OrderAllGroup")
.setSubscriptionExpressions(Collections.singletonMap(
"OrderTopic",
new FilterExpression("*", FilterExpressionType.TAG)
))
.setMessageListener(messageView -> {
System.out.println("收到所有订单消息: " + messageView.getMessageId());
return ConsumeResult.SUCCESS;
})
.build();
}
}
4.x SDK 订阅方式:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class TagFilterConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderProcessGroup");
consumer.setNamesrvAddr("localhost:9876");
// 方式1:订阅单个 Tag
consumer.subscribe("OrderTopic", "Create");
// 方式2:订阅多个 Tag(使用 || 分隔)
consumer.subscribe("OrderTopic", "Create||Pay||Ship");
// 方式3:订阅所有 Tag
consumer.subscribe("OrderTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("收到消息,Tag: %s, 内容: %s%n",
msg.getTags(), new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
Tag 过滤规则
| 表达式 | 说明 |
|---|---|
TagA | 只订阅 Tag 为 TagA 的消息 |
TagA||TagB | 订阅 Tag 为 TagA 或 TagB 的消息 |
* | 订阅所有消息 |
TagA||TagB||TagC | 订阅多个 Tag 的消息 |
Tag 过滤原理
关键点:
- Tag 过滤在 Broker 端执行
- 不匹配的消息不会被推送给消费者
- 减少网络传输,提高消费效率
SQL92 过滤
SQL92 过滤提供了更灵活的过滤能力,支持基于消息属性的复杂条件过滤。
启用 SQL92 过滤
SQL92 过滤需要在 Broker 配置中启用:
# broker.conf
enablePropertyFilter=true
设置消息属性
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class SqlFilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("SqlFilterProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
// 发送带属性的消息
Message msg1 = new Message("SqlTopic", "TagA", "消息1".getBytes());
msg1.putUserProperty("region", "Beijing");
msg1.putUserProperty("price", "100");
msg1.putUserProperty("level", "VIP");
Message msg2 = new Message("SqlTopic", "TagA", "消息2".getBytes());
msg2.putUserProperty("region", "Shanghai");
msg2.putUserProperty("price", "200");
msg2.putUserProperty("level", "Normal");
producer.send(msg1);
producer.send(msg2);
producer.shutdown();
}
}
SQL92 语法
SQL92 过滤支持以下语法:
| 语法 | 说明 | 示例 |
|---|---|---|
| 比较 | =, >, <, >=, <=, <> | price > 100 |
| 逻辑 | AND, OR, NOT | region = 'Beijing' AND price > 100 |
| 包含 | IN | region IN ('Beijing', 'Shanghai') |
| 范围 | BETWEEN | price BETWEEN 100 AND 500 |
| 模糊 | LIKE | region LIKE 'Bei%' |
| 空值 | IS NULL, IS NOT NULL | level IS NOT NULL |
使用 SQL92 过滤
5.0 SDK 方式:
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.*;
import java.util.Collections;
public class SqlFilterConsumer {
public static void main(String[] args) throws Exception {
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints("localhost:8081")
.build();
// 使用 SQL92 表达式过滤
PushConsumer consumer = provider.newPushConsumerBuilder()
.setClientConfiguration(config)
.setConsumerGroup("SqlFilterGroup")
.setSubscriptionExpressions(Collections.singletonMap(
"SqlTopic",
new FilterExpression(
"region = 'Beijing' AND price > 50",
FilterExpressionType.SQL92
)
))
.setMessageListener(messageView -> {
System.out.println("收到消息: " + messageView.getMessageId());
return ConsumeResult.SUCCESS;
})
.build();
}
}
4.x SDK 方式:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class SqlFilterConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SqlFilterGroup");
consumer.setNamesrvAddr("localhost:9876");
// 使用 SQL92 表达式过滤
consumer.subscribe("SqlTopic",
"region = 'Beijing' AND price > 50");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("收到消息: %s, region: %s, price: %s%n",
new String(msg.getBody()),
msg.getUserProperty("region"),
msg.getUserProperty("price"));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
SQL92 过滤示例
// 示例1:区域过滤
consumer.subscribe("Topic", "region IN ('Beijing', 'Shanghai', 'Guangzhou')");
// 示例2:价格范围过滤
consumer.subscribe("Topic", "price BETWEEN 100 AND 500");
// 示例3:组合条件
consumer.subscribe("Topic",
"(region = 'Beijing' OR region = 'Shanghai') AND price > 100");
// 示例4:模糊匹配
consumer.subscribe("Topic", "region LIKE 'Bei%'");
// 示例5:非空判断
consumer.subscribe("Topic", "level IS NOT NULL AND level = 'VIP'");
// 示例6:复杂条件
consumer.subscribe("Topic",
"NOT (region = 'Beijing') AND price >= 100 AND level IN ('VIP', 'SVIP')");
Tag 过滤 vs SQL92 过滤
| 特性 | Tag 过滤 | SQL92 过滤 |
|---|---|---|
| 性能 | 高(索引查找) | 中(遍历计算) |
| 灵活性 | 低 | 高 |
| 使用场景 | 简单分类 | 复杂条件过滤 |
| 配置要求 | 无需额外配置 | 需要启用 enablePropertyFilter |
| 服务端开销 | 低 | 较高 |
选择建议
使用 Tag 过滤的场景:
- 消息有明确的分类
- 过滤条件固定且简单
- 对性能要求较高
使用 SQL92 过滤的场景:
- 需要基于多个属性组合过滤
- 过滤条件动态变化
- 需要范围查询或模糊匹配
过滤最佳实践
1. Tag 命名规范
// 推荐:使用业务含义明确的 Tag
msg.setTags("OrderCreate"); // 订单创建
msg.setTags("OrderPay"); // 订单支付
msg.setTags("OrderCancel"); // 订单取消
// 不推荐:使用无意义的 Tag
msg.setTags("Tag1");
msg.setTags("Tag2");
2. 合理使用属性
// 设置有意义的业务属性
msg.putUserProperty("orderId", "123456");
msg.putUserProperty("userId", "user001");
msg.putUserProperty("amount", "99.99");
msg.putUserProperty("status", "PAID");
// 使用属性进行过滤
consumer.subscribe("Topic", "status = 'PAID' AND amount > 50");
3. 避免过滤条件过于复杂
// 不推荐:过于复杂的过滤条件
consumer.subscribe("Topic",
"(a > 1 AND b < 10) OR (c = 'x' AND d IN ('m', 'n')) AND NOT e IS NULL");
// 推荐:简化过滤条件,必要时拆分 Topic
consumer.subscribe("TopicA", "a > 1 AND b < 10");
consumer.subscribe("TopicB", "c = 'x' AND d IN ('m', 'n')");
4. 过滤条件缓存
RocketMQ 会缓存消费者的过滤表达式,避免重复解析。建议保持过滤条件的稳定性。
实战示例:电商订单消息过滤
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class OrderMessageDemo {
// 生产者:发送订单消息
public static class OrderProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("OrderProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
// 发送不同类型的订单消息
for (int i = 0; i < 10; i++) {
String orderType = i % 3 == 0 ? "Normal" : (i % 3 == 1 ? "VIP" : "SVIP");
String region = i % 2 == 0 ? "Beijing" : "Shanghai";
Message msg = new Message("OrderTopic", orderType,
("订单消息-" + i).getBytes());
msg.putUserProperty("region", region);
msg.putUserProperty("amount", String.valueOf(100 + i * 50));
msg.putUserProperty("orderType", orderType);
producer.send(msg);
System.out.printf("发送消息: type=%s, region=%s%n", orderType, region);
}
producer.shutdown();
}
}
// 消费者1:只消费 VIP 订单
public static class VipOrderConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("VipOrderGroup");
consumer.setNamesrvAddr("localhost:9876");
// 使用 Tag 过滤 VIP 和 SVIP 订单
consumer.subscribe("OrderTopic", "VIP||SVIP");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("[VIP消费] 收到消息: %s, Tag: %s%n",
new String(msg.getBody()), msg.getTags());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("VIP 订单消费者启动");
}
}
// 消费者2:消费北京地区大额订单
public static class BeijingHighValueConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BeijingHighValueGroup");
consumer.setNamesrvAddr("localhost:9876");
// 使用 SQL92 过滤北京地区金额大于 200 的订单
consumer.subscribe("OrderTopic",
"region = 'Beijing' AND amount > 200");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("[北京大额消费] 收到消息: %s, region: %s, amount: %s%n",
new String(msg.getBody()),
msg.getUserProperty("region"),
msg.getUserProperty("amount"));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("北京大额订单消费者启动");
}
}
}
小结
本章介绍了 RocketMQ 的消息过滤机制:
- Tag 过滤:基于消息标签的简单过滤,性能高
- SQL92 过滤:基于 SQL 表达式的复杂过滤,灵活性高
- 过滤选择:根据业务场景选择合适的过滤方式
- 最佳实践:合理命名 Tag、设置属性、避免复杂条件
练习
- 使用 Tag 过滤实现不同类型消息的分类订阅
- 使用 SQL92 过滤实现基于属性的复杂条件过滤
- 对比 Tag 过滤和 SQL92 过滤的性能差异
- 设计一个电商订单的消息过滤方案