tencent cloud

消息队列 RocketMQ 版

动态与公告
新功能发布记录
公告
产品简介
产品概述
什么是消息队列 RocketMQ 版
产品优势
应用场景
产品系列
开源对比
高可用
使用限制
开服地域
基本概念
产品计费
计费概述
价格说明
计费示例
切换集群计费模式(5.x)
续费说明
查看消费明细
退费说明
欠费说明
快速入门
快速入门概述
准备工作
步骤1:创建 RocketMQ 资源
步骤2:使用 SDK 收发消息(推荐)
步骤2:运行 RocketMQ 客户端(可选)
步骤3:查询消息
步骤4:销毁资源
用户指南
使用流程指引
配置账号权限
新建集群
命名空间管理
配置 Topic
配置 Group
连接集群
管理消息
管理集群
查看监控和配置告警
跨集群复制消息
实践教程
RocketMQ 常见概念命名规范
RocketMQ 客户端实践
RocketMQ 性能压测和容量评估
使用社区版 HTTP SDK 接入
客户端风险说明和更新指南
关于 RocketMQ 4.x 集群角色(Role)相关云 API 迁移指引
迁移指南
有感迁移
无感迁移
开发指南
消息类型
消息过滤
消息重试
POP 消费模式(5.x)
集群消费与广播消费
订阅关系一致性
限流
API 参考(5.x)
History
API Category
Making API Requests
Topic APIs
Consumer Group APIs
Message APIs
Role Authentication APIs
Hitless Migration APIs
Cloud Migration APIs
Cluster APIs
Data Types
Error Codes
API 参考(4.x)
SDK 参考
SDK 概述
5.x SDK
4.x SDK
安全与合规
权限管理
云 API 审计
删除保护
常见问题
4.x 实例常见问题
服务协议
服务等级协议
联系我们

消息过滤

PDF
聚焦模式
字号
最后更新时间: 2025-07-23 14:19:00
本文主要介绍 TDMQ RocketMQ 版中消息过滤的功能、应用场景和使用方式。

功能介绍

消息过滤功能指消息生产者向 Topic 中发送消息时,设置消息属性对消息进行分类,消费者订阅 Topic 时,根据消息属性设置过滤条件对消息进行过滤,只有符合过滤条件的消息才会被投递到消费端进行消费。
消费者订阅 Topic 时若未设置过滤条件,无论消息发送时是否有设置过滤属性,Topic 中的所有消息都将被投递到消费端进行消费。

应用场景

通常,一个 Topic 中存放的是相同业务属性的消息,例如交易流水 Topic 包含了下单流水、支付流水、发货流水等,这些消息会发送到同一个交易流水 Topic 中,业务若只想消费者其中一种类别的消息,可在客户端进行过滤。
账单系统:只需订阅支付消息。
物流系统:只需订阅物流消息。
库存系统:只需订阅下单消息。


使用方式

目前消息过滤主要支持两种过滤方式,分别是 SQL 过滤和 TAG 过滤。其核心逻辑都是在发送消息的时候,设置一些自定义字段,然后通过消费组订阅的时候指定对应的过滤表达式,消息在服务端进行过滤后,才被消费组消费。
TAG 过滤
SQL 过滤

发送消息

说明:
发送消息时,每条消息必须指明 Tag。
String tag = "yourMessageTagA";
final Message message = provider.newMessageBuilder()
// Set topic for the current message.
.setTopic(topic)
// Message secondary classifier of message besides topic.
.setTag(tag)
// Key(s) of the message, another way to mark message besides message id.
.setKeys("yourMessageKey-1c151062f96e")
.setBody(body)
.build();

订阅消息

订阅所有 Tag:消费者如需订阅某 Topic 下所有类型的消息,Tag 用星号(*)表示。
String consumerGroup = "yourConsumerGroup";
String topic = "yourTopic";
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// In most case, you don't need to create too many consumers, singleton pattern is recommended.
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// Set the consumer group name.
.setConsumerGroup(consumerGroup)
// Set the subscription for the consumer.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.setMessageListener(messageView -> {
// Handle the received message and return consume result.
log.info("Consume message={}", messageView);
return ConsumeResult.SUCCESS;
})
.build();
订阅单个 Tag:消费者如需订阅某 Topic 下某一种类型的消息,请明确标明 Tag。
String consumerGroup = "yourConsumerGroup";
String topic = "yourTopic";
String tag = "TAGA";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// In most case, you don't need to create too many consumers, singleton pattern is recommended.
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// Set the consumer group name.
.setConsumerGroup(consumerGroup)
// Set the subscription for the consumer.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.setMessageListener(messageView -> {
// Handle the received message and return consume result.
log.info("Consume message={}", messageView);
return ConsumeResult.SUCCESS;
})
.build();
订阅多个 Tag:消费者如需订阅某 Topic 下多种类型的消息,请在多个 Tag 之间用两个竖线||分隔。
String consumerGroup = "yourConsumerGroup";
String topic = "yourTopic";
String tag = "TAGA || TAGB";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// In most case, you don't need to create too many consumers, singleton pattern is recommended.
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// Set the consumer group name.
.setConsumerGroup(consumerGroup)
// Set the subscription for the consumer.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.setMessageListener(messageView -> {
// Handle the received message and return consume result.
log.info("Consume message={}", messageView);
return ConsumeResult.SUCCESS;
})
.build();

发送消息

发送代码和简单的消息没有区别。主要是在构造消息体的时候,带上自定义属性,允许多个。
final Message message = provider.newMessageBuilder()
// Set topic for the current message.
.setTopic(topic)
// Message secondary classifier of message besides topic.
// Key(s) of the message, another way to mark message besides message id.
.setKeys("yourMessageKey-1c151062f96e")
.setBody(body)
//一些用于sql过滤的信息
.addProperty("key1", "value1")
.build();

订阅消息

对于消费消息,订阅时需带上相应的 SQL 表达式,其余与普通的消费消息流程无区别。
String consumerGroup = "yourConsumerGroup";
String topic = "yourTopic";
String sql = "key1 IS NOT NULL AND key1='value1'";
//sql表达式
FilterExpression filterExpression = new FilterExpression(sql, FilterExpressionType.SQL92);
//如果是订阅所有
//FilterExpression filterExpression = FilterExpression.SUB_ALL;
// In most case, you don't need to create too many consumers, singleton pattern is recommended.
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// Set the consumer group name.
.setConsumerGroup(consumerGroup)
// Set the subscription for the consumer.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.setMessageListener(messageView -> {
// Handle the received message and return consume result.
log.info("Consume message={}", messageView);
return ConsumeResult.SUCCESS;
})
.build();
说明
上述是对消息的发布和订阅方式的简单介绍。更多操作可参见 GitHub DemoRocketMQ 官方文档

使用限制

由于 SQL 属性过滤是生产者定义消息属性,消费者设置 SQL 过滤条件,计算之后,可能有不同的结果,因此服务端的处理方式如下:
异常情况处理:如果过滤条件的表达式计算抛异常,消息默认被过滤,不会被投递给消费者。例如比较数字和非数字类型的值。
空值情况处理:如果过滤条件的表达式计算值为 null 或不是布尔类型(true 和 false),则消息默认被过滤,不会被投递给消费者。例如发送消息时不存在某个属性,在订阅时过滤条件中直接使用该属性,则过滤条件的表达式计算结果为 null。
类型不符处理:如果消息自定义属性为浮点型,但过滤条件中使用整数进行判断,则消息默认被过滤,不会被投递给消费者。
虽然这种方式是灵活的,但是在消息头中还是不建议设置太多的值,因为总的消息头部属性有大小限制(32K),内置的已经占用了不少。超长之后,可能导致消息发送或者消费异常。

使用建议

合理划分主题和 Tag 标签。
从消息的过滤机制和主题的原理机制可以看出,业务消息的拆分可以基于主题进行筛选,也可以基于主题内消息的 Tag 标签及属性进行筛选。关于拆分方式的选择,应注意以下问题:
消息类型是否一致:不同类型的消息,如顺序消息和普通消息需要使用不同的主题进行拆分,无法通过 Tag 标签进行分类。
业务域是否相同:不同业务域和部门的消息应该拆分不同的主题。例如物流消息和支付消息应该使用两个不同的主题;同样是一个主题内的物流消息,普通物流消息和加急物流消息则可以通过不同的 Tag 进行区分。
消息量级和重要性是否一致:如果消息的量级规模存在巨大差异,或者说消息的链路重要程度存在差异,则应该使用不同的主题进行隔离拆分。









帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈