tencent cloud

消息队列 CMQ 版

动态与公告
产品动态
公告
产品简介
产品概述
产品功能
产品优势
应用场景
使用限制
基本概念
购买指南
计费概述
购买方式
价格总览
欠费说明
退费说明
快速入门
队列模型快速入门
主题模型快速入门
操作指南
队列服务
主题订阅
访问管理 CAM
标签管理
配置告警
消息查询与轨迹
实践教程
消息去重
选择 Push 还是 Pull
案例分享
在线图片处理案例
起点文学网案例
开发指南
HTTP Endpoint 订阅
通用参考
API 文档
API 列表
CMQ API2.0 切换至 TDMQ CMQ 版 API3.0指引
SDK 文档
HTTP 数据流 SDK
HTTP 控制流 SDK
SDK 参数配置说明
常见问题
功能类
操作类
服务等级协议
联系我们
词汇表

Python SDK

PDF
聚焦模式
字号
最后更新时间: 2024-01-03 10:20:36

操作步骤

本文以 Python SDK 为例介绍客户端接入 TDMQ CMQ 版服务并收发消息的操作步骤。

前提条件

队列模型

操作步骤

1. 在控制台创建符合需求的队列,参见 创建队列服务
说明:
创建消息队列可在控制台手动创建,或通过云 API 进行创建,使用云 API 需要安装相关 SDK,SDK 安装可参见 Python SDK 安装。
shell
python
pip install --upgrade tencentcloud-sdk-python

# api认证信息
cred = credential.Credential(SecretId, SecretKey)
httpProfile = HttpProfile()
httpProfile.endpoint = NameServerAddress

clientProfile = ClientProfile()
clientProfile.httpProfile = httpProfile
# 创建tdmq客户端
client = tdmq_client.TdmqClient(cred, "ap-guangzhou", clientProfile)

# 创建cmq队列请求参数
req = models.CreateCmqQueueRequest()
params = {
"QueueName": "queue_api",
# 下面是死信队列相关配置
"DeadLetterQueueName": "dead_queue_api", # 死信队列,该消息队列要先创建
"Policy": 0, # 0为消息被多次消费未删除,1为Time-To-Live过期
"MaxReceiveCount": 3 # 最大接收次数 1-1000
}
req.from_json_string(json.dumps(params))

# 创建cmq消息队列
resp = client.CreateCmqQueue(req)

参数
说明
NameServerAddress
API 调用地址,在 TDMQ CMQ 版控制台 的队列服务 > API请求地址处复制。



SecretId、SecretKey
云 API 密钥,登录 访问管理控制台,在访问密钥 > API密钥管理页面复制。



2. 在项目中引入 CMQ 相关文件,需要根据使用的 Python 版本选择分支,默认为 Python2 SDK,您可切换至 Python3 分支中查看 Python3 SDK。
3. 发送消息。
import os
import sys

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)) + "/..")

import logging
from cmq.account import Account
from cmq.queue import Message
from cmq.cmq_exception import CMQExceptionBase

# 腾讯云账户 secretId、secretKey, 此处还需注意密钥对的保密
# 密钥可前往https://console.tencentcloud.com/cam/capi网站进行获取
secretId = 'AKIDSiiRtxxxx'
secretKey = 'GGzSeaM5xxxx'
# CMQ的服务调用地址
nameServerAddress = 'https://cmq-gz.public.tencenttdmq.com'

# 初始化 my_account, my_queue
# Account类对象不是线程安全的,如果多线程使用,需要每个线程单独初始化Account类对象
my_account = Account(nameServerAddress, secretId, secretKey, debug=True)
my_account.set_log_level(logging.DEBUG)
# 消息队列名称
queue_name = sys.argv[1] if len(sys.argv) > 1 else "python_queue"
my_queue = my_account.get_queue(queue_name)

try:
# 消息内容
msg_body = "I am test message."
msg = Message(msg_body)
# 发送消息
re_msg = my_queue.send_message(msg)
# 发送结果
print("Send Message Succeed! MessageBody:%s MessageID:%s" % (msg_body, re_msg.msgId))
except CMQExceptionBase as e:
print("Send Message Fail! Exception:%s\\n" % e)

参数
说明
NameServerAddress
API 调用地址,在 TDMQ CMQ 版控制台 的队列服务 > API请求地址处复制。



SecretId、SecretKey
云 API 密钥,登录 访问管理控制台,在访问密钥 > API密钥管理页面复制。



queue_name
队列名称,在 TDMQ CMQ 版控制台 的队列服务列表页面获取。
4. 消费消息。
import os
import sys

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)) + "/..")

import logging
from cmq.account import Account
from cmq.cmq_exception import CMQExceptionBase

# 腾讯云账户 secretId、secretKey, 此处还需注意密钥对的保密
# 密钥可前往https://console.tencentcloud.com/cam/capi网站进行获取
secretId = 'AKIDSiiRtxxxx'
secretKey = 'GGzSeaM5xxxx'
# CMQ的服务调用地址
nameServerAddress = 'https://cmq-gz.public.tencenttdmq.com'

# 初始化 my_account, my_queue
# Account类对象不是线程安全的,如果多线程使用,需要每个线程单独初始化Account类对象
my_account = Account(nameServerAddress, secretId, secretKey, debug=True)
my_account.set_log_level(logging.DEBUG)
queue_name = sys.argv[1] if len(sys.argv) > 1 else "python_queue"
my_queue = my_account.get_queue(queue_name)

try:
wait_seconds = 3
# 获取消息
recv_msg = my_queue.receive_message(wait_seconds)
# 具体业务
print("Receive Message Succeed! ReceiptHandle:%s MessageBody:%s MessageID:%s" % (
recv_msg.receiptHandle, recv_msg.msgBody, recv_msg.msgId))
# 消费成功,删除消息
my_queue.delete_message(recv_msg.receiptHandle)
except CMQExceptionBase as e:
print("Receive Message Fail! Exception:%s\\n" % e)

参数
说明
NameServerAddress
API 调用地址,在 TDMQ CMQ 版控制台 的队列服务 > API 请求地址处复制。



SecretId、SecretKey
云 API 密钥,登录 访问管理控制台,在访问密钥 > API 密钥管理页面复制。



queue
队列名称,在 TDMQ CMQ 版控制台 的队列服务列表页面获取。

主题模型

操作步骤

1. 准备所需资源,创建主题订阅和订阅者。
1.1 创建主题订阅。可通过控制台手动创建,也可以通过云 API 进行创建,使用云 API 需要安装相关 SDK,SDK 安装可参见 Python SDK 安装。
# api认证信息
cred = credential.Credential(SecretId, SecretKey)
httpProfile = HttpProfile()
httpProfile.endpoint = NameServerAddress

clientProfile = ClientProfile()
clientProfile.httpProfile = httpProfile
client = tdmq_client.TdmqClient(cred, "ap-guangzhou", clientProfile)

req = models.CreateCmqTopicRequest()
params = {
"TopicName": "topic_api", # 主题名字,在单个地域同一账号下唯一
"FilterType": 1, # 用于指定主题的消息匹配策略。1:表示标签匹配策略;2:表示路由匹配策略
"MsgRetentionSeconds": 86400 # 消息保存时间。取值范围60 - 86400 s(即1分钟 - 1天)
}
req.from_json_string(json.dumps(params))

# 创建topic
resp = client.CreateCmqTopic(req)

参数
说明
NameServerAddress
API 调用地址,在 TDMQ CMQ 版控制台 的队列服务 > API 请求地址处复制。



SecretId、SecretKey
云 API 密钥,登录 访问管理控制台,在访问密钥 > API 密钥管理页面复制。



1.2 创建订阅者。可通过控制台进行手动创建,也可以通过云 API 进行创建,使用云 API 需要安装相关 SDK,SDK 安装可参见 Python SDK 安装。
# api认证信息
cred = credential.Credential(SecretId, SecretKey)
httpProfile = HttpProfile()
httpProfile.endpoint = NameServerAddress

clientProfile = ClientProfile()
clientProfile.httpProfile = httpProfile
client = tdmq_client.TdmqClient(cred, "ap-guangzhou", clientProfile)

req = models.CreateCmqSubscribeRequest()
params = {
"TopicName": "topic_api", # 创建订阅的topic名称
"SubscriptionName": "sub", # 订阅名称
"Protocol": "queue", # 订阅的协议,目前支持两种协议:http、queue。使用http协议,用户需自己搭建接受消息的web server。使用queue,消息会自动推送到CMQ queue,用户可以并发地拉取消息。
"Endpoint": "topic_queue_api", # 接收通知的Endpoint,根据协议Protocol区分:对于http,Endpoint必须以“http://”开头,host可以是域名或IP;对于Queue,则填QueueName。
"NotifyStrategy": "BACKOFF_RETRY", # CMQ推送服务器的重试策略。取值有:1)BACKOFF_RETRY,退避重试。;2)EXPONENTIAL_DECAY_RETRY,指数衰退重试。
"FilterTag": ["TAG"], # 消息标签(用于消息过滤)。标签数量不能超过5个
# "BindingKey": ["a.b.c"], # BindingKey数量不超过5个, 每个BindingKey长度不超过64字节,该字段表示订阅接收消息的过滤策略
"NotifyContentFormat": "SIMPLIFIED" # 推送内容的格式。取值:1)JSON;2)SIMPLIFIED,即raw格式。如果Protocol是queue,则取值必须为SIMPLIFIED。如果Protocol是http,两个值均可以,默认值是JSON。
}
req.from_json_string(json.dumps(params))

# 创建订阅
resp = client.CreateCmqSubscribe(req)

注意:
BindingKey 与 FilterTag 要根据所订阅topic类型进行设置,否则无效。
参数
说明
NameServerAddress
API 调用地址,在 TDMQ CMQ 版控制台 的队列服务 > API 请求地址处复制。



SecretId、SecretKey
云 API 密钥,登录 访问管理控制台,在访问密钥 > API 密钥管理页面复制。



2. 在项目中引入 CMQ 相关文件,需要根据使用的 Python 版本选择分支,默认为 Python2 SDK,您可切换至 Python3 分支中查看 Python3 SDK。
3. 创建 my_topic,用来发布消息。
import os
import sys

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)) + "/..")

import logging
from cmq.account import Account
from cmq.cmq_exception import *
from cmq.topic import *

# 腾讯云账户 secretId、secretKey, 此处还需注意密钥对的保密
# 密钥可前往https://console.tencentcloud.com/cam/capi网站进行获取
secretId = 'AKIDSiiRtxxxx'
secretKey = 'GGzSeaM5xxxx'
# CMQ的服务调用地址
nameServerAddress = 'https://cmq-gz.public.tencenttdmq.com'

try:
# 初始化 my_account
# Account类对象不是线程安全的,如果多线程使用,需要每个线程单独初始化Account类对象
my_account = Account(nameServerAddress, secretId, secretKey, debug=True)
my_account.set_log_level(logging.DEBUG)
# topic主题名称
topic_name = sys.argv[1] if len(sys.argv) > 1 else "python_topic_route"
my_topic = my_account.get_topic(topic_name)
except CMQExceptionBase as e:
print("Exception:%s\\n" % e)

参数
说明
NameServerAddress
API 调用地址,在 TDMQ CMQ 版控制台 的队列服务 > API 请求地址处复制。



SecretId、SecretKey
云 API 密钥,登录 访问管理控制台,在访问密钥 > API 密钥管理页面复制。



topic_name
主题订阅名称,在 TDMQ CMQ 版控制台 的主题订阅列表页面获取。
4. 发送 TAG 类型消息。
# 消息tag
tags = ["TAG", "TAG1", "TAG2"]
for tag in tags:
# 发送tag消息
message = Message("this is a test TAG message. TAG:" + tag, [tag])
re_msg = my_topic.publish_message(message)
# 发送结果
print("Send Message Succeed! MessageBody:%s MessageID:%s" % (message.msgBody, re_msg.msgId))

5. 发送 route 消息。
# 消息route信息
routes = ["a.b.c", "a.b.x", "a.c.d", "x.y.z", "x.y.c"]
for route in routes:
message = Message("this is a test route message. Route:" + route)
# 发送route消息
re_msg = my_topic.publish_message(message, route)
# 发送结果
print("Send Message Succeed! MessageBody:%s MessageID:%s" % (message.msgBody, re_msg.msgId))


6. 消费者消费订阅者订阅的消息队列即可。
说明:
以上是 CMQ 两种模型下的生产和消费方式的简单介绍,更多使用可参见 DemoCMQ 代码仓库


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈