go get "github.com/rabbitmq/amqp091-go"
import (
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)
// Required parameters.const (// Host: Select Cluster Details and go to the client access page to copy the access point.// For example, amqp://1.1.1.1:5672. You only need to enter the IP address.Host = "1.1.1.1"// UserName: username. It is required to create the user first in the console, or use the admin account on the cluster details - web console access address page.UserName = "test"// Password: user password. It is required to create the password first in the console.Password = "test"// Vhost: Enter the custom vhost. It is required to create the vhost first in the console.Vhost = "test")// Create a connection.conn, err := amqp.Dial("amqp://" + UserName + ":" + Password + "@" + Host + ":5672/" + Vhost)failOnError(err, "Failed to connect to RabbitMQ")defer func(conn *amqp.Connection) {err := conn.Close()if err != nil {}}(conn)// Establish a channel.ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer func(ch *amqp.Channel) {err := ch.Close()if err != nil {}}(ch)
| Parameter | Description |
| --- | --- |
| `Host` | Cluster access address, which can be obtained from the Client Access module on the basic cluster information page. |
| `UserName` | Username. Enter the username created in the console. |
| `Password` | User password. Enter the password specified during user creation in the console. |
| `Vhost` | Vhost name, which can be obtained from the vhost list in the console. |
// Declare an exchange (the name and type of it should be consistent with those of the existing exchanges).err = ch.ExchangeDeclare("logs-exchange", // Exchange name."fanout", // Exchange type.true, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)failOnError(err, "Failed to declare a exchange")
// Message content.body := "this is new message."// Publish a message to an exchange.err = ch.Publish("logs-exchange", // exchange"", // Routing Key. (select whether the Routing Key is required based on the used exchange type). If no exchange is selected, this parameter is the message queue name.false, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: []byte(body),})failOnError(err, "Failed to publish a message")
// Publish a message to the specified message queue.err = ch.Publish("", // exchange"queue.Name", // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: []byte(body),})failOnError(err, "Failed to publish a message")
// Create a consumer and consume messages in the specified message queue.msgs, err := ch.Consume("message-queue", // message-queue"", // consumerfalse, // Set to non-automatic acknowledgment (it can be selected as needed).false, // exclusivefalse, // no-localfalse, // no-waitnil, // args)failOnError(err, "Failed to register a consumer")// Obtain messages in the message queue.forever := make(chan bool)go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)t := time.Duration(1)time.Sleep(t * time.Second)// Manually reply with acknowledgment.d.Ack(false)}}()log.Printf(" [Consumer] Waiting for messages.")<-forever
// It is required to specify the exchange and Routing Key in the message queue.err = ch.QueueBind("q.Name", // queue name"routing_key", // routing key"topic_demo", // exchangefalse,nil,)failOnError(err, "Failed to bind a queue")
Feedback