跳到主要内容

消息过滤

消息过滤是 RocketMQ 的重要特性之一,允许消费者根据条件只消费感兴趣的消息,减少不必要的网络传输和处理开销。

过滤机制概述

RocketMQ 提供了两种消息过滤方式:

过滤方式说明性能灵活性
Tag 过滤基于消息标签过滤
SQL92 过滤基于 SQL 表达式过滤

Tag 过滤

Tag 是消息的标签,用于对消息进行分类。消费者可以通过指定 Tag 来过滤消息。

设置消息 Tag

生产者设置 Tag

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;

public class TagMessageProducer {
public static void main(String[] args) throws Exception {
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints("localhost:8081")
.build();

Producer producer = provider.newProducerBuilder()
.setClientConfiguration(config)
.setTopics("OrderTopic")
.build();

// 发送不同 Tag 的消息
String[] tags = {"Create", "Pay", "Ship", "Complete"};
for (String tag : tags) {
Message message = provider.newMessageBuilder()
.setTopic("OrderTopic")
.setTag(tag) // 设置 Tag
.setBody(("订单消息-" + tag).getBytes())
.build();

producer.send(message);
System.out.println("发送消息,Tag: " + tag);
}

producer.close();
}
}

订阅指定 Tag

5.0 SDK 订阅方式

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.*;
import org.apache.rocketmq.client.apis.message.MessageView;
import java.util.Collections;

public class TagFilterConsumer {
public static void main(String[] args) throws Exception {
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints("localhost:8081")
.build();

// 方式1:订阅单个 Tag
PushConsumer consumer1 = provider.newPushConsumerBuilder()
.setClientConfiguration(config)
.setConsumerGroup("OrderCreateGroup")
.setSubscriptionExpressions(Collections.singletonMap(
"OrderTopic",
new FilterExpression("Create", FilterExpressionType.TAG)
))
.setMessageListener(messageView -> {
System.out.println("收到创建订单消息: " + messageView.getMessageId());
return ConsumeResult.SUCCESS;
})
.build();

// 方式2:订阅多个 Tag(使用 || 分隔)
PushConsumer consumer2 = provider.newPushConsumerBuilder()
.setClientConfiguration(config)
.setConsumerGroup("OrderProcessGroup")
.setSubscriptionExpressions(Collections.singletonMap(
"OrderTopic",
new FilterExpression("Create||Pay||Ship", FilterExpressionType.TAG)
))
.setMessageListener(messageView -> {
System.out.println("收到订单处理消息: " + messageView.getMessageId());
return ConsumeResult.SUCCESS;
})
.build();

// 方式3:订阅所有 Tag
PushConsumer consumer3 = provider.newPushConsumerBuilder()
.setClientConfiguration(config)
.setConsumerGroup("OrderAllGroup")
.setSubscriptionExpressions(Collections.singletonMap(
"OrderTopic",
new FilterExpression("*", FilterExpressionType.TAG)
))
.setMessageListener(messageView -> {
System.out.println("收到所有订单消息: " + messageView.getMessageId());
return ConsumeResult.SUCCESS;
})
.build();
}
}

4.x SDK 订阅方式

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

public class TagFilterConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderProcessGroup");
consumer.setNamesrvAddr("localhost:9876");

// 方式1:订阅单个 Tag
consumer.subscribe("OrderTopic", "Create");

// 方式2:订阅多个 Tag(使用 || 分隔)
consumer.subscribe("OrderTopic", "Create||Pay||Ship");

// 方式3:订阅所有 Tag
consumer.subscribe("OrderTopic", "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("收到消息,Tag: %s, 内容: %s%n",
msg.getTags(), new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

consumer.start();
}
}

Tag 过滤规则

表达式说明
TagA只订阅 Tag 为 TagA 的消息
TagA||TagB订阅 Tag 为 TagA 或 TagB 的消息
*订阅所有消息
TagA||TagB||TagC订阅多个 Tag 的消息

Tag 过滤原理

关键点

  • Tag 过滤在 Broker 端执行
  • 不匹配的消息不会被推送给消费者
  • 减少网络传输,提高消费效率

SQL92 过滤

SQL92 过滤提供了更灵活的过滤能力,支持基于消息属性的复杂条件过滤。

启用 SQL92 过滤

SQL92 过滤需要在 Broker 配置中启用:

# broker.conf
enablePropertyFilter=true

设置消息属性

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class SqlFilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("SqlFilterProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();

// 发送带属性的消息
Message msg1 = new Message("SqlTopic", "TagA", "消息1".getBytes());
msg1.putUserProperty("region", "Beijing");
msg1.putUserProperty("price", "100");
msg1.putUserProperty("level", "VIP");

Message msg2 = new Message("SqlTopic", "TagA", "消息2".getBytes());
msg2.putUserProperty("region", "Shanghai");
msg2.putUserProperty("price", "200");
msg2.putUserProperty("level", "Normal");

producer.send(msg1);
producer.send(msg2);

producer.shutdown();
}
}

SQL92 语法

SQL92 过滤支持以下语法:

语法说明示例
比较=, >, <, >=, <=, <>price > 100
逻辑AND, OR, NOTregion = 'Beijing' AND price > 100
包含INregion IN ('Beijing', 'Shanghai')
范围BETWEENprice BETWEEN 100 AND 500
模糊LIKEregion LIKE 'Bei%'
空值IS NULL, IS NOT NULLlevel IS NOT NULL

使用 SQL92 过滤

5.0 SDK 方式

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.*;
import java.util.Collections;

public class SqlFilterConsumer {
public static void main(String[] args) throws Exception {
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints("localhost:8081")
.build();

// 使用 SQL92 表达式过滤
PushConsumer consumer = provider.newPushConsumerBuilder()
.setClientConfiguration(config)
.setConsumerGroup("SqlFilterGroup")
.setSubscriptionExpressions(Collections.singletonMap(
"SqlTopic",
new FilterExpression(
"region = 'Beijing' AND price > 50",
FilterExpressionType.SQL92
)
))
.setMessageListener(messageView -> {
System.out.println("收到消息: " + messageView.getMessageId());
return ConsumeResult.SUCCESS;
})
.build();
}
}

4.x SDK 方式

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

public class SqlFilterConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SqlFilterGroup");
consumer.setNamesrvAddr("localhost:9876");

// 使用 SQL92 表达式过滤
consumer.subscribe("SqlTopic",
"region = 'Beijing' AND price > 50");

consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("收到消息: %s, region: %s, price: %s%n",
new String(msg.getBody()),
msg.getUserProperty("region"),
msg.getUserProperty("price"));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

consumer.start();
}
}

SQL92 过滤示例

// 示例1:区域过滤
consumer.subscribe("Topic", "region IN ('Beijing', 'Shanghai', 'Guangzhou')");

// 示例2:价格范围过滤
consumer.subscribe("Topic", "price BETWEEN 100 AND 500");

// 示例3:组合条件
consumer.subscribe("Topic",
"(region = 'Beijing' OR region = 'Shanghai') AND price > 100");

// 示例4:模糊匹配
consumer.subscribe("Topic", "region LIKE 'Bei%'");

// 示例5:非空判断
consumer.subscribe("Topic", "level IS NOT NULL AND level = 'VIP'");

// 示例6:复杂条件
consumer.subscribe("Topic",
"NOT (region = 'Beijing') AND price >= 100 AND level IN ('VIP', 'SVIP')");

Tag 过滤 vs SQL92 过滤

特性Tag 过滤SQL92 过滤
性能高(索引查找)中(遍历计算)
灵活性
使用场景简单分类复杂条件过滤
配置要求无需额外配置需要启用 enablePropertyFilter
服务端开销较高

选择建议

使用 Tag 过滤的场景

  • 消息有明确的分类
  • 过滤条件固定且简单
  • 对性能要求较高

使用 SQL92 过滤的场景

  • 需要基于多个属性组合过滤
  • 过滤条件动态变化
  • 需要范围查询或模糊匹配

过滤最佳实践

1. Tag 命名规范

// 推荐:使用业务含义明确的 Tag
msg.setTags("OrderCreate"); // 订单创建
msg.setTags("OrderPay"); // 订单支付
msg.setTags("OrderCancel"); // 订单取消

// 不推荐:使用无意义的 Tag
msg.setTags("Tag1");
msg.setTags("Tag2");

2. 合理使用属性

// 设置有意义的业务属性
msg.putUserProperty("orderId", "123456");
msg.putUserProperty("userId", "user001");
msg.putUserProperty("amount", "99.99");
msg.putUserProperty("status", "PAID");

// 使用属性进行过滤
consumer.subscribe("Topic", "status = 'PAID' AND amount > 50");

3. 避免过滤条件过于复杂

// 不推荐:过于复杂的过滤条件
consumer.subscribe("Topic",
"(a > 1 AND b < 10) OR (c = 'x' AND d IN ('m', 'n')) AND NOT e IS NULL");

// 推荐:简化过滤条件,必要时拆分 Topic
consumer.subscribe("TopicA", "a > 1 AND b < 10");
consumer.subscribe("TopicB", "c = 'x' AND d IN ('m', 'n')");

4. 过滤条件缓存

RocketMQ 会缓存消费者的过滤表达式,避免重复解析。建议保持过滤条件的稳定性。

实战示例:电商订单消息过滤

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

public class OrderMessageDemo {

// 生产者:发送订单消息
public static class OrderProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("OrderProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();

// 发送不同类型的订单消息
for (int i = 0; i < 10; i++) {
String orderType = i % 3 == 0 ? "Normal" : (i % 3 == 1 ? "VIP" : "SVIP");
String region = i % 2 == 0 ? "Beijing" : "Shanghai";

Message msg = new Message("OrderTopic", orderType,
("订单消息-" + i).getBytes());
msg.putUserProperty("region", region);
msg.putUserProperty("amount", String.valueOf(100 + i * 50));
msg.putUserProperty("orderType", orderType);

producer.send(msg);
System.out.printf("发送消息: type=%s, region=%s%n", orderType, region);
}

producer.shutdown();
}
}

// 消费者1:只消费 VIP 订单
public static class VipOrderConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("VipOrderGroup");
consumer.setNamesrvAddr("localhost:9876");

// 使用 Tag 过滤 VIP 和 SVIP 订单
consumer.subscribe("OrderTopic", "VIP||SVIP");

consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("[VIP消费] 收到消息: %s, Tag: %s%n",
new String(msg.getBody()), msg.getTags());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

consumer.start();
System.out.println("VIP 订单消费者启动");
}
}

// 消费者2:消费北京地区大额订单
public static class BeijingHighValueConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BeijingHighValueGroup");
consumer.setNamesrvAddr("localhost:9876");

// 使用 SQL92 过滤北京地区金额大于 200 的订单
consumer.subscribe("OrderTopic",
"region = 'Beijing' AND amount > 200");

consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("[北京大额消费] 收到消息: %s, region: %s, amount: %s%n",
new String(msg.getBody()),
msg.getUserProperty("region"),
msg.getUserProperty("amount"));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

consumer.start();
System.out.println("北京大额订单消费者启动");
}
}
}

小结

本章介绍了 RocketMQ 的消息过滤机制:

  1. Tag 过滤:基于消息标签的简单过滤,性能高
  2. SQL92 过滤:基于 SQL 表达式的复杂过滤,灵活性高
  3. 过滤选择:根据业务场景选择合适的过滤方式
  4. 最佳实践:合理命名 Tag、设置属性、避免复杂条件

练习

  1. 使用 Tag 过滤实现不同类型消息的分类订阅
  2. 使用 SQL92 过滤实现基于属性的复杂条件过滤
  3. 对比 Tag 过滤和 SQL92 过滤的性能差异
  4. 设计一个电商订单的消息过滤方案

延伸阅读