tencent cloud

消息队列 RocketMQ 版

动态与公告
新功能发布记录
公告
产品简介
产品概述
什么是消息队列 RocketMQ 版
产品优势
应用场景
产品系列
开源对比
高可用
使用限制
开服地域
基本概念
产品计费
计费概述
价格说明
计费示例
切换集群计费模式(5.x)
续费说明
查看消费明细
退费说明
欠费说明
快速入门
快速入门概述
准备工作
步骤1:创建 RocketMQ 资源
步骤2:使用 SDK 收发消息(推荐)
步骤2:运行 RocketMQ 客户端(可选)
步骤3:查询消息
步骤4:销毁资源
用户指南
使用流程指引
配置账号权限
新建集群
命名空间管理
配置 Topic
配置 Group
连接集群
管理消息
管理集群
查看监控和配置告警
跨集群复制消息
实践教程
RocketMQ 常见概念命名规范
RocketMQ 客户端实践
RocketMQ 性能压测和容量评估
使用社区版 HTTP SDK 接入
客户端风险说明和更新指南
关于 RocketMQ 4.x 集群角色(Role)相关云 API 迁移指引
迁移指南
有感迁移
无感迁移
开发指南
消息类型
消息过滤
消息重试
POP 消费模式(5.x)
集群消费与广播消费
订阅关系一致性
限流
API 参考(5.x)
History
API Category
Making API Requests
Topic APIs
Consumer Group APIs
Message APIs
Role Authentication APIs
Hitless Migration APIs
Cloud Migration APIs
Cluster APIs
Data Types
Error Codes
API 参考(4.x)
SDK 参考
SDK 概述
5.x SDK
4.x SDK
安全与合规
权限管理
云 API 审计
删除保护
常见问题
4.x 实例常见问题
服务协议
服务等级协议
联系我们

发送与接收普通消息

PDF
聚焦模式
字号
最后更新时间: 2026-01-23 17:02:15
TDMQ RocketMQ 版兼容了社区版 HTTP SDK 的接入,如果您此前使用的客户端使用了社区版 HTTP SDK,您在切换到 TDMQ RocketMQ 版后,您无需在客户端进行任何代码改造。

操作场景

如果当前您已使用了 HTTP 协议进行消息的收发,在您的客户端引入开源 HTTP SDK 后,TDMQ RocketMQ 版支持用户通过内网或公网使用 HTTP 协议接入。
本文通过 HTTP SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
注意:
暂不支持通过使用 HTTP 协议实现事务消息。
如果您使用的是 4.x 集群,在创建 Group 消费组时需要设置协议类型(TCP 或者 HTTP,详情请参见 创建 Group ),因此,同一个 Group(消费组)不支持 TCP 和 HTTP 客户端同时消费。

前提条件

安装 PHP。
更多示例可以参见开源社区的 Demo 示例

重试机制

HTTP 采用固定重试间隔的机制:
重试间隔
最大重试次数
5分钟
可通过修改消费组配置实现自定义最大重试次数,默认 16 次。
说明:
客户端在重试间隔内 ACK 这条消息,表示消费成功,不会重试。
重试间隔到期后客户端仍未 ACK,客户端会重新消费到这条消息。
每次消费的消息句柄只在重试间隔内有效,过期无效。

操作步骤

步骤1:安装 PHP 依赖库

在 PHP 项目中引入相关依赖:
{
"require": {
"aliyunmq/mq-http-sdk": ">=1.0.4"
}
}

步骤2:生产消息

创建消息生产者

private $client;
private $producer;
public function __construct()
{
// 获取 client
$this->client = new MQClient(
endpoint,
accessKey,
secretKey
);

$topic = topicName;
$instanceId = namespace;

// 获取 producer
$this->producer = $this->client->getProducer($instanceId, $topic);
}
说明:
以下参数需登录 TDMQ RocketMQ 版控制台 获取。
参数
说明
endpoint
集群接入地址,控制台集群基本信息页面的接入信息模块获取。

accessKey
角色密钥,在控制台的集群权限页面 AccessKey 列复制。

secretKey
角色名称,在控制台的集群权限页面 SecretKey 列复制。
namespace
对于专享版,命名空间的名称,在控制台命名空间页面复制。如果您使用的是4.x通用集群(rocketmq-xxx)或者5.x集群(rmq-xxx),此处填写集群的 ID。

topicName
Topic 的名称,在控制台 topic 页面复制。

发送消息

public function run()
{
try
{
for ($i=0; $i<8; $i++)
{
$publishMessage = new TopicMessage(
"hello mq!"
);
// 设置属性
$publishMessage->putProperty("a", $i);
$result = $this->producer->publishMessage($publishMessage);
print "Send success. msgId is:" . $result->getMessageId() . ", bodyMD5 is:" . $result->getMessageBodyMD5() . "\\n";
}
} catch (\\Exception $e) {
print_r($e->getMessage() . "\\n");
}
}

步骤3:消费消息

创建消费者

private $client;
private $consumer;

public function __construct()
{
// 获取 client
$this->client = new MQClient(
endpoint,
accessKey,
secretKey
);

$topic = topicName;
$groupId = groupName;
$instanceId = namespace;

// 获取consumer
$this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId);
}
说明:
以下参数需登录 TDMQ RocketMQ 版控制台获取。
参数
说明
endpoint
集群接入地址,控制台集群基本信息页面的接入信息模块获取。

accessKey
角色密钥,在控制台的集群权限页面 AccessKey 列复制。

secretKey
角色名称,在控制台的集群权限页面 SecretKey 列复制。
namespace
对于专享版,命名空间的名称,在控制台命名空间页面复制。如果您使用的是4.x通用集群(rocketmq-xxx)或者5.x集群(rmq-xxx),此处填写集群的 ID。

topicName
Topic 的名称,在控制台 Topic 页面复制。
groupName
消费组名称,在控制台 Group 管理页面复制。


订阅消息

public function run()
{
while (True) {
try {
// 长轮询消费消息
// 长轮询表示如果 topic 没有消息则请求会在服务端等待,如果有消息可以消费则立即返回
// 如果对消费延迟比较敏感,强烈建议并发拉取消息
$messages = $this->consumer->consumeMessage(
batchSize,
waitSeconds
);
} catch (\\Exception $e) {
if ($e instanceof MQ\\Exception\\MessageNotExistException) {
printf("No message, contine long polling!RequestId:%s\\n", $e->getRequestId());
continue;
}

print_r($e->getMessage() . "\\n");

sleep(1);
continue;
}

print "consume finish, messages:\\n";

$receiptHandles = array();
foreach ($messages as $message) {
$receiptHandles[] = $message->getReceiptHandle();
printf("MessageID:%s TAG:%s BODY:%s \\nPublishTime:%d, FirstConsumeTime:%d, \\nConsumedTimes:%d, NextConsumeTime:%d,MessageKey:%s\\n",
$message->getMessageId(), $message->getMessageTag(), $message->getMessageBody(),
$message->getPublishTime(), $message->getFirstConsumeTime(), $message->getConsumedTimes(), $message->getNextConsumeTime(),
$message->getMessageKey());
print_r($message->getProperties());
// 处理业务
}

print_r($receiptHandles);
try {
$this->consumer->ackMessage($receiptHandles);
} catch (\\Exception $e) {
if ($e instanceof MQ\\Exception\\AckMessageException) {
printf("Ack Error, RequestId:%s\\n", $e->getRequestId());
foreach ($e->getAckMessageErrorItems() as $errorItem) {
printf("\\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\\n", $errorItem->getReceiptHandle(), $errorItem->getErrorCode(), $errorItem->getErrorCode());
}
}
}
print "ack finish\\n";
}
}
参数
说明
batchSize
一次拉取的消息条数,支持最多16条。
waitSeconds
一次拉取的轮询等待时间,支持最长30秒。


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈