新功能发布记录
公告
重试间隔 | 最大重试次数 |
5分钟 | 可通过修改消费组配置实现自定义最大重试次数,默认 16 次。 |
// 获取ClientMQClient mqClient = new MQClient(endpoint, accessKey, secretKey);// 获取Topic的ProducerMQProducer producer = mqClient.getProducer(namespace, topicName);
参数 | 说明 |
endpoint | 集群接入地址,控制台集群基本信息页面的接入信息模块获取。 ![]() |
accessKey | 角色密钥,在控制台的集群权限页面 AccessKey 列复制。 ![]() |
secretKey | 角色名称,在控制台的集群权限页面 SecretKey 列复制。 |
namespace | 命名空间的名称,在控制台命名空间页面复制。如果您使用的是4.x通用集群或者5.x集群,此处可填写集群的 ID。 ![]() |
topicName | Topic 的名称,在控制台 topic 页面复制。 |
try {for (int i = 0; i < 10; i++) {TopicMessage pubMsg;pubMsg = new TopicMessage(("Hello RocketMQ " + i).getBytes(),"TAG");// 设置分区顺序消息的 ShardingKeypubMsg.setShardingKey(i % 3);TopicMessage pubResultMsg = producer.publishMessage(pubMsg);System.out.println("Send mq message success. MsgId is: " + pubResultMsg.getMessageId());}} catch (Throwable e) {System.out.println("Send mq message failed.");e.printStackTrace();}
参数 | 说明 |
TAG | 设置消息的 TAG。 |
ShardingKey | 顺序消息的分区字段,相同 ShardingKey 的消息会发送到同一个分区。 |
// 获取ClientMQClient mqClient = new MQClient(endpoint, accessKey, secretKey);// 获取Topic的ConsumerMQConsumer consumer = mqClient.getConsumer(namespace, topicName, groupName, "TAG");
参数 | 说明 |
endpoint | 集群接入地址,控制台集群基本信息页面的接入信息模块获取。 ![]() |
accessKey | 角色密钥,在控制台的集群权限页面 AccessKey 列复制。 ![]() |
secretKey | 角色名称,在控制台的集群权限页面 SecretKey 列复制。 |
namespace | 命名空间的名称,在控制台命名空间页面复制。如果您使用的是4.x通用集群或者5.x集群,此处可填写集群的 ID。 ![]() |
topicName | Topic 的名称,在控制台 Topic 页面复制。 |
groupName | 消费组名称,在控制台 Group 管理页面复制。 ![]() |
do {List<Message> messages = null;try {// 长轮询顺序消费消息, 拿到的消息可能是多个分区的, 一个分区的内的消息一定是顺序的// 对于顺序消费,如果一个分区内的消息只要有没有被确认消费成功的,则对于这个分区下次还会消费到相同的消息// 对于一个分区,只有所有消息确认消费成功才能消费下一批消息// 如果对消费延迟比较敏感,强烈建议使用多线程并发拉取消息messages = consumer.consumeMessageOrderly(Integer.parseInt(batchSize),Integer.parseInt(waitSeconds));} catch (Throwable e) {e.printStackTrace();}if (messages == null || messages.isEmpty()) {System.out.println(Thread.currentThread().getName() + ": no new message, continue!");continue;}for (Message message : messages) {System.out.println("Receive message: " + message);}{List<String> handles = new ArrayList<String>();for (Message message : messages) {handles.add(message.getReceiptHandle());}try {consumer.ackMessage(handles);} catch (Throwable e) {if (e instanceof AckMessageException) {AckMessageException errors = (AckMessageException) e;System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");if (errors.getErrorMessages() != null) {for (String errorHandle :errors.getErrorMessages().keySet()) {System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()+ ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());}}continue;}e.printStackTrace();}}} while (true);
参数 | 说明 |
batchSize | 一次拉取的消息条数,支持最多16条。 |
waitSeconds | 一次拉取的轮询等待时间,支持最长30秒。 |
文档反馈