SDK for Go

Last updated: 2023-09-12 17:53:17

    Overview

    This document describes how to send and receive messages with an open-source SDK, using the SDK for Go as an example, to help you better understand the message sending and receiving process.

    Prerequisites

    You have created the required resources as instructed in Resource Creation and Preparation.
    You have installed Go.

    Directions

    1. Run the following command in the client environment to install the RocketMQ client dependencies.
    go get github.com/apache/rocketmq-client-go/v2
    2. Create a producer. To send general messages, modify the corresponding parameters in the syncSendMessage.go file.
    Delayed messages currently support delays of arbitrary precision and are not limited to the fixed delay levels.
    The first code block below creates a producer for general messages. The second creates a producer and sends a delayed message.
    // Imports used by the producer snippets in this document
    import (
        "context"
        "fmt"
        "os"

        "github.com/apache/rocketmq-client-go/v2"
        "github.com/apache/rocketmq-client-go/v2/primitive"
        "github.com/apache/rocketmq-client-go/v2/producer"
    )

    // Service access address (Note: Add "http://" or "https://" before the access address; otherwise, it cannot be resolved)
    var serverAddress = "https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"
    // Role name used for authorization
    var secretKey = "admin"
    // Role token used for authorization
    var accessKey = "eyJrZXlJZC...."
    // Full namespace name
    var nameSpace = "MQ_INST_rocketmqem4xxxx"
    // Producer group name
    var groupName = "group1"

    // The statements below belong inside a function such as main.
    // Create a message producer
    p, err := rocketmq.NewProducer(
        // Set the service address
        producer.WithNsResolver(primitive.NewPassthroughResolver([]string{serverAddress})),
        // Set ACL permissions
        producer.WithCredentials(primitive.Credentials{
            SecretKey: secretKey,
            AccessKey: accessKey,
        }),
        // Set the producer group
        producer.WithGroupName(groupName),
        // Set the namespace name
        producer.WithNamespace(nameSpace),
        // Set the number of retries upon sending failures
        producer.WithRetry(2),
    )
    if err != nil {
        fmt.Printf("create producer error: %s\n", err.Error())
        os.Exit(1)
    }
    // Start the producer
    err = p.Start()
    if err != nil {
        fmt.Printf("start producer error: %s\n", err.Error())
        os.Exit(1)
    }
    
    // Delayed message: create a producer, then send a message with a delay.
    // Besides the producer imports, this snippet uses the standard packages context, strconv, and time.
    // Topic name
    var topicName = "topic1"
    // Producer group name
    var groupName = "group1"
    // Create a message producer
    p, err := rocketmq.NewProducer(
        // Set the service address
        producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"})),
        // Set ACL permissions
        producer.WithCredentials(primitive.Credentials{
            SecretKey: "admin",
            AccessKey: "eyJrZXlJZC......",
        }),
        // Set the producer group
        producer.WithGroupName(groupName),
        // Set the namespace name
        producer.WithNamespace("rocketmq-xxx|namespace_go"),
        // Set the number of retries upon sending failures
        producer.WithRetry(2),
    )
    if err != nil {
        fmt.Printf("create producer error: %s\n", err.Error())
        os.Exit(1)
    }
    // Start the producer
    err = p.Start()
    if err != nil {
        fmt.Printf("start producer error: %s\n", err.Error())
        os.Exit(1)
    }
    for i := 0; i < 1; i++ {
        msg := primitive.NewMessage(topicName, []byte("Hello RocketMQ Go Client! This is a delay message."))
        // Option 1: use a fixed delay level.
        // The relationship between the delay level and the delay time:
        // 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h
        // 1   2   3    4    5   6   7   8   9   10  11  12  13  14   15   16   17  18
        // To use a delay level, uncomment the following line and remove option 2:
        // msg.WithDelayTimeLevel(3)

        // Option 2: use an arbitrary delay instead of a delay level (do not call `WithDelayTimeLevel`).
        // The value is an absolute timestamp in milliseconds. Here the message is delivered after 10 seconds.
        delayMills := int64(10 * 1000)
        nowMills := time.Now().UnixNano() / int64(time.Millisecond)
        msg.WithProperty("__STARTDELIVERTIME", strconv.FormatInt(nowMills+delayMills, 10))
        // Send the message
        res, err := p.SendSync(context.Background(), msg)
        if err != nil {
            fmt.Printf("send message error: %s\n", err)
        } else {
            fmt.Printf("send message success: result=%s\n", res.String())
        }
    }

    // Release resources
    err = p.Shutdown()
    if err != nil {
        fmt.Printf("shutdown producer error: %s\n", err.Error())
    }
    Parameter        Description
    secretKey        Role name, which can be copied on the Role Management page.
    accessKey        Role token, which can be copied in the Token column on the Role Management page.
    nameSpace        Namespace name, which can be copied on the Namespace page in the console.
    serverAddress    Cluster access address, which can be copied from Access Address in the Operation column on the Cluster page in the console. Namespace access addresses in new virtual or exclusive clusters can be copied from the Namespace list. Note: Add http:// or https:// before the access address; otherwise, it cannot be resolved.
    groupName        Producer group name, which can be copied under the Group tab in the console.
    3. Send messages with the producer created above. Sync sending is used as an example.
    // Topic name
    var topicName = "topic1"
    // Configure message content
    msg := &primitive.Message{
        Topic: topicName, // Set the topic name
        Body:  []byte("Hello RocketMQ Go Client! This is a new message."),
    }
    // Set tags
    msg.WithTag("TAG")
    // Set keys
    msg.WithKeys([]string{"yourKey"})
    // Send the message
    res, err := p.SendSync(context.Background(), msg)
    if err != nil {
        fmt.Printf("send message error: %s\n", err)
    } else {
        fmt.Printf("send message success: result=%s\n", res.String())
    }
    Parameter        Description
    topicName        Topic name, which can be copied under the Topic tab on the cluster details page in the console.
    TAG              Message tag identifier
    yourKey          Message business key
    Release resources.
    // Shut down the producer
    err = p.Shutdown()
    if err != nil {
        fmt.Printf("shutdown producer error: %s\n", err.Error())
    }
    Note
    For more information on async sending and one-way sending, see Demo or RocketMQ-Client-Go Example.
    4. Create a consumer.
    // Imports used by the consumer snippets in this document
    import (
        "context"
        "fmt"
        "os"
        "time"

        "github.com/apache/rocketmq-client-go/v2"
        "github.com/apache/rocketmq-client-go/v2/consumer"
        "github.com/apache/rocketmq-client-go/v2/primitive"
    )

    // Service access address (Note: Add "http://" or "https://" before the access address; otherwise, it cannot be resolved)
    var serverAddress = "https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"
    // Role name used for authorization
    var secretKey = "admin"
    // Role token used for authorization
    var accessKey = "eyJrZXlJZC...."
    // Full namespace name
    var nameSpace = "rocketmq-xxx|namespace_go"
    // Consumer group name
    var groupName = "group11"

    // The statements below belong inside a function such as main.
    // Create a consumer
    c, err := rocketmq.NewPushConsumer(
        // Set the consumer group
        consumer.WithGroupName(groupName),
        // Set the service address
        consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{serverAddress})),
        // Set ACL permissions
        consumer.WithCredentials(primitive.Credentials{
            SecretKey: secretKey,
            AccessKey: accessKey,
        }),
        // Set the namespace name
        consumer.WithNamespace(nameSpace),
        // Set consumption from the start offset
        consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
        // Set the consumption mode (cluster consumption by default)
        consumer.WithConsumerModel(consumer.Clustering),
        // For broadcasting consumption, set the instance name to the system name of the application.
        // If the instance name is not set, the pid is used, which causes messages to be consumed repeatedly after a restart.
        consumer.WithInstance("xxxx"),
    )
    if err != nil {
        fmt.Println("init consumer error: " + err.Error())
        os.Exit(1)
    }
    Parameter        Description
    secretKey        Role name, which can be copied on the Role Management page.
    accessKey        Role token, which can be copied in the Token column on the Role Management page.
    nameSpace        Full namespace name, in the format of cluster ID + | + namespace, which can be copied under the Topic tab on the Cluster page in the console.
    serverAddress    Cluster access address, which can be copied from Access Address in the Operation column on the Cluster page in the console. Namespace access addresses in new virtual or exclusive clusters can be copied from the Namespace list. Note: Add http:// or https:// before the access address; otherwise, it cannot be resolved.
    groupName        Consumer group name, which can be copied under the Group tab in the console.
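Two of the parameters above have format rules that are easy to get wrong: the access address needs an http:// or https:// prefix, and the full namespace name is the cluster ID and the namespace joined by |. The following is a small standard-library sketch that checks and builds these values before they are handed to the client. The helper names `checkServerAddress` and `fullNamespace` are hypothetical and are not part of rocketmq-client-go.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// checkServerAddress returns an error when the access address lacks the
// "http://" or "https://" prefix that this document says is required.
// Hypothetical helper for illustration only.
func checkServerAddress(addr string) error {
	if strings.HasPrefix(addr, "http://") || strings.HasPrefix(addr, "https://") {
		return nil
	}
	return errors.New("access address must start with http:// or https://: " + addr)
}

// fullNamespace joins a cluster ID and a namespace with "|", the format
// described for the nameSpace parameter. Hypothetical helper.
func fullNamespace(clusterID, namespace string) string {
	return clusterID + "|" + namespace
}

func main() {
	// The address below is missing its scheme, so an error is reported.
	if err := checkServerAddress("rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"); err != nil {
		fmt.Println("invalid config:", err)
	}
	fmt.Println("namespace:", fullNamespace("rocketmq-xxx", "namespace_go"))
}
```

Running such checks before `rocketmq.NewPushConsumer` or `rocketmq.NewProducer` turns a resolution failure at connect time into an immediate, readable configuration error.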
    5. Subscribe to the topic and register the message handler.
    // Topic name
    var topicName = "topic1"
    // Set the tag of messages that are subscribed to
    selector := consumer.MessageSelector{
        Type:       consumer.TAG,
        Expression: "TagA || TagC",
    }
    // Set the delay level of consumption retry. A total of 18 levels can be set. Below is the relationship between each delay level and the delay time.
    // 1   2   3    4    5   6   7   8   9   10  11  12  13  14   15   16   17  18
    // 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h
    delayLevel := 1
    err = c.Subscribe(topicName, selector, func(ctx context.Context,
        msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
        fmt.Printf("subscribe callback len: %d \n", len(msgs))
        // Set the delay level for the next consumption.
        // It takes effect only when the callback returns consumer.ConsumeRetryLater.
        if concurrentCtx, ok := primitive.GetConcurrentlyCtx(ctx); ok {
            concurrentCtx.DelayLevelWhenNextConsume = delayLevel
        }
        for _, msg := range msgs {
            // Simulate a successful consumption after more than three retries
            if msg.ReconsumeTimes > 3 {
                fmt.Printf("msg ReconsumeTimes > 3. msg: %v \n", msg)
                return consumer.ConsumeSuccess, nil
            } else {
                fmt.Printf("subscribe callback: %v \n", msg)
            }
        }
        // Simulate a consumption failure. Retry is required.
        return consumer.ConsumeRetryLater, nil
    })
    if err != nil {
        fmt.Println(err.Error())
    }
    Parameter        Description
    topicName        Topic name, which can be copied on the Topic page in the console.
    Expression       Message tag identifier
    delayLevel       A parameter used to set the delay level of consumption retry. A total of 18 delay levels are supported.
    6. Start the consumer. Messages can be consumed only after the topic has been subscribed to.
    // Start consumption
    err = c.Start()
    if err != nil {
        fmt.Println(err.Error())
        os.Exit(-1)
    }
    // Keep the process alive so that the consumer can receive messages
    time.Sleep(time.Hour)
    // Release resources
    err = c.Shutdown()
    if err != nil {
        fmt.Printf("shutdown consumer error: %s\n", err.Error())
    }
    7. View consumption details. Log in to the TDMQ console, go to the Cluster > Group page, and view the list of clients connected to the group. Click View Details in the Operation column to view consumer details.
    
    
    Note
    The above is a brief introduction to sending and receiving messages with the Go client. For more information, see Demo or RocketMQ-Client-Go Example.
    