tencent cloud

消息队列 Pulsar 版

动态与公告
新功能发布记录
集群版本更新记录
产品公告
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 Pulsar 版
产品优势
应用场景
技术原理
产品系列
开源 Pulsar 版本支持说明
与开源 Pulsar 对比
高可用
配额与限制
基础概念
产品计费
计费概述
价格说明
计费示例
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
使用 SDK 收发普通消息
使用 SDK 收发高级特性消息
用户指南
使用流程指引
配置账号权限
新建集群
配置命名空间
配置 Topic
连接集群
管理集群
查询消息及轨迹
跨地域复制
查看监控和配置告警
实践教程
客户端使用实践
异常消费者隔离
限流机制说明
交易对账
消息幂等性
消息压缩
迁移指南
单写多读集群迁移方案
虚拟集群平滑迁移至专业集群
API 参考
API 概览
SDK 参考
SDK 概述
SDK 配置参数推荐
TCP 协议(Pulsar 社区版)
安全与合规
权限管理
删除保护
云 API 审计
常见问题
监控相关
客户端相关
服务协议
服务等级协议
TDMQ 政策
联系我们
词汇表

Spring Boot Starter 接入

PDF
聚焦模式
字号
最后更新时间: 2025-12-24 15:26:38

操作场景

本文以 Spring Boot Starter 接入为例介绍实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

前提条件

已参考 SDK 概述,获取相关的客户端连接参数

操作步骤

步骤1:添加依赖

在项目中引入 Pulsar Starter 相关依赖。
<dependency>
<groupId>io.github.majusko</groupId>
<artifactId>pulsar-java-spring-boot-starter</artifactId>
<version>1.0.7</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.11</version>
</dependency>

步骤2:准备配置

在配置文件 application.yml 中添加 Pulsar 相关配置信息。
server:
port: 8081
pulsar:
# 命名空间名称
namespace: namespace_java
# 服务接入地址
service-url: http://pulsar-w7eognxxx.tdmq.ap-gz.public.tencenttdmq.com:8080
# 授权角色密钥
token-auth-value: eyJrZXlJZC......
# 集群id
tenant: pulsar-w7eognxxx
参数
说明
namespace
命名空间名称,在控制台 命名空间 管理页面中复制。
service-url
集群接入地址,可以在控制台 集群管理 页面查看并复制。

token-auth-value
角色密钥,在 角色管理 页面复制密钥列。

tenant
集群 ID,在控制台 集群管理 页面中获取。

步骤3:生产消息

在 ProducerConfiguration.java 中配置生产者
package com.tencent.cloud.tdmq.pulsar.config;

import io.github.majusko.pulsar.producer.ProducerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
* 生产者相关配置
* 1.topic要提前在控制台中完成创建
* 2.消息类型需要实现Serializable接口
* 3.如果一个topic不能绑定不同的数据类型
*/
@Configuration
public class ProducerConfiguration {

@Bean
public ProducerFactory producerFactory() {
return new ProducerFactory()
// topic1 生产者
.addProducer("topic1")
// topic2 生产者
.addProducer("topic2");
说明:
Topic 名称需要填入完整路径,即“persistent://clusterid/namespace/topic”,clusterid/namespace/topic 的部分可以从控制台上 Topic 管理 页面直接复制。

编译并运行生产消息程序 MyProducer.java。
package com.tencent.cloud.tdmq.pulsar.service;

import io.github.majusko.pulsar.producer.PulsarTemplate;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

@Service
public class MyProducer {

/**
* 1.发送消息的topic是在生产者配置中已经声明的topic
* 2.PulsarTemplate类型应于发送消息的类型一致
* 3.发送消息到指定topic时,消息类型需要与生产者工厂配置中的topic绑定的消息类型对应
*/

@Autowired
private PulsarTemplate<byte[]> defaultProducer;

public void syncSendMessage() throws PulsarClientException {
defaultProducer.send("topic1", "Hello pulsar client.".getBytes(StandardCharsets.UTF_8));
}

public void asyncSendMessage() {
String msg = "Hello pulsar client.";
CompletableFuture<MessageId> completableFuture = defaultProducer.sendAsync("topic1", msg.getBytes(StandardCharsets.UTF_8));
// 通过异步回调得知消息发送成功与否
completableFuture.whenComplete(((messageId, throwable) -> {
if( null != throwable ) {
System.out.println("delivery failed, value: " + msg );
// 此处可以添加延时重试的逻辑
} else {
System.out.println("delivered msg " + messageId + ", value:" + msg);
}
}));
}

/**
* 顺序消息需要使用顺序类型topic来完成,顺序类型的topic支撑全局顺序和局部顺序两种类型,根据实际情况选择合适的类型即可
*/
public void sendOrderMessage() throws PulsarClientException {
for (int i = 0; i < 5; i++) {
defaultProducer.send("topic2", ("Hello pulsar client, this is a order message" + i + ".").getBytes(StandardCharsets.UTF_8));
}
}
}

注意:
发送消息的 Topic 是在生产者配置中已经声明的 Topic。
PulsarTemplate 类型应与发送消息的类型一致。
发送消息到指定 Topic 时,消息类型需要与生产者工厂配置中的 Topic 绑定的消息类型对应。

步骤4:消费消息

编译并运行消费消息程序 MyConsumer.java。
package com.tencent.cloud.tdmq.pulsar.service;

import io.github.majusko.pulsar.annotation.PulsarConsumer;
import io.github.majusko.pulsar.constant.Serialization;
import org.apache.pulsar.client.api.SubscriptionType;
import org.springframework.stereotype.Service;

/**
* 消费者配置
*/
@Service
public class MyConsumer {

@PulsarConsumer(topic = "topic1", // 订阅topic名称
subscriptionName = "sub_topic1", // 订阅名称
serialization = Serialization.JSON, // 序列化方式
subscriptionType = SubscriptionType.Shared, // 订阅模式,默认为独占模式
consumerName = "firstTopicConsumer", // 消费者名称
maxRedeliverCount = 3, // 最大重试次数
deadLetterTopic = "sub_topic1-DLQ" // 死信topic名称
)
public void firstTopicConsume(byte[] msg) {
// TODO process your message
System.out.println("Received a new message. content: [" + new String(msg) + "]");
// 如果消费失败,请抛出异常,这样消息会进入重试队列,之后可以重新消费,直到达到最大重试次数之后,进入死信队列。前提是要创建重试和死信topic
}


/**
* 顺序类型的消息可借助顺序类型的topic来完成,支持全局顺序和局部顺序两种类型
*/
@PulsarConsumer(topic = "topic2", subscriptionName = "sub_topic2")
public void orderTopicConsumer(byte[] msg) {
// TODO process your message
System.out.println("Received a order message. content: [" + new String(msg) + "]");
}


/**
* 监听死信topic,处理死信消息
*/
@PulsarConsumer(topic = "sub_topic1-DLQ", subscriptionName = "dead_sub")
public void deadTopicConsumer(byte[] msg) {
// TODO process your message
System.out.println("Received a dead message. content: [" + new String(msg) + "]");
}
}

参数
说明
topic
Topic 名称需要填入完整路径,即persistent://clusterid/namespace/topicclusterid/namespace/topic 的部分可以从控制台上 Topic 管理 页面直接复制。

subscriptionName
subscriptionName 需要写入订阅名,可在消费者界面复制获取。

deadLetterTopic
若您新建订阅时,开启了自动创建重试&死信队列,系统会自动创建好一个重试队列和死信队列。此处可填写死信 Topic 名称, 需要填入完整路径,即 persistent://clusterid/namespace/topicclusterid/namespace/topic的部分可以从控制台上 Topic 管理页面直接复制。

说明:
以上是基于 Springboot Starter 方式对 Pulsar 简单使用的配置。详细使用可参见 DemoStarter GitHubStarter Gitee

步骤5:查询消息

登录控制台,进入 消息查询 页面,可查看 Demo 运行后的消息轨迹。

消息轨迹如下:

说明:
以上是基于 Springboot Starter 方式对 Pulsar 简单使用的配置。详细使用可参见 DemoStarter 文档

帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈