新功能发布记录
公告

说明 | 发送消息限流 | 消费消息限流 |
触发限流情景 | 所有连接该集群的发送客户端每秒最多可发送折算消息的总和为 500 条,发送速率达到限制后,超限的发送请求会失败。 | 所有连接该集群的消费客户端每秒最多可消费折算消息的总和为 500 条,消费速率达到限制后,消息的消费延迟会增加。 |
触发限流时 SDK 日志关键词 | Rate of message sending reaches limit, please take control or upgrade the resource specification。 | Rate of message receiving reaches limit, please take control or upgrade the resource specification。 |
触发限流时 SDK 重试机制 | 不同协议的 SDK 处理有差异: 5.x SDK 会根据指数退避策略进行重试发送,最大重试次数可在初始化 Producer 时自定义,默认值为 2 次;达到最大重试次数仍未成功的发送请求会抛出异常。 4.x SDK 直接抛出异常,不会进行重试。 | SDK 拉消息线程会自动退避重试。 |

import java.io.UnsupportedEncodingException;import java.nio.charset.StandardCharsets;import org.apache.rocketmq.acl.common.AclClientRPCHook;import org.apache.rocketmq.acl.common.SessionCredentials;import org.apache.rocketmq.client.exception.MQBrokerException;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.common.message.MessageClientIDSetter;import org.apache.rocketmq.logging.InternalLogger;import org.apache.rocketmq.logging.InternalLoggerFactory;public class ProducerExample {private static final InternalLogger log = InternalLoggerFactory.getLogger(ProducerExample.class);public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {String nameserver = "Your_Nameserver";String accessKey = "Your_Access_Key";String secretKey = "Your_Secret_Key";String topicName = "Your_Topic_Name";String producerGroupName = "Your_Producer_Group_Name";// 实例化消息生产者 ProducerDefaultMQProducer producer = new DefaultMQProducer(producerGroupName, // 生产者组名字new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // ACL权限, accessKey 和 secretKey 可以在控制台集群权限页面获取);// 设置 Nameserver 地址, 可以在控制台集群基本信息页面获取producer.setNamesrvAddr(nameserver);// 启动 Producer 实例producer.start();// 创建消息实例, 设置 topic 和消息内容Message message = new Message(topicName, "Your_Biz_Body".getBytes(StandardCharsets.UTF_8));// 最大尝试发送次数, 请根据业务情况设置final int maxAttempts = 3;// 重试间隔时间, 请根据业务情况设置final int retryIntervalMillis = 200;// 发送消息int attempt = 0;do {try {SendResult sendResult = producer.send(message);log.info("Send message successfully, {}", sendResult);break;} catch (Throwable t) {attempt++;if (attempt >= maxAttempts) {// 达到最大次数log.warn("Failed to send message finally, run out of attempt times, attempt={}, maxAttempts={}, msgId={}",attempt, maxAttempts, MessageClientIDSetter.getUniqID(message), t);// 记录发送失败的消息 (或记录到其他业务系统, 比如数据库等)log.warn(message.toString());break;}int waitMillis;if (t instanceof MQBrokerException && ((MQBrokerException) t).getResponseCode() == 215 /* FLOW_CONTROL */) {// 限流异常, 采用退避重试waitMillis = (int) Math.pow(2, attempt - 1) * retryIntervalMillis; // 重试间隔: 200ms, 400ms, ......} else {// 其他异常waitMillis = retryIntervalMillis;}log.warn("Failed to send message, will retry after {}ms, attempt={}, maxAttempts={}, msgId={}",waitMillis, attempt, maxAttempts, MessageClientIDSetter.getUniqID(message), t);try {Thread.sleep(waitMillis);} catch (InterruptedException ignore) {}}}while (true);producer.shutdown();}}
import java.io.IOException;import java.nio.charset.StandardCharsets;import org.apache.rocketmq.client.apis.ClientConfiguration;import org.apache.rocketmq.client.apis.ClientException;import org.apache.rocketmq.client.apis.ClientServiceProvider;import org.apache.rocketmq.client.apis.SessionCredentialsProvider;import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;import org.apache.rocketmq.client.apis.message.Message;import org.apache.rocketmq.client.apis.producer.Producer;import org.apache.rocketmq.client.apis.producer.SendReceipt;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class ProducerExample {private static final Logger log = LoggerFactory.getLogger(ProducerExample.class);public static void main(String[] args) throws ClientException, IOException {String nameserver = "Your_Nameserver";String accessKey = "Your_Access_Key";String secretKey = "Your_Secret_Key";String topicName = "Your_Topic_Name";// ACL权限, accessKey 和 secretKey 可以在控制台集群权限页面获取SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(nameserver) // 设置 NameServer 地址, 可以在控制台集群基本信息页面获取.setCredentialProvider(sessionCredentialsProvider).build();// 启动 Producer 实例ClientServiceProvider provider = ClientServiceProvider.loadService();Producer producer = provider.newProducerBuilder().setClientConfiguration(clientConfiguration).setTopics(topicName) // 预声明消息发送的 topic, 建议设置.setMaxAttempts(3) // 最大尝试发送次数, 请根据业务情况设置.build();// 创建消息实例, 设置 topic 和消息内容byte[] body = "Your_Biz_Body".getBytes(StandardCharsets.UTF_8);final Message message = provider.newMessageBuilder().setTopic(topicName).setBody(body).build();try {final SendReceipt sendReceipt = producer.send(message);log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());} catch (Throwable t) {log.warn("Failed to send message", t);// 记录发送失败的消息 (或记录到其他业务系统, 比如数据库等)log.warn(message.toString());}producer.close();}}

文档反馈