新功能发布记录
公告
<!-- in your <dependencies> block --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.7</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-acl</artifactId><version>4.9.7</version></dependency>
// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer(groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // ACL权限);// 设置NameServer的地址,地址就是形如xxx.tencenttdmq.com:8080 这样的接入地址。producer.setNamesrvAddr(nameserver);// 启动Producer实例producer.start();for (int i = 0; i < 10; i++) {// 创建消息实例,设置topic和消息内容.Message msg = new Message(topic_name, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送消息SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}
参数 | 说明 |
accessKey | 角色密钥,在控制台的集群权限页面 AccessKey 列复制。 ![]() |
secretKey | 角色名称,在控制台的集群权限页面 SecretKey 列复制。 |
nameserver | 集群接入地址,在控制台集群基本信息页面的接入信息模块获取。 ![]() |
topic_name | Topic 的名称,在控制台 Topic 管理页面复制。 ![]() |
SendResult [sendStatus=SEND_OK, msgId=0100017D1DC818B4AAC202F388CF0000, offsetMsgId=null, messageQueue=MessageQueue [topic=yourTopic, brokerName=broker-a, queueId=3], queueOffset=250]SendResult [sendStatus=SEND_OK, msgId=0100017D1DC818BD1CAC202F388CF0001, offsetMsgId=null, messageQueue=MessageQueue [topic=yourTopic, brokerName=broker-a, queueId=0], queueOffset=251]...SendResult [sendStatus=SEND_OK, msgId=0100017D1DC818B4AAC202F388CF0009, offsetMsgId=null, messageQueue=MessageQueue [topic=yourTopic, brokerName=broker-a, queueId=2], queueOffset=259]
// 实例化消费者DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); //ACL权限// 设置NameServer的地址pushConsumer.setNamesrvAddr(nameserver);// 订阅topicpushConsumer.subscribe(topic_name, "*");// 注册回调实现类来处理从broker拉取回来的消息pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// 消息处理逻辑System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// 标记该消息已经被成功消费, 根据消费情况,返回处理状态return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 启动消费者实例pushConsumer.start();
参数 | 说明 |
accessKey | 角色密钥,在控制台的集群权限页面 AccessKey 列复制。 ![]() |
secretKey | 角色名称,在控制台的集群权限页面 SecretKey 列复制。 |
nameserver | 集群接入地址,在控制台集群基本信息页面的接入信息模块获取。 ![]() |
groupName | 消费者组名称,在控制台 Group 管理页面复制。 ![]() |
topic_name | Topic 的名称,在控制台 Topic 管理页面复制。 ![]() |
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=3, storeSize=287, queueOffset=250, sysFlag=0, bornTimestamp=1698765432100, bornHost=/192.168.1.100:53902, storeTimestamp=1698765432200, storeHost=/192.168.1.200:10911, msgId=0100017D1DC818B4AAC202F388CF0000, commitLogOffset=156789, bodyLength=16, body=Hello RocketMQ 0, topic=yourTopic, properties={MIN_OFFSET=0, MAX_OFFSET=251, CONSUME_START_TIME=1698765432300, UNIQ_KEY=0100017D1DC818B4AAC202F388CF0000, CLUSTER=DefaultCluster}, tags=null]]ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=0, storeSize=287, queueOffset=251, sysFlag=0, bornTimestamp=1698765432110, bornHost=/192.168.1.100:53902, storeTimestamp=1698765432210, storeHost=/192.168.1.200:10911, msgId=0100017D1DC818BD1CAC202F388CF0001, commitLogOffset=157045, bodyLength=16, body=Hello RocketMQ 1, topic=yourTopic, properties={MIN_OFFSET=0, MAX_OFFSET=252, CONSUME_START_TIME=1698765432310, UNIQ_KEY=0100017D1DC818BD1CAC202F388CF0001, CLUSTER=DefaultCluster}, tags=null]]
文档反馈