go get -v github.com/confluentinc/confluent-kafka-go/kafka
{"topic": ["test"],"bootstrapServers": ["xx.xx.xx.xx:xxxx"],"consumerGroupId": "yourConsumerId"}
Parameter | Description |
topic | Topic name. Copy the name on the Topic List page in the console. |
bootstrapServers | Access address of the instance. On the Basic Info page of the instance in the console, go to the Access Mode module and copy the address shown in the Network column. |
consumerGroupId | Consumer group ID. You can choose any name; after the demo runs successfully, the consumer appears on the Consumer Group page in the console. |
package mainimport ("fmt""gokafkademo/config""log""strings""github.com/confluentinc/confluent-kafka-go/kafka")func main() {cfg, err := config.ParseConfig("../config/kafka.json")if err != nil {log.Fatal(err)}p, err := kafka.NewProducer(&kafka.ConfigMap{// Set the access point. You can obtain the access point of the corresponding topic in the console."bootstrap.servers": strings.Join(cfg.Servers, ","),// The default value 1 is used if the configuration is not displayed. You can set it based on your business requirements."acks": 1,// The number of retries when a request error occurs. It is recommended to set this value to greater than 0 to ensure that the message is not lost to the maximum extent during failed retries."retries": 0,// The time between the failed request transmission and the next retry request."retry.backoff.ms": 100,// The timeout period for producer network requests."socket.timeout.ms": 6000,// Set the interval of internal retries on the client."reconnect.backoff.max.ms": 3000,})if err != nil {log.Fatal(err)}defer p.Close()// Send the generated message to the report handler.go func() {for e := range p.Events() {switch ev := e.(type) {case *kafka.Message:if ev.TopicPartition.Error != nil {fmt.Printf("Delivery failed: %v\\n", ev.TopicPartition)} else {fmt.Printf("Delivered message to %v\\n",ev.TopicPartition)}}}}()// Send messages asynchronously.topic := cfg.Topic[0]for _, word := range []string{"Confluent-Kafka", "Golang Client Message"} {_ = p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},Value: []byte(word),}, nil)}// Wait for messages to be sent.p.Flush(10 * 1000)}
go run main.go
Delivered message to test[0]@628
Delivered message to test[0]@629
package mainimport ("fmt""gokafkademo/config""log""strings""github.com/confluentinc/confluent-kafka-go/kafka")func main() {cfg, err := config.ParseConfig("../config/kafka.json")if err != nil {log.Fatal(err)}c, err := kafka.NewConsumer(&kafka.ConfigMap{// Set the access point. You can obtain the access point of the corresponding topic in the console."bootstrap.servers": strings.Join(cfg.Servers, ","),// The message consumer group that has been set."group.id": cfg.ConsumerGroupId,"auto.offset.reset": "earliest",// Consumer timeout interval when the Kafka consumer group mechanism is used. If the broker does not receive the heartbeat from the consumer within this interval, the consumer is considered to be failed, and the broker// initiates the rebalancing process again. Currently, the value must be between the value (6000) of the broker configuration parameter group.min.session.timeout.ms and the value (300000) of group.max.session.timeout.ms."session.timeout.ms": 10000,})if err != nil {log.Fatal(err)}// List of subscribed message topics.err = c.SubscribeTopics(cfg.Topic, nil)if err != nil {log.Fatal(err)}for {msg, err := c.ReadMessage(-1)if err == nil {fmt.Printf("Message on %s: %s\\n", msg.TopicPartition, string(msg.Value))} else {// The client will automatically try to clear all errors.fmt.Printf("Consumer error: %v (%v)\\n", err, msg)}}c.Close()}
go run main.go
Message on test[0]@628: Confluent-Kafka
Message on test[0]@629: Golang Client Message
Feedback