新功能发布记录
公告




<!-- Maven dependency for the Apache RocketMQ Java client used by the demo code. -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.6</version>
</dependency>
public class ProducerTransactionMessageDemo {private static final Logger log = LoggerFactory.getLogger(ProducerTransactionMessageDemo.class);private static boolean executeLocalTransaction() {// 模拟本地事务(如数据库插入操作),这里假设执行成功return true;}private static boolean checkTransactionStatus(String orderId) {// 模拟查询本地事务执行结果,如查询订单ID是否已入库,查到则return truereturn true;}public static void main(String[] args) throws ClientException {final ClientServiceProvider provider = ClientServiceProvider.loadService();// 在控制台权限管理页面获取ak和skString accessKey = "your-ak";String secretKey = "your-sk";SessionCredentialsProvider sessionCredentialsProvider =new StaticSessionCredentialsProvider(accessKey, secretKey);// 在控制台获取并填写腾讯云提供的接入地址String endpoints = "https://your-endpoints";ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).enableSsl(false).setCredentialProvider(sessionCredentialsProvider).build();String topic = "tran_topic";TransactionChecker checker = messageView -> {log.info("Receive transactional result check request, message={}", messageView);// 服务端主动回查本地事务状态String orderId = messageView.getProperties().get("orderId");boolean isSuccess = checkTransactionStatus(orderId);return isSuccess ? 
TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;};// 创建生产着并设置回查的checker对象Producer producer = provider.newProducerBuilder().setClientConfiguration(clientConfiguration).setTopics(topic).setTransactionChecker(checker).build();// 开启事务final Transaction transaction = producer.beginTransaction();byte[] body = "This is a transaction message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);String tag = "tagA";final Message message = provider.newMessageBuilder().setTopic(topic).setTag(tag).setKeys("your-key-565ef26f5727")//一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验.addProperty("orderId", "0001").setBody(body).build();// 发送半消息try {final SendReceipt sendReceipt = producer.send(message, transaction);log.info("Send transaction message successfully, messageId={}", sendReceipt.getMessageId());} catch (Throwable t) {log.error("Failed to send message", t);return;}// 执行本地事务boolean localTxSuccess = executeLocalTransaction();if (localTxSuccess) {// 本地事务执行成功,二次确认为Committransaction.commit();} else {// 本地事务执行失败,二次确认为Rollbacktransaction.rollback();}// producer.close();}}


private static boolean executeLocalTransaction() {// 本地事务执行失败return false;}private static boolean checkTransactionStatus(String orderId) {// 回查结果自然也是rollback,返回falsereturn false;}


文档反馈