tencent cloud

Configuring Point-to-Point Subscription
Last updated:2026-01-30 14:59:41
Configuring Point-to-Point Subscription
Last updated: 2026-01-30 14:59:41

Scenario Description

In addition to the publish/subscribe (Pub/Sub) message model defined by the MQTT standard protocol, TDMQ for MQTT supports Point-to-Point (P2P) mode.
Notes:
The feature is currently in gradual rollout. If you need to use it, please contact us.

What Is P2P Messaging Pattern?

When you need to send a message to a specified consumer, you can use the peer-to-peer messaging pattern. Compared to the Pub/Sub messaging pattern, which mainly provides a 1:N or M:N scenario solution with one Publisher and multiple Subscribers, the P2P messaging pattern offers an efficient point-to-point communication solution.
When using the P2P pattern, the Publisher has clearly specified the target recipient information of the message, and the message only needs to be consumed by a designated single client. When the sender sends a message, it specifies the recipient by setting a Topic that complies with the naming rules. The recipient can consume the message without needing to subscribe in advance.
Using the P2P pattern can not only save the cost of recipient registration and subscription relationships but also reduce push delay.

Publishing a P2P Message

String firstTopic = ...;
String targetClientId = ...;
String topic = firstTopic + "/p2p/" + targetClientId;
MqttMessage message = ...;
mqttClient.publish(topic, message);

Subscribing to a P2P Message

The client receiving messages requires no subscription processing. The correctly initialized and connected target client to the cluster can receive P2P messages.

Notes on Using the Paho Golang SDK

The Paho Golang SDK performs matchAndDispatch based on the Topic of the received message.
// matchAndDispatch takes a channel of Message pointers as input and starts a go routine that
// takes messages off the channel, matches them against the internal route list and calls the
// associated callback (or the defaultHandler, if one exists and no other route matched). If
// anything is sent down the stop channel the function will end.
func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken {
var wg sync.WaitGroup
ackOutChan := make(chan *PacketAndToken) // Channel returned to caller; closed when messages channel closed
var ackInChan chan *PacketAndToken // ACKs generated by ackFunc get put onto this channel

stopAckCopy := make(chan struct{}) // Closure requests stop of go routine copying ackInChan to ackOutChan
ackCopyStopped := make(chan struct{}) // Closure indicates that it is safe to close ackOutChan
goRoutinesDone := make(chan struct{}) // closed on wg.Done()
if order {
ackInChan = ackOutChan // When order = true no go routines are used so safe to use one channel and close when done
} else {
// When order = false ACK messages are sent in go routines so ackInChan cannot be closed until all goroutines done
ackInChan = make(chan *PacketAndToken)
go func() { // go routine to copy from ackInChan to ackOutChan until stopped
for {
select {
case a := <-ackInChan:
ackOutChan <- a
case <-stopAckCopy:
close(ackCopyStopped) // Signal main go routine that it is safe to close ackOutChan
for {
select {
case <-ackInChan: // drain ackInChan to ensure all goRoutines can complete cleanly (ACK dropped)
DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).")
case <-goRoutinesDone:
close(ackInChan) // Nothing further should be sent (a panic is probably better than silent failure)
DEBUG.Println(ROU, "matchAndDispatch order=false copy goroutine exiting.")
return
}
}
}
}
}()
}

go func() { // Main go routine handling inbound messages
for message := range messages {
// DEBUG.Println(ROU, "matchAndDispatch received message")
sent := false
r.RLock()
m := messageFromPublish(message, ackFunc(ackInChan, client.persist, message))
var handlers []MessageHandler
for e := r.routes.Front(); e != nil; e = e.Next() {
if e.Value.(*route).match(message.TopicName) {
if order {
handlers = append(handlers, e.Value.(*route).callback)
} else {
hd := e.Value.(*route).callback
wg.Add(1)
go func() {
hd(client, m)
if !client.options.AutoAckDisabled {
m.Ack()
}
wg.Done()
}()
}
sent = true
}
}
if !sent {
if r.defaultHandler != nil {
if order {
handlers = append(handlers, r.defaultHandler)
} else {
wg.Add(1)
go func() {
r.defaultHandler(client, m)
if !client.options.AutoAckDisabled {
m.Ack()
}
wg.Done()
}()
}
} else {
DEBUG.Println(ROU, "matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.")
}
}
r.RUnlock()
for _, handler := range handlers {
handler(client, m)
if !client.options.AutoAckDisabled {
m.Ack()
}
}
// DEBUG.Println(ROU, "matchAndDispatch handled message")
}
if order {
close(ackOutChan)
} else { // Ensure that nothing further will be written to ackOutChan before closing it
close(stopAckCopy)
<-ackCopyStopped
close(ackOutChan)
go func() {
wg.Wait() // Note: If this remains running then the user has handlers that are not returning
close(goRoutinesDone)
}()
}
DEBUG.Println(ROU, "matchAndDispatch exiting")
}()
return ackOutChan
}
Messages received may not match any topic-filter. To correctly handle such messages, need to configure defaultHandler: ClientOptions:SetDefaultPublishHandler(messagePubHandler), to ensure fallback logic behaves as expected.
package main

import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"time"
)

var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Received message: %s from topic: %s\\n", msg.Payload(), msg.Topic())
}

var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
fmt.Println("Connected")
}

var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
fmt.Printf("Connect lost: %v", err)
}

func sub(client mqtt.Client) {
topic := "home/room"
// Message handler per topic-filter
token := client.Subscribe(topic, 1, messagePubHandler)
result := token.Wait()
fmt.Printf("Subscribed to topic %s, %s", topic, result)
}

func publish(client mqtt.Client) {
num := 10
for i := 0; i < num; i++ {
text := fmt.Sprintf("Message %d", i)
token := client.Publish("home/test", 0, false, text)
token.Wait()
time.Sleep(time.Second)
}
}

func main() {
// Acquire your instance access point from MQTT Console
var broker = "mqtt-xxx.mqtt.tencenttdmq.com"
var port = 1883
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
opts.SetClientID("mqttGolangClient")
opts.SetUsername("YOUR-USERNAME")
opts.SetPassword("YOUR-PASSWORD")
// Need to configure defaultHandler to make P2P message fallback properly
opts.SetDefaultPublishHandler(messagePubHandler)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
sub(client)
publish(client)
time.Sleep(time.Minute * 3)
}

Was this page helpful?
You can also Contact Sales or Submit a Ticket for help.
Yes
No

Feedback