Message Filtering

Last updated: 2024-01-17 16:53:09
    This document describes the features, application scenarios, and usage instructions of message filtering in TDMQ for RocketMQ.

    Feature Description

    Message filtering lets a consumer receive only the messages it needs. The producer sets filter attributes (a tag or user-defined properties) on each message when sending it to a topic. A consumer that subscribes to the topic specifies a filter condition, and only messages that match the condition are delivered to that consumer.
    If a consumer specifies no filter condition when subscribing to a topic, all messages in the topic are delivered to it, regardless of whether filter attributes were set when the messages were sent.

    Use Cases

    Generally, messages with the same business attributes are stored in the same topic. For example, an order transaction topic may contain order placement, payment, and delivery messages. If your business needs to consume only one of these message types, you can filter on the client, but every message is still transferred to the consumer first, which wastes bandwidth.
    To solve this problem, TDMQ supports message filtering on the broker. Users can set one or more tags during message production and subscribe to specified tags during consumption.
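The bandwidth difference described above can be illustrated with a small, self-contained model. This is only a sketch: the `Msg` class, the tag names (`ORDER_PLACED`, `PAYMENT`, `DELIVERY`), and the helper methods are hypothetical and are not part of the RocketMQ client. It simply counts how many messages would cross the network in each case.

```java
import java.util.List;
import java.util.stream.Collectors;

final class FilterPlacementSketch {
    /** Hypothetical stand-in for a message: just a tag and a payload. */
    static final class Msg {
        final String tag;
        final String payload;

        Msg(String tag, String payload) {
            this.tag = tag;
            this.payload = payload;
        }
    }

    /** Client-side filtering: the whole topic is transferred, then filtered locally. */
    static int transferredWithClientFilter(List<Msg> topic) {
        return topic.size();
    }

    /** Broker-side filtering: only messages whose tag equals wantedTag are transferred. */
    static List<Msg> deliveredWithBrokerFilter(List<Msg> topic, String wantedTag) {
        return topic.stream()
                .filter(m -> wantedTag.equals(m.tag))
                .collect(Collectors.toList());
    }

    /** Assumed sample content of an order transaction topic. */
    static List<Msg> sampleOrderTopic() {
        return List.of(
                new Msg("ORDER_PLACED", "order-1"),
                new Msg("PAYMENT", "order-1"),
                new Msg("DELIVERY", "order-1"),
                new Msg("PAYMENT", "order-2"));
    }

    public static void main(String[] args) {
        List<Msg> topic = sampleOrderTopic();
        System.out.println("client-side filter transfers: "
                + transferredWithClientFilter(topic));
        System.out.println("broker-side filter transfers: "
                + deliveredWithBrokerFilter(topic, "PAYMENT").size());
    }
}
```

With the assumed sample topic, a consumer interested only in `PAYMENT` messages receives all four messages under client-side filtering but only two under broker-side filtering.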
    

    Instructions

    Filtering by Tag

    Sending Messages

    Note:
    A tag must be explicitly specified for each message when it is sent.
    String tag = "yourMessageTagA";
    final Message message = provider.newMessageBuilder()
        // Set the topic for the current message.
        .setTopic(topic)
        // Tag: a secondary classifier for the message, in addition to the topic.
        .setTag(tag)
        // Message key(s): another way to identify the message, besides the message ID.
        .setKeys("yourMessageKey-1c151062f96e")
        .setBody(body)
        .build();

    Subscribing to Messages

    Subscribing to all tags: If a consumer wants to subscribe to all types of messages under a topic, an asterisk (*) can be used to represent all tags.
    String consumerGroup = "yourConsumerGroup";
    String topic = "yourTopic";
    String tag = "*";
    FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
    // In most cases, you don't need to create many consumers; the singleton pattern is recommended.
    PushConsumer pushConsumer = provider.newPushConsumerBuilder()
        .setClientConfiguration(clientConfiguration)
        // Set the consumer group name.
        .setConsumerGroup(consumerGroup)
        // Set the subscription for the consumer.
        .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
        .setMessageListener(messageView -> {
            // Handle the received message and return the consume result.
            log.info("Consume message={}", messageView);
            return ConsumeResult.SUCCESS;
        })
        .build();
    Subscribing to one tag: If a consumer wants to subscribe to only one type of message under a topic, specify that tag explicitly.
    String consumerGroup = "yourConsumerGroup";
    String topic = "yourTopic";
    String tag = "TAGA";
    FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
    // In most cases, you don't need to create many consumers; the singleton pattern is recommended.
    PushConsumer pushConsumer = provider.newPushConsumerBuilder()
        .setClientConfiguration(clientConfiguration)
        // Set the consumer group name.
        .setConsumerGroup(consumerGroup)
        // Set the subscription for the consumer.
        .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
        .setMessageListener(messageView -> {
            // Handle the received message and return the consume result.
            log.info("Consume message={}", messageView);
            return ConsumeResult.SUCCESS;
        })
        .build();
    Subscribing to multiple tags: If a consumer wants to subscribe to several types of messages under a topic, separate adjacent tags with two vertical bars (||).
    String consumerGroup = "yourConsumerGroup";
    String topic = "yourTopic";
    String tag = "TAGA || TAGB";
    FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
    // In most cases, you don't need to create many consumers; the singleton pattern is recommended.
    PushConsumer pushConsumer = provider.newPushConsumerBuilder()
        .setClientConfiguration(clientConfiguration)
        // Set the consumer group name.
        .setConsumerGroup(consumerGroup)
        // Set the subscription for the consumer.
        .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
        .setMessageListener(messageView -> {
            // Handle the received message and return the consume result.
            log.info("Consume message={}", messageView);
            return ConsumeResult.SUCCESS;
        })
        .build();
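
To make the three subscription forms above concrete, the following self-contained sketch models how a tag expression might be matched against a message tag. It is not the broker's actual implementation. The class `TagExpressionSketch` and its `matches` method are hypothetical, and the handling of untagged messages (matched only by `*`) is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

final class TagExpressionSketch {
    /** Expression that subscribes to every tag. */
    static final String SUB_ALL = "*";

    /**
     * Returns true when a message carrying messageTag would pass the
     * subscription expression ("*", "TAGA", or "TAGA || TAGB").
     */
    static boolean matches(String expression, String messageTag) {
        String trimmed = expression.trim();
        if (trimmed.equals(SUB_ALL)) {
            return true; // "*" accepts every message
        }
        if (messageTag == null) {
            return false; // assumption: untagged messages match only "*"
        }
        // Split on the "||" separator and ignore surrounding whitespace.
        Set<String> wanted = Arrays.stream(trimmed.split("\\|\\|"))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
        return wanted.contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("*", "TAGC"));            // true
        System.out.println(matches("TAGA", "TAGB"));         // false
        System.out.println(matches("TAGA || TAGB", "TAGB")); // true
        System.out.println(matches("TAGA || TAGB", "TAGC")); // false
    }
}
```

Tag matching in this model is an exact, case-sensitive comparison, so a subscription to `TAGA` does not match a message tagged `taga`.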

    Filtering by SQL

    Sending Messages

    The code for sending messages is basically the same as the code for sending simple messages. When you construct a message, you can attach multiple user-defined properties to it, and SQL filter expressions can then reference those properties.
    final Message message = provider.newMessageBuilder()
        // Set the topic for the current message.
        .setTopic(topic)
        // Message key(s): another way to identify the message, besides the message ID.
        .setKeys("yourMessageKey-1c151062f96e")
        .setBody(body)
        // User-defined property used for SQL filtering.
        .addProperty("key1", "value1")
        .build();

    Subscribing to Messages

    The code for consuming messages is basically the same as the code for consuming simple messages. However, the subscription must carry the corresponding SQL expression.
    String consumerGroup = "yourConsumerGroup";
    String topic = "yourTopic";
    // SQL expression evaluated against the user-defined properties of each message.
    String sql = "key1 IS NOT NULL AND key1='value1'";
    FilterExpression filterExpression = new FilterExpression(sql, FilterExpressionType.SQL92);
    // To subscribe to all messages instead, use:
    //FilterExpression filterExpression = FilterExpression.SUB_ALL;
    // In most cases, you don't need to create many consumers; the singleton pattern is recommended.
    PushConsumer pushConsumer = provider.newPushConsumerBuilder()
        .setClientConfiguration(clientConfiguration)
        // Set the consumer group name.
        .setConsumerGroup(consumerGroup)
        // Set the subscription for the consumer.
        .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
        .setMessageListener(messageView -> {
            // Handle the received message and return the consume result.
            log.info("Consume message={}", messageView);
            return ConsumeResult.SUCCESS;
        })
        .build();
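
To show which messages the SQL expression above selects, the sketch below hand-translates `key1 IS NOT NULL AND key1='value1'` into a plain Java predicate over a message's user-defined properties. It is a model only, not an SQL92 parser and not the broker's evaluation logic. The names `SqlFilterSketch`, `KEY1_IS_VALUE1`, and `wouldBeDelivered` are hypothetical.

```java
import java.util.Map;
import java.util.function.Predicate;

final class SqlFilterSketch {
    /** Hand-written equivalent of: key1 IS NOT NULL AND key1='value1' */
    static final Predicate<Map<String, String>> KEY1_IS_VALUE1 =
            props -> props.get("key1") != null          // key1 IS NOT NULL
                    && "value1".equals(props.get("key1")); // key1='value1'

    /** Returns true when a message with these user properties passes the filter. */
    static boolean wouldBeDelivered(Map<String, String> userProperties) {
        return KEY1_IS_VALUE1.test(userProperties);
    }

    public static void main(String[] args) {
        // Property set by addProperty("key1", "value1") in the sending example.
        System.out.println(wouldBeDelivered(Map.of("key1", "value1"))); // true
        // Same key, different value.
        System.out.println(wouldBeDelivered(Map.of("key1", "other")));  // false
        // key1 is absent, so the IS NOT NULL condition fails.
        System.out.println(wouldBeDelivered(Map.of("key2", "value1"))); // false
    }
}
```

Only the message whose `key1` property is present and equal to `value1` passes, which corresponds to the message built with `addProperty("key1", "value1")` in the sending example.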
    Note:
    The preceding sections briefly describe how to publish and subscribe to messages with filtering. For more operations, see GitHub Demo or RocketMQ Official Documentation.
    
    