tencent cloud

消息队列 RocketMQ 版

动态与公告
新功能发布记录
公告
产品简介
产品概述
什么是消息队列 RocketMQ 版
产品优势
应用场景
产品系列
开源对比
高可用
使用限制
开服地域
基本概念
产品计费
计费概述
价格说明
计费示例
切换集群计费模式(5.x)
续费说明
查看消费明细
退费说明
欠费说明
快速入门
快速入门概述
准备工作
步骤1:创建 RocketMQ 资源
步骤2:使用 SDK 收发消息(推荐)
步骤2:运行 RocketMQ 客户端(可选)
步骤3:查询消息
步骤4:销毁资源
用户指南
使用流程指引
配置账号权限
新建集群
命名空间管理
配置 Topic
配置 Group
连接集群
管理消息
管理集群
查看监控和配置告警
跨集群复制消息
实践教程
RocketMQ 常见概念命名规范
RocketMQ 客户端实践
RocketMQ 性能压测和容量评估
使用社区版 HTTP SDK 接入
客户端风险说明和更新指南
关于 RocketMQ 4.x 集群角色(Role)相关云 API 迁移指引
迁移指南
有感迁移
无感迁移
开发指南
消息类型
消息过滤
消息重试
POP 消费模式(5.x)
集群消费与广播消费
订阅关系一致性
限流
API 参考(5.x)
History
API Category
Making API Requests
Topic APIs
Consumer Group APIs
Message APIs
Role Authentication APIs
Hitless Migration APIs
Cloud Migration APIs
Cluster APIs
Data Types
Error Codes
API 参考(4.x)
SDK 参考
SDK 概述
5.x SDK
4.x SDK
安全与合规
权限管理
云 API 审计
删除保护
常见问题
4.x 实例常见问题
服务协议
服务等级协议
联系我们

顺序消息

PDF
聚焦模式
字号
最后更新时间: 2025-07-23 14:17:38
顺序消息是消息队列 RocketMQ 提供的一种高级消息类型,对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发送的消息先消费,后发送的消息后消费。
顺序消息适用于对消息发送和消费顺序有严格要求的情况。

使用场景

顺序消息和普通消息的对比如下:
消息类型
消费顺序
性能
适用场景
普通消息
无顺序
适用于对吞吐量要求高,且对生产和消费顺序无要求
顺序消息
指定的 Topic 内的消息遵循先入先出(FIFO)规则
一般
吞吐量要求一般,但是要求特定的 Topic 严格地按照 FIFO 原则进行消息发布和消费的场景
对应到具体的业务场景,顺序消息可以被用在以下场景中:
订单创建场景:在一些电商系统中,同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息必须严格按照先后顺序来进行生产或者消费,否则消费中传递订单状态会发生紊乱,影响业务的正常进行。因此,该订单的消息必须按照一定的顺序在客户端和消息队列中进行生产和消费,同时消息之间有先后的依赖关系,后一条消息需要依赖于前一条消息的处理结果。
日志同步场景:在有序事件处理或者数据实时增量同步的场景中,顺序消息也能发挥较大的作用,如同步 MySQL 的 binlog 日志时,需要保证数据库的操作是有顺序的。
金融场景:在一些撮合交易的场景下,例如某些证券交易,在价格相同的情况下,先出价者优先处理,则需要按照FIFO的方式生产和消费顺序消息。

实现原理

在 RocketMQ 中支持顺序消息的原理如下图所示。我们可以按照某一个标准对消息进行分区(例如图中的ShardingKey),同一个ShardingKey 的消息会被分配到同一个队列中,并按照顺序被消费。



顺序消息的代码如下所示:
public class Producer {
public static void main(String[] args) throws UnsupportedEncodingException {
try {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();

String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
Message msg =
new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);

System.out.printf("%s%n", sendResult);
}

producer.shutdown();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
}
}
这里的区别主要是调用了 SendResult send(Message msg, MessageQueueSelector selector, Object arg) 方法,MessageQueueSelector 是队列选择器,arg 是一个 Java Object 对象,可以传入作为消息发送分区的分类标准。
MessageQueueSelector 的接口如下:
public interface MessageQueueSelector {
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
其中 mqs 是可以发送的队列,msg 是消息,arg 是上述 send 接口中传入的 Object 对象,返回的是该消息需要发送到的队列。上述例子里,是以 orderId 作为分区分类标准,对所有队列个数取余,来对将相同 orderId 的消息发送到同一个队列中。
生产环境中建议选择最细粒度的分区键进行拆分,例如,将订单ID、用户ID作为分区键关键字,可实现同一终端用户的消息按照顺序处理,不同用户的消息无需保证顺序。
注意:
为了保证消息的高可用,目前TDMQ RocketMQ版不支持单队列的 “全局顺序消息”(已经创建了全局顺序消息的用户可以正常使用);如果您想保证全局的顺序性,您可以通过使用一致的 ShardingKey 来实现。

消费顺序消息

顺序消费代码如下:
/**
* Description: 顺序消费者
*/
public class OrderConsumer {
/**
* topic名称
*/
private static final String TOPIC_NAME = "order_topic";

/**
* 消费者组名称
*/
private static final String GROUP_NAME = "group2";
public static void main(String[] args) throws Exception {
// 创建消息消费者
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP_NAME,
new AclClientRPCHook(new SessionCredentials("accessKey", "secretKey")),
new AllocateMessageQueueAveragely(), true, null);
// 设置NameServer的地址
consumer.setNamesrvAddr("rmq-xxx.rocketmq.xxxtencenttdmq.com:8080");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
/*
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 订阅topic中的所有的信息
consumer.subscribe(TOPIC_NAME, "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + ", queueId=" + msg.getQueueId() + ", msgId=" + msg.getMsgId() + ", content:" + new String(msg.getBody()));
}
try {
// 模拟业务逻辑处理中...
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈