
Configuring SQL Filtering
Last updated: 2026-01-30 14:59:41
Note:
This feature is currently in grayscale release. If the console indicates that the feature is not enabled for the current cluster, submit a ticket to have it enabled.

Background

The MQTT specification defines the concept of a Topic Filter, which lets subscribers select the messages they receive based on the hierarchical structure of MQTT Topic Names and on wildcards. Topics and wildcards cover many filtering needs, but in scenarios such as grayscale releases, A/B testing, and system upgrades, topic-based filtering alone is not flexible enough to meet business requirements.

Implementation Principles

The MQTT 5.0 protocol allows User Properties to be attached to a Subscribe request. This product builds on that mechanism by giving Subscribe User Properties filtering semantics, which enables finer-grained message filtering. If the User Properties of a Subscribe request contain a key of $where whose value is a valid WHERE clause, the MQTT server evaluates that clause during message delivery and pushes only the messages that satisfy it to the subscriber.




Basic Workflow

1. Subscription and Declaration: The subscriber sends a Subscribe request and declares its filtering condition in a User Property with the key $where.
2. Condition Parsing: The server parses the WHERE clause and checks that it is valid.
3. Message Matching: When a message is published, the server applies each subscriber's filtering condition to the messages that match that subscriber's topic filter.
4. Precise Delivery: Only messages that satisfy the condition are delivered to the subscriber.
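
The workflow above can be pictured with a small, self-contained simulation. This is only a sketch of the idea, not the server's implementation: the class FilteredDeliverySketch, its Message type, and the deliver method are hypothetical names, and the parsed WHERE clause is stood in for by a hand-written Java predicate equivalent to k1 = 'v1'.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

final class FilteredDeliverySketch {

    /** A published message: topic name plus User Properties (hypothetical type for this sketch). */
    static final class Message {
        final String topic;
        final Map<String, String> userProperties;

        Message(String topic, Map<String, String> userProperties) {
            this.topic = topic;
            this.userProperties = userProperties;
        }
    }

    /** MQTT topic filter matching: '+' matches one level, '#' matches all remaining levels. */
    static boolean topicMatches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true;   // multi-level wildcard covers everything from here on
            }
            if (i >= t.length) {
                return false;  // filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false;  // literal level mismatch
            }
        }
        return f.length == t.length;
    }

    /** Steps 3 and 4: keep only messages that match both the topic filter and the condition. */
    static List<Message> deliver(String topicFilter, Predicate<Message> where, List<Message> published) {
        List<Message> delivered = new ArrayList<>();
        for (Message m : published) {
            if (topicMatches(topicFilter, m.topic) && where.test(m)) {
                delivered.add(m);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        // Steps 1 and 2: the subscriber declares "home/#" plus a condition. Here the
        // parsed form of  k1 = 'v1'  is written by hand as a predicate.
        Predicate<Message> where = m -> "v1".equals(m.userProperties.get("k1"));

        List<Message> published = List.of(
            new Message("home/room/1", Map.of("k1", "v1")),    // topic and condition match
            new Message("home/room/2", Map.of("k1", "v2")),    // condition fails
            new Message("home/room/3", Map.of()),              // k1 missing, so the condition fails
            new Message("office/room/1", Map.of("k1", "v1"))   // topic does not match
        );

        for (Message m : deliver("home/#", where, published)) {
            System.out.println("delivered: " + m.topic);       // prints "delivered: home/room/1"
        }
    }
}
```

The point of the sketch is the ordering: the topic filter is checked first, and the condition is applied only to messages that already match the topic.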

SQL Filtering Syntax

WHERE clauses support the operators and functions listed below, which can be combined to build flexible filtering conditions.

Supported Operators

| Type | Operators | Example | Description |
| --- | --- | --- | --- |
| Comparison Operators | =, !=, >, >=, <, <= | payload.temp > 30 | Compare numeric values or strings. |
| Logical Operators | AND, OR, NOT | temp > 25 AND hum < 70 | Combine multiple conditions. |
| Membership Check | IN | clientid IN ('client1', 'client2') | Determine whether a field value is in a list. |
| Null Check | IS NULL | payload.location IS NULL | Determine whether a field is NULL. |
| Pattern Matching | LIKE | topic LIKE 'sensor/%/temp' | Simple wildcard matching. |
| Conditional Expressions | CASE WHEN...THEN...ELSE...END | CASE WHEN qos > 0 THEN 'important' ELSE 'normal' END | Implement conditional logic. |
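
To get a feel for what a LIKE pattern such as 'sensor/%/temp' would accept, the sketch below translates a pattern into a regular expression. It assumes standard SQL LIKE semantics, where % matches any sequence of characters and _ matches exactly one character. This page only describes LIKE as simple wildcard matching, so the exact behavior of the service may differ. The class name LikePatternSketch and its methods are hypothetical.

```java
import java.util.regex.Pattern;

final class LikePatternSketch {

    /** Translates a LIKE pattern to a regex, assuming '%' = any sequence and '_' = any single character. */
    static Pattern likeToRegex(String likePattern) {
        StringBuilder regex = new StringBuilder();
        for (char c : likePattern.toCharArray()) {
            if (c == '%') {
                regex.append(".*");
            } else if (c == '_') {
                regex.append('.');
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));  // every other character is literal
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL);
    }

    /** Returns false for a null value, mirroring the idea that NULL does not satisfy a pattern. */
    static boolean like(String value, String likePattern) {
        return value != null && likeToRegex(likePattern).matcher(value).matches();
    }

    public static void main(String[] args) {
        String pattern = "sensor/%/temp";
        System.out.println(like("sensor/room1/temp", pattern));    // true
        System.out.println(like("sensor/room1/hum", pattern));     // false
        System.out.println(like("sensor/a/b/temp", pattern));      // true: '%' is not limited to one topic level
    }
}
```

Under this assumption, % behaves differently from the MQTT single-level wildcard +, because it can span several topic levels.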

Supported Functions

| Type | Function Sample | Description |
| --- | --- | --- |
| String Functions | UPPER(), LOWER(), LENGTH() | Process text data. |
| Mathematical Functions | ABS() | Calculate the absolute value. |
| Conditional Functions | COALESCE() | Return the first non-NULL value among the parameters. |

Must-Knows

1. The User Properties of a Subscribe request should contain only one property with the key $where. If multiple $where properties are present, only the first one takes effect.
2. For the User Properties of a message, if multiple key-value pairs share the same key, only the value of the last occurrence is used when the filter expression is evaluated.
3. If a field referenced in the filter expression does not exist in the message properties, its value is treated as NULL.
4. String literals must be enclosed in single quotes, for example: WHERE type = 'string-literal'
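
The first three rules can be expressed in a few lines of plain Java. The sketch below only illustrates the rules as stated above and is not the server's code. The class UserPropertyRulesSketch and its method names are hypothetical, and User Properties are modeled as an ordered list of key-value entries.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class UserPropertyRulesSketch {

    /** Rule 1: if a Subscribe request carries several $where properties, the first one is used. */
    static String effectiveWhere(List<Map.Entry<String, String>> subscribeProperties) {
        for (Map.Entry<String, String> property : subscribeProperties) {
            if ("$where".equals(property.getKey())) {
                return property.getValue();
            }
        }
        return null;  // no $where property, so no SQL filtering is requested
    }

    /** Rule 2: for duplicate keys in a message's User Properties, the last value is used. */
    static Map<String, String> effectiveMessageProperties(List<Map.Entry<String, String>> messageProperties) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> property : messageProperties) {
            result.put(property.getKey(), property.getValue());  // later entries overwrite earlier ones
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> subscribeProperties = List.of(
            Map.entry("$where", "where k1 = 'v1'"),
            Map.entry("$where", "where k1 = 'v2'"));
        System.out.println(effectiveWhere(subscribeProperties));  // where k1 = 'v1'

        List<Map.Entry<String, String>> messageProperties = List.of(
            Map.entry("k1", "v0"),
            Map.entry("k1", "v1"));
        Map<String, String> fields = effectiveMessageProperties(messageProperties);
        System.out.println(fields.get("k1"));  // v1
        System.out.println(fields.get("k2"));  // null (Rule 3: a missing field is treated as NULL)
    }
}
```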

Examples

package com.tencent.tdmq.mqtt.quickstart.paho.v5.async;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.UserProperty;

public class BasicQuickStart {
    public static void main(String[] args) throws MqttException, InterruptedException {
        // Replace with the access point, username, and password of your own cluster
        String serverUri = "tcp://mqtt-xxx.mqtt.tdmqcloud.com:1883";
        String clientId = "deviceBasic";

        String topic = "home/room/1";
        String[] topicFilters = new String[] {"home/#"};
        int[] qos = new int[] {1};

        MqttAsyncClient client = new MqttAsyncClient(serverUri, clientId, new MemoryPersistence());
        MqttConnectionOptions options = new MqttConnectionOptions();
        options.setUserName("YOUR-USERNAME");
        options.setPassword("YOUR-PASSWORD".getBytes(StandardCharsets.UTF_8));
        options.setCleanStart(true);
        options.setSessionExpiryInterval(TimeUnit.DAYS.toSeconds(1));

        client.setCallback(new MqttCallback() {
            @Override
            public void disconnected(MqttDisconnectResponse response) {
                System.out.println("Disconnected: " + response.getReasonString());
            }

            @Override
            public void mqttErrorOccurred(MqttException e) {
                e.printStackTrace();
            }

            @Override
            public void messageArrived(String receivedTopic, MqttMessage message) throws Exception {
                byte[] payload = message.getPayload();
                String content;
                // The publisher below sends a 4-byte int; any other payload is printed as UTF-8 text
                if (4 == payload.length) {
                    ByteBuffer buf = ByteBuffer.wrap(payload);
                    content = String.valueOf(buf.getInt());
                } else {
                    content = new String(payload, StandardCharsets.UTF_8);
                }
                System.out.printf("Message arrived, topic=%s, QoS=%d content=[%s], properties=%s%n",
                    receivedTopic, message.getQos(), content, message.getProperties());
            }

            @Override
            public void deliveryComplete(IMqttToken token) {
            }

            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                System.out.println((reconnect ? "Reconnected" : "Connected") + " to " + serverURI);
            }

            @Override
            public void authPacketArrived(int i, MqttProperties properties) {
            }
        });
        client.connect(options).waitForCompletion();
        try {
            // Subscribe
            MqttSubscription[] subscriptions = new MqttSubscription[topicFilters.length];
            for (int i = 0; i < topicFilters.length; i++) {
                subscriptions[i] = new MqttSubscription(topicFilters[i], qos[i]);
            }
            // Declare the SQL filter in a Subscribe User Property with the key $where
            MqttProperties subscribeProperties = new MqttProperties();
            List<UserProperty> userProperties = new ArrayList<>();
            UserProperty userProperty = new UserProperty("$where", "where $QoS = 1 AND k1 = 'v1'");
            userProperties.add(userProperty);
            subscribeProperties.setUserProperties(userProperties);
            client.subscribe(subscriptions, null, null, subscribeProperties).waitForCompletion();
        } catch (MqttException e) {
            e.printStackTrace();
        }

        int total = 128;
        for (int i = 0; i < total; i++) {
            byte[] payload = new byte[4];
            ByteBuffer buffer = ByteBuffer.wrap(payload);
            buffer.putInt(i);
            MqttMessage message = new MqttMessage(payload);
            message.setQos(1);
            MqttProperties properties = new MqttProperties();
            properties.setContentType("application/json");
            properties.setResponseTopic("response/topic");
            // Set the user property k1 that the $where clause tests: even-numbered messages
            // carry k1=v1 and should pass the filter, odd-numbered ones carry k1=v2 and should not
            List<UserProperty> messageUserProperties = new ArrayList<>();
            messageUserProperties.add(new UserProperty("k1", i % 2 == 0 ? "v1" : "v2"));
            properties.setUserProperties(messageUserProperties);
            message.setProperties(properties);
            System.out.printf("Prepare to publish message %d%n", i);
            client.publish(topic, message);
            System.out.printf("Published message %d%n", i);
            TimeUnit.MILLISECONDS.sleep(100);
        }
        // Leave time for the filtered messages to arrive before disconnecting
        TimeUnit.MINUTES.sleep(3);
        client.disconnect().waitForCompletion();
        client.close();
    }
}
