tencent cloud

Feedback

Use of Spring Cloud Stream

Last updated: 2024-01-17 16:57:33

    Overview

    This document describes how to send and receive messages with the Spring Cloud Stream serving as example, for you to better understand the complete procedure involved in message sending and receiving.

    Prerequisites

    Directions:

    Step 1: Incorporating Dependencies

    Incorporate the spring-cloud-starter-stream-rocketmq dependency in the pom.xml file. The current recommended version is 2021.0.5.0, and it is necessary to exclude dependencies, using SDK 4.9.7.
    <dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
    <version>2021.0.5.0</version>
    <exclusions>
    <exclusion>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    </exclusion>
    <exclusion>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-acl</artifactId>
    </exclusion>
    </exclusions>
    </dependency>
    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.7</version>
    </dependency>
    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-acl</artifactId>
    <version>4.9.7</version>
    </dependency>

    Step 2: Adding Configurations

    Add the corresponding RocketMQ configurations to the configuration file.
    spring:
    cloud:
    stream:
    rocketmq:
    binder:
    # Full name of the service address
    name-server: rmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:8080
    # Role name
    secret-key: admin
    # Role key
    access-key: eyJrZXlJZ...
    # producer group
    group: producerGroup
    bindings:
    # Channel name, corresponding to the channel name under spring.cloud.stream.bindings
    Topic-TAG1-Input:
    consumer:
    # Subscribed tag type, configured according to real consumer conditions (all messages are subscribed to by default)
    subscription: TAG1
    # Channel name
    Topic-TAG2-Input:
    consumer:
    subscription: TAG2
    bindings:
    # Channel name
    Topic-send-Output:
    # Specify topic, corresponding to the created topic name
    destination: TopicTest
    content-type: application/json
    # Channel name
    Topic-TAG1-Input:
    destination: TopicTest
    content-type: application/json
    group: consumer-group1
    # Channel name
    Topic-TAG2-Input:
    destination: TopicTest
    content-type: application/json
    group: consumer-group2
    Note:
    In terms of configuration, the subscription configuration item for 2.2.5-RocketMQ-RC1 and 2.2.5.RocketMQ.RC2 is subscription, and the configuration item for other lower versions is tags.
    The complete configuration item reference for other versions is as follows:
    spring:
    cloud:
    stream:
    rocketmq:
    bindings:
    # Channel name, corresponding to the channel name under spring.cloud.stream.bindings
    Topic-test1:
    consumer:
    # Subscribed tag type, configured according to real consumer conditions (all messages are subscribed to by default)
    tags: TAG1
    # Channel name
    Topic-test2:
    consumer:
    tags: TAG2
    binder:
    # Full name of the service address
    name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:8080
    # Role name
    secret-key: admin
    # Role key
    access-key: eyJrZXlJZ...
    bindings:
    # Channel name
    Topic-send:
    # Specified topic
    destination: topic1
    content-type: application/json
    # Use the full name of the group
    group: group1
    # Channel name
    Topic-test1:
    destination: topic1
    content-type: application/json
    group: group1
    # Channel name
    Topic-test2:
    destination: topic1
    content-type: application/json
    group: group2
    Parameter
    Description
    name-server
    Cluster access address, which can be copied from Access Address in the Operation column on the Cluster page on the console. Namespace access addresses in new version shared or exclusive clusters can be copied from the namespace list.
    secret-key
    Role name, which can be copied from SecretKey on the Cluster Permission page.
    access-key
    Role key, which can be copied from AccessKey on the Cluster Permission page.
    
    group
    Producer group name, which can be copied from the Group tab on the console.
    destination
    Topic name, which can be copied from the Topic tab on the console.

    Step 3: Configuring the Channel

    A channel consists of input and output. These can be individually configured as needed.
    /**
    * Custom channel binder
    */
    public interface CustomChannelBinder {
    
    /**
    * Send the message (message producer)
    * Bind the channel name specified in the configuration settings.
    */
    @Output("Topic-send-Output")
    MessageChannel sendChannel();
    
    
    /**
    * Receive Message 1 (Consumer 1)
    * Bind the channel name specified in the configuration settings.
    */
    @Input("Topic-TAG1-Input")
    MessageChannel testInputChannel1();
    
    /**
    * Receive Message 2 (Consumer 2)
    * Bind the channel name specified in the configuration settings.
    */
    @Input("Topic-TAG2-Input")
    MessageChannel testInputChannel2();
    }
    

    Step 4: Adding Annotations

    Add relevant annotations to the configuration or boot class. If there are multiple configured binder configurations, each must be specifically specified within these annotations.
    @EnableBinding({CustomChannelBinder.class})

    Step 5: Sending the Messages

    1. Inject CustomChannelBinder into the class of the message to be sent.
    @Autowired
    private CustomChannelBinder channelBinder;
    2. Send the messages by calling the corresponding output stream channel.
    Message<String> message = MessageBuilder.withPayload("This is a new message.").build();
    channelBinder.sendChannel().send(message);

    Step 6: Consuming the Messages

    @Service
    public class StreamConsumer {
    private final Logger logger = LoggerFactory.getLogger(StreamDemoApplication.class);
    
    /**
    * Monitor channel (designated by channel name in configuration)
    *
    * @param messageBody message content
    */
    @StreamListener("Topic-TAG1-Input")
    public void receive(String messageBody) {
    logger.info("Receive1: Message received via stream, messageBody = {}", messageBody);
    }
    
    /**
    * Monitor channel (designated by channel name in configuration)
    *
    * @param messageBody message content
    */
    @StreamListener("Topic-TAG2-Input")
    public void receive2(String messageBody) {
    logger.info("Receive2: Message received via stream, messageBody = {}", messageBody);
    }
    }

    Step 7: Local Test

    After the project is initiated locally, a successful startup notification will be displayed on the console.
    Visit http://localhost:8080/test-simple via a browser. You can see a successful transmission. Keep an eye on the output log of your development IDE.
    2023-02-23 19:19:00.441 INFO 21958 --- [nio-8080-exec-1] c.t.d.s.controller.StreamController : Send: Message sent via stream, messageBody = GenericMessage [payload={"key":"value"}, headers={id=3f28bc70-da07-b966-a922-14a17642c9c4, timestamp=1677151140353}]
    2023-02-23 19:19:01.138 INFO 21958 --- [nsumer-group1_1] c.t.d.s.StreamDemoApplication : Receive1: Message received via stream, messageBody = {"headers":{"id":"3f28bc70-da07-b966-a922-14a17642c9c4","timestamp":1677151140353},"payload":{"key":"value"}}
    
    You can see that a message with the TAG1 has been sent, and only the subscriber of TAG1 has received the message.
    Note:
    For specific usage, see the GitHub Demo or Spring Cloud Stream Official Website.
    
    
    Contact Us

    Contact our sales team or business advisors to help your business.

    Technical Support

    Open a ticket if you're looking for further assistance. Our Ticket is 7x24 avaliable.

    7x24 Phone Support