tencent cloud

消息队列 RocketMQ 版

发送与接收顺序消息

PDF
聚焦模式
字号
最后更新时间: 2026-01-23 17:02:15
TDMQ RocketMQ 版兼容了社区版 HTTP SDK 的接入,如果您此前使用的客户端使用了社区版 HTTP SDK,您在切换到 TDMQ RocketMQ 版后,您无需在客户端进行任何代码改造。

操作场景

如果当前您已使用了 HTTP 协议进行消息的收发,在您的客户端引入开源 HTTP SDK 后,TDMQ RocketMQ 版支持用户通过内网或公网使用 HTTP 协议接入。
本文以调用 Java SDK 为例介绍通过 HTTP SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
注意:
暂不支持使用 HTTP 协议实现事务消息。
如果您使用的是 4.x 集群,在创建 Group 消费组时需要设置协议类型(TCP 或者 HTTP,详情请参见 Group 管理 ),因此,同一个 Group(消费组)不支持 TCP 和 HTTP 客户端同时消费。

前提条件

通过 Maven 方式引入依赖,在 pom.xml 文件中添加对应语言的 SDK 依赖。

重试机制

HTTP 采用固定重试间隔的机制:
重试间隔
最大重试次数
5分钟
可通过修改消费组配置实现自定义最大重试次数,默认 16 次。
说明:
客户端在重试间隔内 ACK 这条消息,表示消费成功,不会重试。
重试间隔到期后客户端仍未 ACK,客户端会重新消费这条消息。
每次消费的消息句柄只在重试间隔内有效,过期无效。

操作步骤

步骤1:安装 Java 依赖库

在 Java 项目中引入社区版 HTTP SDK 依赖。

步骤2:生产消息

创建消息生产者

// 获取Client
MQClient mqClient = new MQClient(endpoint, accessKey, secretKey);

// 获取Topic的Producer
MQProducer producer = mqClient.getProducer(namespace, topicName);
说明:
以下参数需登录 TDMQ RocketMQ 版控制台 获取。
参数
说明
endpoint
集群接入地址,控制台集群基本信息页面的接入信息模块获取。

accessKey
角色密钥,在控制台的集群权限页面 AccessKey 列复制。

secretKey
角色名称,在控制台的集群权限页面 SecretKey 列复制。
namespace
命名空间的名称,在控制台命名空间页面复制。如果您使用的是4.x通用集群或者5.x集群,此处可填写集群的 ID。

topicName
Topic 的名称,在控制台 topic 页面复制。

发送顺序消息

try {
for (int i = 0; i < 10; i++) {
TopicMessage pubMsg;
pubMsg = new TopicMessage(
("Hello RocketMQ " + i).getBytes(),
"TAG"
);
// 设置分区顺序消息的 ShardingKey
pubMsg.setShardingKey(i % 3);
TopicMessage pubResultMsg = producer.publishMessage(pubMsg);
System.out.println("Send mq message success. MsgId is: " + pubResultMsg.getMessageId());
}
} catch (Throwable e) {
System.out.println("Send mq message failed.");
e.printStackTrace();
}
参数
说明
TAG
设置消息的 TAG。
ShardingKey
顺序消息的分区字段,相同 ShardingKey 的消息会发送到同一个分区。

步骤3:消费消息

创建消费者

// 获取Client
MQClient mqClient = new MQClient(endpoint, accessKey, secretKey);

// 获取Topic的Consumer
MQConsumer consumer = mqClient.getConsumer(namespace, topicName, groupName, "TAG");
说明:
以下参数需登录 TDMQ RocketMQ 版控制台获取。
参数
说明
endpoint
集群接入地址,控制台集群基本信息页面的接入信息模块获取。

accessKey
角色密钥,在控制台的集群权限页面 AccessKey 列复制。

secretKey
角色名称,在控制台的集群权限页面 SecretKey 列复制。
namespace
命名空间的名称,在控制台命名空间页面复制。如果您使用的是4.x通用集群或者5.x集群,此处可填写集群的 ID。

topicName
Topic 的名称,在控制台 Topic 页面复制。
groupName
消费组名称,在控制台 Group 管理页面复制。


订阅消息

do {
List<Message> messages = null;

try {
// 长轮询顺序消费消息, 拿到的消息可能是多个分区的, 一个分区的内的消息一定是顺序的
// 对于顺序消费,如果一个分区内的消息只要有没有被确认消费成功的,则对于这个分区下次还会消费到相同的消息
// 对于一个分区,只有所有消息确认消费成功才能消费下一批消息
// 如果对消费延迟比较敏感,强烈建议使用多线程并发拉取消息
messages = consumer.consumeMessageOrderly(
Integer.parseInt(batchSize),
Integer.parseInt(waitSeconds)
);
} catch (Throwable e) {
e.printStackTrace();
}
if (messages == null || messages.isEmpty()) {
System.out.println(Thread.currentThread().getName() + ": no new message, continue!");
continue;
}

for (Message message : messages) {
System.out.println("Receive message: " + message);
}

{
List<String> handles = new ArrayList<String>();
for (Message message : messages) {
handles.add(message.getReceiptHandle());
}

try {
consumer.ackMessage(handles);
} catch (Throwable e) {
if (e instanceof AckMessageException) {
AckMessageException errors = (AckMessageException) e;
System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");
if (errors.getErrorMessages() != null) {
for (String errorHandle :errors.getErrorMessages().keySet()) {
System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
+ ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
}
}
continue;
}
e.printStackTrace();
}
}
} while (true);
参数
说明
batchSize
一次拉取的消息条数,支持最多16条。
waitSeconds
一次拉取的轮询等待时间,支持最长30秒。


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈