新功能发布记录
公告
go get "github.com/rabbitmq/amqp091-go"
import (amqp "github.com/rabbitmq/amqp091-go")
// 所需参数const (// Host 集群详情-客户端接入页面,复制该接入点// 比如amqp://1.1.1.1:5672,这里只需要填写ip即可Host = "1.1.1.1"// UserName 用户名称, 需要先在控制台上创建该用户,也可以使用集群详情-web控制台访问地址页面,admin账号UserName = "test"// Password 用户密钥, 需要先在控制台上创建该密码Password = "test"// Vhost 这里填写自定义的vhost,需要先在控制台上创建该vhostVhost = "test")// 创建连接conn, err := amqp.Dial("amqp://" + UserName + ":" + Password + "@" + Host + ":5672/" + Vhost)failOnError(err, "Failed to connect to RabbitMQ")defer func(conn *amqp.Connection) {err := conn.Close()if err != nil {}}(conn)// 建立通道ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer func(ch *amqp.Channel) {err := ch.Close()if err != nil {}}(ch)
参数 | 说明 |
host | 集群接入地址,在集群基本信息页面的客户端接入模块获取。 ![]() |
username | 用户名称,填写在控制台创建的用户名称。 |
password | 用户密码,填写在控制台创建用户时填写的密码。 |
vhost | Vhost 名称,在控制台 Vhost 列表获取。 |
// 声明交换机 (名称和类型需要与存在的交换机保持一致)err = ch.ExchangeDeclare("logs-exchange", // 交换机名称"fanout", // 交换机类型true, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)failOnError(err, "Failed to declare a exchange")
// 消息内容body := "this is new message."// 发布消息到交换机err = ch.Publish("logs-exchange", // exchange"", // routing key (根据使用的交换机类型可选择的是否需要routing key),如果不选择交换机,该参数为消息队列名称false, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: []byte(body),})failOnError(err, "Failed to publish a message")
// 发布消息到指定的消息队列err = ch.Publish("", // exchange"queue.Name", // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: []byte(body),})failOnError(err, "Failed to publish a message")
// 创建消费者并消费指定消息队列中的消息msgs, err := ch.Consume("message-queue", // message-queue"", // consumerfalse, // 设置为非自动确认(可根据需求自己选择)false, // exclusivefalse, // no-localfalse, // no-waitnil, // args)failOnError(err, "Failed to register a consumer")// 获取消息队列中的消息forever := make(chan bool)go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)t := time.Duration(1)time.Sleep(t * time.Second)// 手动回复ackd.Ack(false)}}()log.Printf(" [Consumer] Waiting for messages.")<-forever
// 需要在消息队列中指定 交换机 和 routing keyerr = ch.QueueBind("q.Name", // queue name"routing_key", // routing key"topic_demo", // exchangefalse,nil,)failOnError(err, "Failed to bind a queue")
文档反馈