Subscription Mode

Last updated: 2024-01-03 14:12:49
    In order to meet the needs of different use cases, TDMQ for Pulsar supports four subscription modes: exclusive, shared, failover, and key_shared.
    
    
    

    Exclusive Mode

    Exclusive mode (default): A subscription can be associated with only one consumer, and that consumer receives all messages in the topic. If the consumer fails, consumption stops.
    If more than one consumer tries to consume through the same subscription, an error is reported. This mode is suitable for scenarios that require globally sequential consumption.
    
    
    
    // Construct a consumer
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
    // Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page.
    .topic("persistent://pulsar-xxx/sdk_java/topic1")
    // You need to create a subscription on the topic details page in the console and enter the subscription name here
    .subscriptionName("sub_topic1")
    // Declare the exclusive mode to be the consumption mode
    .subscriptionType(SubscriptionType.Exclusive)
    .subscribe();
    If multiple consumers are started, an error will be reported.
    
    

    Shared Mode

    Messages are distributed to different consumers through a customizable round robin mechanism, with each message going to only one consumer. When a consumer is disconnected, any messages delivered to it but not acknowledged will be redistributed to other active consumers.
    
    
    
    // Construct a consumer
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
    // Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page.
    .topic("persistent://pulsar-xxx/sdk_java/topic1")
    // You need to create a subscription on the topic details page in the console and enter the subscription name here
    .subscriptionName("sub_topic1")
    // Declare the shared mode to be the consumption mode
    .subscriptionType(SubscriptionType.Shared)
    .subscribe();
    There can be multiple consumers in the shared mode.
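
The dispatch behavior described above can be illustrated with a small, self-contained model. This is only a sketch in plain Java: the class `SharedDispatchSketch` and its methods are hypothetical names invented for illustration, it does not use the Pulsar client, and it is not how the broker is actually implemented. It assumes strict round-robin order and shows that each message goes to one consumer and that unacknowledged messages are redelivered when a consumer disconnects.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of shared-mode dispatch (hypothetical; not the broker's real logic). */
final class SharedDispatchSketch {
    // Consumer name -> messages delivered to it but not yet acknowledged.
    private final Map<String, List<String>> pending = new LinkedHashMap<>();
    // Counts dispatched messages; used to pick the next consumer in turn.
    private int cursor = 0;

    void addConsumer(String name) {
        pending.put(name, new ArrayList<>());
    }

    /** Delivers one message to the next consumer in round-robin order and returns its name. */
    String dispatch(String message) {
        List<String> names = new ArrayList<>(pending.keySet());
        if (names.isEmpty()) {
            throw new IllegalStateException("no active consumers");
        }
        String target = names.get(cursor % names.size());
        cursor++;
        pending.get(target).add(message);
        return target;
    }

    /** Marks a message as acknowledged so it will not be redelivered. */
    void acknowledge(String consumer, String message) {
        pending.get(consumer).remove(message);
    }

    /** Removes a consumer and redelivers its unacknowledged messages to the others. */
    void disconnect(String consumer) {
        List<String> unacked = pending.remove(consumer);
        if (unacked == null || pending.isEmpty()) {
            return; // unknown consumer, or nobody left to redeliver to (not modeled)
        }
        for (String message : unacked) {
            dispatch(message);
        }
    }

    List<String> pendingFor(String consumer) {
        return pending.get(consumer);
    }

    public static void main(String[] args) {
        SharedDispatchSketch broker = new SharedDispatchSketch();
        broker.addConsumer("a");
        broker.addConsumer("b");
        broker.dispatch("m1"); // delivered to a
        broker.dispatch("m2"); // delivered to b
        broker.disconnect("a"); // m1 was never acknowledged, so it moves to b
        System.out.println(broker.pendingFor("b")); // prints [m2, m1]
    }
}
```

Because redelivery can interleave older and newer messages, as in the output above, this model also suggests why the shared mode gives no ordering guarantee.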
    
    

    Failover Mode

    In this mode, when there are multiple consumers, they are sorted lexicographically, and the first consumer is initially the only one that receives messages. When the first consumer disconnects, all unacknowledged and subsequent messages are delivered to the next consumer in the queue.
    
    
    
    // Construct a consumer
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
    // Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page.
    .topic("persistent://pulsar-xxx/sdk_java/topic1")
    // You need to create a subscription on the topic details page in the console and enter the subscription name here
    .subscriptionName("sub_topic1")
    // Declare the failover mode to be the consumption mode
    .subscriptionType(SubscriptionType.Failover)
    .subscribe();
    There can be multiple consumers in the failover mode.
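
The selection rule described above can be modeled in a few lines of plain Java. The sketch below is an illustration only: `FailoverSketch` is a hypothetical class, it assumes consumers are ordered by name, and it does not reproduce the broker's actual implementation or use the Pulsar client.

```java
import java.util.TreeSet;

/** Toy model of failover-mode consumer selection (hypothetical; not broker code). */
final class FailoverSketch {
    // TreeSet keeps consumer names in lexicographic (natural String) order.
    private final TreeSet<String> consumers = new TreeSet<>();

    void connect(String name) {
        consumers.add(name);
    }

    void disconnect(String name) {
        consumers.remove(name);
    }

    /** Returns the only consumer that currently receives messages, or null if none is connected. */
    String active() {
        return consumers.isEmpty() ? null : consumers.first();
    }

    public static void main(String[] args) {
        FailoverSketch subscription = new FailoverSketch();
        subscription.connect("consumer-b");
        subscription.connect("consumer-a");
        System.out.println(subscription.active()); // prints consumer-a
        subscription.disconnect("consumer-a");
        System.out.println(subscription.active()); // prints consumer-b
    }
}
```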
    
    

    Key_Shared Mode

    If there are multiple consumers, messages will be distributed by key, and messages with the same key will only be distributed to the same consumer.
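
To make the key-to-consumer mapping concrete, here is a minimal sketch in plain Java. It is an assumption-laden stand-in: `KeySharedSketch` and `consumerFor` are hypothetical names, and `String.hashCode()` with a modulo is used only for illustration. The real broker uses its own hashing and hash-range assignment, which this sketch does not reproduce.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model of key-based routing (hypothetical; the broker uses its own hash ranges). */
final class KeySharedSketch {
    /**
     * Picks a consumer for a message key. As long as the consumer list is unchanged,
     * the same key always maps to the same consumer.
     */
    static String consumerFor(String key, List<String> consumers) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("no consumers");
        }
        // floorMod keeps the slot non-negative even when hashCode() is negative.
        int slot = Math.floorMod(key.hashCode(), consumers.size());
        return consumers.get(slot);
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("c1", "c2", "c3");
        for (String key : Arrays.asList("order-1", "order-2", "order-1")) {
            System.out.println(key + " -> " + consumerFor(key, consumers));
        }
    }
}
```

Running `main` shows both `order-1` messages landing on the same consumer, while `order-2` may land on a different one.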
    
    
    
    Note:
    The key_shared mode has certain usage limits. Because its engineering implementation is complex, it is still being iterated on in the community and is not as stable as the exclusive, failover, and shared modes. We recommend that you choose one of the other three modes first if it can meet your business needs.
    Pro clusters can guarantee the sequential delivery of messages with the same key, while virtual clusters cannot.

    Suggestions for the key_shared mode

    When to use the key_shared mode

    Choose the shared mode for general production/consumption scenarios.
    If you want messages with the same key to be distributed to the same consumer, you cannot use the shared mode. You have two options:
    Choose the key_shared mode.
    Use a multi-partition topic + failover mode.

    Where to use the key_shared mode

    There are many message keys, and messages are evenly distributed across them.
    Consumption is fast, with no message backlog.
    If these two conditions cannot both be met in production, we recommend that you combine a multi-partition topic with the failover mode.

    Sample code

    Sample key_shared subscription

    By default, Pulsar enables the batch feature when producing messages, and batches are unpacked on the consumer side. The broker therefore treats a whole batch as one entry. Because messages with different keys may be packed into the same batch, the key_shared mode, which relies on the message key to distribute and order messages, becomes ineffective in this case. There are two ways to avoid this when creating a producer:
    1. Disable the batch feature.
    // Construct a producer
    Producer<byte[]> producer = pulsarClient.newProducer()
    .topic(topic)
    .enableBatching(false)
    .create();
    // Set the key when sending messages
    MessageId msgId = producer.newMessage()
    // Message content
    .value(value.getBytes(StandardCharsets.UTF_8))
    // Set the key here. Messages with the same key will only be distributed to the same consumer.
    .key("youKey1")
    .send();
    2. Use the key_based batch type.
    // Construct a producer
    Producer<byte[]> producer = pulsarClient.newProducer()
    .topic(topic)
    .enableBatching(true)
    .batcherBuilder(BatcherBuilder.KEY_BASED)
    .create();
    // Set the key when sending messages
    MessageId msgId = producer.newMessage()
    // Message content
    .value(value.getBytes(StandardCharsets.UTF_8))
    // Set the key here. Messages with the same key will only be distributed to the same consumer.
    .key("youKey1")
    .send();
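
The difference between the two batching behaviors can be shown with a small plain-Java model. This is a sketch only: `BatchingSketch` and its methods are hypothetical names, each message is represented just by its key, and the grouping rules are simplified assumptions rather than the Pulsar client's actual batching logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy comparison of order-based and key-based batching (hypothetical model). */
final class BatchingSketch {
    /** Default-style batching: fill each batch in send order, ignoring keys. */
    static List<List<String>> batchInOrder(List<String> keys, int maxBatchSize) {
        List<List<String>> batches = new ArrayList<>();
        for (int i = 0; i < keys.size(); i += maxBatchSize) {
            int end = Math.min(i + maxBatchSize, keys.size());
            batches.add(new ArrayList<>(keys.subList(i, end)));
        }
        return batches;
    }

    /** Key-based batching: one batch per distinct key, so a batch never mixes keys. */
    static List<List<String>> batchByKey(List<String> keys) {
        Map<String, List<String>> byKey = new LinkedHashMap<>();
        for (String key : keys) {
            byKey.computeIfAbsent(key, k -> new ArrayList<>()).add(key);
        }
        return new ArrayList<>(byKey.values());
    }

    /** Returns true if any batch holds messages with more than one key. */
    static boolean hasMixedKeys(List<List<String>> batches) {
        for (List<String> batch : batches) {
            if (batch.stream().distinct().count() > 1) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("k1", "k2", "k1", "k3");
        System.out.println(hasMixedKeys(batchInOrder(keys, 2))); // prints true
        System.out.println(hasMixedKeys(batchByKey(keys)));      // prints false
    }
}
```

A batch with mixed keys can only be routed as a unit, which is the situation the two producer options above are meant to avoid.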
    Sample code for the consumer:
    // Construct a consumer
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
    // Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page.
    .topic("persistent://pulsar-xxx/sdk_java/topic1")
    // You need to create a subscription on the topic details page in the console and enter the subscription name here
    .subscriptionName("sub_topic1")
    // Declare the key_shared mode to be the consumption mode
    .subscriptionType(SubscriptionType.Key_Shared)
    .subscribe();
    There can be multiple consumers in the key_shared mode.
    
    

    Sample "multi-partition topic + failover" subscription

    Note:
    In this mode, each partition is assigned to only one consumer instance at a time. When there are more consumers than partitions, the excess consumers cannot consume messages. You can solve this problem by adding more partitions.
    Try to ensure an even key distribution when designing keys.
    Delayed messages are not supported in the failover mode.
    1. Sample code for the producer.
    // Construct a producer
    Producer<byte[]> producer = pulsarClient.newProducer()
    .topic(topic)
    .enableBatching(false) // Disable the batch feature
    .create();
    // Set the key when sending messages
    MessageId msgId = producer.newMessage()
    // Message content
    .value(value.getBytes(StandardCharsets.UTF_8))
    // Set the key here. Messages with the same key will be sent to the same partition.
    .key("youKey1")
    .send();
    2. Sample code for the consumer.
    // Construct a consumer
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
    // Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page.
    .topic("persistent://pulsar-xxx/sdk_java/topic1")
    // You need to create a subscription on the topic details page in the console and enter the subscription name here
    .subscriptionName("sub_topic1")
    // Declare the failover mode to be the consumption mode
    .subscriptionType(SubscriptionType.Failover)
    .subscribe();
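
The following plain-Java sketch illustrates why keys stay with one consumer in this setup and why extra consumers sit idle when there are fewer partitions than consumers. It is a simplified model under stated assumptions: `PartitionFailoverSketch` is a hypothetical class, keys are mapped to partitions with `String.hashCode()` modulo the partition count, and partitions are handed to name-sorted consumers in turn. The real client and broker use their own routing and assignment logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Toy model of "multi-partition topic + failover" (hypothetical; not broker code). */
final class PartitionFailoverSketch {
    /** Maps a key to a partition; the same key always lands in the same partition. */
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Returns the active consumer for each partition index. */
    static List<String> assign(int numPartitions, List<String> consumers) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("no consumers");
        }
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);
        List<String> owners = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            owners.add(sorted.get(p % sorted.size()));
        }
        return owners;
    }

    /** Consumers that own no partition and therefore receive no messages. */
    static List<String> idle(int numPartitions, List<String> consumers) {
        List<String> idle = new ArrayList<>(consumers);
        idle.removeAll(assign(numPartitions, consumers));
        Collections.sort(idle);
        return idle;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("c1", "c2", "c3");
        System.out.println(assign(2, consumers)); // prints [c1, c2]
        System.out.println(idle(2, consumers));   // prints [c3]
        System.out.println(idle(4, consumers));   // prints []
    }
}
```

With two partitions and three consumers, `c3` owns nothing. Raising the partition count to four gives every consumer at least one partition, which matches the advice in the note above.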

    Enabling sequence guarantee

    TDMQ for Pulsar 2.9.2 clusters support sequential message delivery by key. To enable the sequence guarantee feature, specify keySharedPolicy when creating the consumer instance.
    // Construct a consumer
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
    // Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page.
    .topic("persistent://pulsar-xxx/sdk_java/topic1")
    // You need to create a subscription on the topic details page in the console and enter the subscription name here
    .subscriptionName("sub_topic1")
    // Declare the key_shared mode to be the consumption mode
    .subscriptionType(SubscriptionType.Key_Shared)
    // Set to require sequence
    .keySharedPolicy(KeySharedPolicy.autoSplitHashRange().setAllowOutOfOrderDelivery(false))
    .subscribe();
    Note:
    The sequence guarantee feature is not supported on v2.7.2 clusters. Enabling it there may lead to message push congestion and subsequent consumption failure.
    If the sequence guarantee feature is enabled, consumption may slow down and messages may accumulate after a consumer is restarted. This is because the feature requires messages to be consumed in sequence: earlier messages must be consumed and acknowledged before later messages are delivered to the restarted consumer.
    