Confluent Go SDK

Last updated: 2024-05-13 15:48:24

    Overview

    TDMQ for CKafka is a distributed stream processing platform designed for building real-time data pipelines and streaming applications. It offers high throughput, low latency, scalability, and fault tolerance.
    Sarama: A Kafka library developed by Shopify that provides producers, consumers, and partition consumers. The library performs well and has active community support.
    Confluent-Kafka-Go: A Kafka library developed by Confluent that provides easy-to-use high-level APIs. Because it is based on the librdkafka C library, its performance is excellent, though it can be somewhat complex to install and use.
    This document describes the key parameters, best practices, and FAQs of the Confluent Go client.

    Producer Practice

    Version Selection

    When the Confluent Go SDK is used, you can specify the address of the Kafka cluster through the bootstrap.servers configuration parameter, and enable the api.version.request parameter so that the Confluent Go SDK automatically detects the broker's version at startup.
    config := &kafka.ConfigMap{
        "bootstrap.servers":   "localhost",
        "api.version.request": true,
    }

    Producer Parameters and Optimization

    Producer Parameters

    Confluent Go is developed based on librdkafka. When writing to Kafka with the Confluent Go client, the configuration passed to librdkafka involves the following key parameters. The parameters and their default values are listed below:
    
    package main
    
    import (
        "fmt"
    
        "github.com/confluentinc/confluent-kafka-go/kafka"
    )
    
    func main() {
        config := &kafka.ConfigMap{
            "bootstrap.servers":            "localhost:9092",
            "acks":                         -1,         // Ack mode, -1 by default.
            "client.id":                    "rdkafka",  // Client ID.
            "compression.type":             "none",     // Compression type.
            "compression.level":            -1,         // Compression level.
            "batch.num.messages":           10000,      // By default, a batch buffers up to 10,000 messages to form a MessageSet for batch sending, improving performance.
            "batch.size":                   1000000,    // Limit on the total size of a MessageSet batch, 1,000,000 bytes by default.
            "queue.buffering.max.ms":       5,          // Delay for buffering messages before transmitting a MessageSet to the broker, 5 ms by default.
            "queue.buffering.max.messages": 100000,     // Maximum number of messages buffered by the producer, 100,000 by default.
            "queue.buffering.max.kbytes":   1048576,    // Maximum total size (in KB) of messages buffered by the producer, 1,048,576 KB by default.
            "message.send.max.retries":     2147483647, // Number of retries, 2,147,483,647 by default.
            "retry.backoff.ms":             100,        // Retry interval, 100 ms by default.
            "socket.timeout.ms":            60000,      // Network request timeout, 60 s by default.
        }
    
        producer, err := kafka.NewProducer(config)
        if err != nil {
            panic(fmt.Sprintf("Failed to create producer: %s", err))
        }
    
        // Use the producer to send messages and perform other operations...
    
        // Close the producer.
        producer.Close()
    }

    Parameter Description and Optimization

    acks Parameter Optimization
    The acks parameter controls the acknowledgment mechanism when the producer sends messages. Its default value is -1, which means a send is not acknowledged until the leader broker and all corresponding followers have written the message. The acks parameter can be set to 0, 1, or -1. In cross-availability-zone scenarios, and for topics with a higher number of replicas, the value of acks affects the message's reliability and throughput.
    In online business message scenarios where throughput requirements are not high, you can set acks to -1 to ensure that a message is received and confirmed by all replicas before the call returns, thereby improving reliability.
    In big data scenarios such as log collection or offline computing, where high throughput (i.e., the volume of data written to Kafka per second) is required, you can set acks to 1 to increase throughput, as in the sketch below.
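    A minimal sketch of choosing the acks value when creating the producer; the broker address and the chosen value are illustrative assumptions:
    package main
    
    import (
        "fmt"
    
        "github.com/confluentinc/confluent-kafka-go/kafka"
    )
    
    func main() {
        // Reliability-first: wait for the leader and all followers to acknowledge (acks = -1).
        // For a throughput-first workload such as log collection, change "acks" to 1.
        p, err := kafka.NewProducer(&kafka.ConfigMap{
            "bootstrap.servers": "localhost:9092",
            "acks":              -1,
        })
        if err != nil {
            fmt.Printf("Failed to create producer: %s\n", err)
            return
        }
        defer p.Close()
    }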
    buffering Parameter Optimization (Caching)
    For the same volume of data, sending it in a single network request consumes fewer computing and network resources than sending it in multiple requests, which improves overall write throughput.
    Therefore, you can use the buffering parameters to optimize the client's message sending throughput. Confluent Kafka Go buffers messages with a default batching delay of 5 ms. If messages are small, you can increase queue.buffering.max.ms to an appropriate value, as in the sketch below.
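    A minimal sketch of the batching-related settings, assuming a local broker; the concrete values (50 ms delay, 10,000 messages, roughly 1 MB per batch) are illustrative and should be tuned to your traffic:
    package main
    
    import (
        "fmt"
    
        "github.com/confluentinc/confluent-kafka-go/kafka"
    )
    
    func main() {
        // Batching-oriented producer configuration (values are illustrative).
        p, err := kafka.NewProducer(&kafka.ConfigMap{
            "bootstrap.servers":      "localhost:9092",
            "queue.buffering.max.ms": 50,      // Buffer messages for up to 50 ms before sending a batch.
            "batch.num.messages":     10000,   // Allow up to 10,000 messages per MessageSet.
            "batch.size":             1000000, // Cap a MessageSet at about 1,000,000 bytes.
        })
        if err != nil {
            fmt.Printf("Failed to create producer: %s\n", err)
            return
        }
        defer p.Close()
    }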
    Compression Parameter Optimization
    The Confluent Kafka Go client supports the following values for the compression parameter:
    none: No compression.
    gzip: Compress with the GZIP algorithm.
    snappy: Compress with the Snappy algorithm.
    lz4: Compress with the LZ4 algorithm.
    zstd: Compress with the ZSTD algorithm.
    To use a compression algorithm in the producer client, set the compression.type parameter when creating the producer. For example, you can set compression.type to lz4 to compress with the LZ4 algorithm. Compression and decompression both happen on the client side, so this optimization trades computing power for bandwidth. However, the broker incurs extra computation costs for validating compressed messages, especially with GZIP compression, and the resulting server-side load can reduce the broker's message processing capability and bandwidth throughput, which may not be worth it in some cases. In such cases, the following approach is recommended (a sketch follows the list):
    1. Independently compress the message data on the producer side to generate the compressed payload messageCompression, and store the compression method in the message's key:
    {"Compression","CompressionLZ4"}
    2. On the producer side, send messageCompression as a normal message.
    3. On the consumer side, read the message key to obtain the compression method used, and decompress independently.
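    A minimal sketch of this approach, using gzip from the Go standard library for illustration; the topic name, the key convention, and the compress/decompress helpers are assumptions for this example rather than part of the client API:
    package main
    
    import (
        "bytes"
        "compress/gzip"
        "fmt"
        "io"
    
        "github.com/confluentinc/confluent-kafka-go/kafka"
    )
    
    // compress gzips the raw payload on the producer side.
    func compress(raw []byte) ([]byte, error) {
        var buf bytes.Buffer
        w := gzip.NewWriter(&buf)
        if _, err := w.Write(raw); err != nil {
            return nil, err
        }
        if err := w.Close(); err != nil {
            return nil, err
        }
        return buf.Bytes(), nil
    }
    
    // decompress reverses compress on the consumer side.
    func decompress(compressed []byte) ([]byte, error) {
        r, err := gzip.NewReader(bytes.NewReader(compressed))
        if err != nil {
            return nil, err
        }
        defer r.Close()
        return io.ReadAll(r)
    }
    
    func main() {
        topic := "test-topic"
        p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
        if err != nil {
            fmt.Printf("Failed to create producer: %s\n", err)
            return
        }
        defer p.Close()
    
        payload, err := compress([]byte("hello world"))
        if err != nil {
            fmt.Printf("Failed to compress: %s\n", err)
            return
        }
    
        // Record the compression method in the key so the consumer knows how to decompress.
        if err := p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Key:            []byte(`{"Compression":"CompressionGZIP"}`),
            Value:          payload,
        }, nil); err != nil {
            fmt.Printf("Failed to produce: %s\n", err)
        }
        p.Flush(15 * 1000)
    
        // On the consumer side, after reading the key, call decompress(msg.Value) to recover the payload.
    }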

    Creating Producer Instance

    If your application requires higher throughput, you can use an asynchronous producer to increase the speed of message sending and use batch sending to reduce network overhead and IO consumption. If your application requires higher reliability, you can use a synchronous producer to ensure successful message delivery, and employ the ACK acknowledgment mechanism and the transaction mechanism to guarantee message reliability and consistency. For specific parameter optimization, refer to Producer Parameters and Optimization.
    
    package main
    
    import (
        "fmt"
    
        "github.com/confluentinc/confluent-kafka-go/kafka"
    )
    
    func main() {
        // Configure the Kafka producer.
        p, err := kafka.NewProducer(&kafka.ConfigMap{
            "bootstrap.servers":  "localhost:9092",
            "acks":               "1",
            "compression.type":   "none",
            "batch.num.messages": "1000",
        })
        if err != nil {
            fmt.Printf("Failed to create producer: %s\n", err)
            return
        }
    
        // Send messages.
        for i := 0; i < 10; i++ {
            topic := "test-topic"
            value := fmt.Sprintf("hello world %d", i)
            message := &kafka.Message{
                TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
                Value:          []byte(value),
            }
            p.Produce(message, nil)
        }
    
        // Flush outstanding messages, then close the producer.
        p.Flush(15 * 1000)
        p.Close()
    }

    Consumer Practice

    Consumer Parameters and Optimization

    Consumer Parameters

    
    package main
    
    import (
        "fmt"
    
        "github.com/confluentinc/confluent-kafka-go/kafka"
    )
    
    func main() {
        // Configure the Kafka consumer.
        c, err := kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers":       "localhost:9092",
            "group.id":                "test-group",
            "auto.offset.reset":       "earliest",
            "fetch.min.bytes":         1,        // Minimum number of bytes to fetch.
            "fetch.max.bytes":         52428800, // Maximum number of bytes to fetch.
            "fetch.wait.max.ms":       "500",    // If there are no new messages to consume, wait 500 ms by default.
            "enable.auto.commit":      true,     // Enable automatic offset commit, true by default.
            "auto.commit.interval.ms": 5000,     // Interval between automatic offset commits, 5 s by default.
            "max.poll.interval.ms":    300000,   // Maximum delay between two poll operations, 5 minutes by default.
            "session.timeout.ms":      45000,    // Session timeout, 45 s by default.
            "heartbeat.interval.ms":   3000,     // Heartbeat interval, 3 s by default.
        })
        if err != nil {
            fmt.Printf("Failed to create consumer: %s\n", err)
            return
        }
        // Close the Kafka consumer when main returns.
        defer c.Close()
    
        // Subscribe to topics.
        c.SubscribeTopics([]string{"test-topic"}, nil)
    
        // Poll messages and manually commit offsets.
        for {
            ev := c.Poll(100)
            if ev == nil {
                continue
            }
    
            switch e := ev.(type) {
            case *kafka.Message:
                fmt.Printf("Received message: %s\n", string(e.Value))
                c.CommitMessage(e)
            case kafka.Error:
                fmt.Printf("Error: %v\n", e)
            }
        }
    }

    Parameter Description and Optimization

    1. max.poll.interval.ms is a configuration parameter for the Kafka consumer. It specifies the maximum delay between two poll operations by the consumer. Its primary function is to control the consumer's liveness, i.e., to determine whether the consumer is still active. If the consumer does not perform a poll operation within the time specified by max.poll.interval.ms, Kafka considers the consumer to have failed and triggers a rebalance for the consumer group. Adjust this parameter according to the actual consumption speed: if it is too low, the consumer may frequently trigger rebalances, increasing the burden on Kafka; if it is too high, Kafka may not promptly detect issues with the consumer, affecting message consumption.
    2. For general consumption, issues mainly involve frequent rebalancing and consumption thread blocking. For parameter optimization, refer to the following methods (see the configuration sketch after this list):
    2.1 session.timeout.ms: For versions before v0.10.2, increase this parameter appropriately, making it greater than the time it takes to consume a batch of data but not exceeding 30 s. The recommended value is 25 s. For v0.10.2 and later versions, use the default value of 10 s.
    2.2 heartbeat.interval.ms: Default value is 3 s. This value should be less than session.timeout.ms / 3.
    2.3 max.poll.interval.ms: Default value is 5 minutes. If there are many partitions and consumers, it is recommended to increase this value appropriately. It should be greater than <max.poll.records> / (<number of messages consumed per second per thread> x <number of consumption threads>).
    Note:
    If you want to process messages synchronously, that is, pull a message, process it, and then pull the next one, you need to make the following modifications:
    Increase max.poll.interval.ms according to your processing time.
    Monitor for processing times that exceed this limit, and sample and log the timeout durations.
    3. For automatic offset commits, it is recommended not to set auto.commit.interval.ms below 1,000 ms, as overly frequent offset commit requests can cause high broker CPU usage and affect the read and write operations of other normal services.
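    A minimal sketch applying the tuning suggestions above; the concrete values are illustrative assumptions and should be adjusted to your actual consumption speed and broker version:
    package main
    
    import (
        "fmt"
    
        "github.com/confluentinc/confluent-kafka-go/kafka"
    )
    
    func main() {
        // Consumer configuration tuned per the guidance above (values are illustrative).
        c, err := kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers":       "localhost:9092",
            "group.id":                "test-group",
            "session.timeout.ms":      25000,  // ~25 s as suggested for versions before v0.10.2; use the default for later versions.
            "heartbeat.interval.ms":   3000,   // Less than session.timeout.ms / 3.
            "max.poll.interval.ms":    600000, // Increase beyond the 5-minute default when there are many partitions/consumers or slow processing.
            "auto.commit.interval.ms": 5000,   // Do not set below 1,000 ms.
        })
        if err != nil {
            fmt.Printf("Failed to create consumer: %s\n", err)
            return
        }
        defer c.Close()
    }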

    Creating Consumer Instance

    Confluent Go provides a subscription model for creating consumers and supports two offset commit methods: auto-commit and manual-commit.

    Auto-Commit Offsets

    Auto-commit offsets: After pulling messages, the consumer automatically commits offsets without manual intervention. This method is easy to use, but it may lead to duplicate message consumption or message loss.
    
    package main
    
    import (
        "fmt"
    
        "github.com/confluentinc/confluent-kafka-go/kafka"
    )
    
    func main() {
        // Configure the Kafka consumer.
        c, err := kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers":       "localhost:9092",
            "group.id":                "test-group",
            "auto.offset.reset":       "earliest",
            "enable.auto.commit":      true,   // Whether to enable auto-commit of offsets. true means enabled.
            "auto.commit.interval.ms": 5000,   // Interval for auto-committing offsets. 5,000 ms (i.e., 5 seconds) means offsets are committed automatically every 5 seconds.
            "max.poll.interval.ms":    300000, // Maximum delay between two poll operations. 300,000 ms (i.e., 5 minutes) means the consumer may wait up to 5 minutes between polls.
            "session.timeout.ms":      10000,  // Session timeout between the consumer and the broker, set to 10 seconds.
            "heartbeat.interval.ms":   3000,   // Interval at which the consumer sends heartbeats, set to 3,000 ms (i.e., 3 seconds).
        })
        if err != nil {
            fmt.Printf("Failed to create consumer: %s\n", err)
            return
        }
        // Close the Kafka consumer when main returns.
        defer c.Close()
    
        // Subscribe to topics.
        c.SubscribeTopics([]string{"test-topic"}, nil)
    
        // Offsets are committed automatically.
        for {
            ev := c.Poll(100)
            if ev == nil {
                continue
            }
    
            switch e := ev.(type) {
            case *kafka.Message:
                fmt.Printf("Received message: %s\n", string(e.Value))
            case kafka.Error:
                fmt.Printf("Error: %v\n", e)
            }
        }
    }

    Manual-Commit Offsets

    Manual-commit offsets: After processing messages, the consumer needs to manually commit offsets. The advantage of this method is that it allows precise control over offset commits, avoiding duplicate message consumption or message loss. Note that overly frequent manual commits can lead to high broker CPU usage and affect performance; as message volume grows, the CPU consumption becomes significant and affects other broker features. Therefore, it is recommended to commit an offset only after a certain number of messages (see the batched-commit sketch after the example below).
    
    package main
    
    import (
        "fmt"
    
        "github.com/confluentinc/confluent-kafka-go/kafka"
    )
    
    func main() {
        // Configure the Kafka consumer.
        c, err := kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers":     "localhost:9092",
            "group.id":              "test-group",
            "auto.offset.reset":     "earliest",
            "enable.auto.commit":    false,
            "max.poll.interval.ms":  300000,
            "session.timeout.ms":    10000,
            "heartbeat.interval.ms": 3000,
        })
        if err != nil {
            fmt.Printf("Failed to create consumer: %s\n", err)
            return
        }
        // Close the Kafka consumer when main returns.
        defer c.Close()
    
        // Subscribe to topics.
        c.SubscribeTopics([]string{"test-topic"}, nil)
    
        // Manually commit offsets.
        for {
            ev := c.Poll(100)
            if ev == nil {
                continue
            }
    
            switch e := ev.(type) {
            case *kafka.Message:
                fmt.Printf("Received message: %s\n", string(e.Value))
                c.CommitMessage(e)
            case kafka.Error:
                fmt.Printf("Error: %v\n", e)
            }
        }
    }
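    
    Building on the example above, the following is a minimal sketch of committing only once every N messages, which reduces the number of commit requests sent to the broker; the batch size of 100 is an illustrative choice:
    package main
    
    import (
        "fmt"
    
        "github.com/confluentinc/confluent-kafka-go/kafka"
    )
    
    func main() {
        c, err := kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers":  "localhost:9092",
            "group.id":           "test-group",
            "auto.offset.reset":  "earliest",
            "enable.auto.commit": false,
        })
        if err != nil {
            fmt.Printf("Failed to create consumer: %s\n", err)
            return
        }
        defer c.Close()
    
        c.SubscribeTopics([]string{"test-topic"}, nil)
    
        const commitEvery = 100 // Illustrative batch size: commit once per 100 processed messages.
        uncommitted := 0
    
        for {
            ev := c.Poll(100)
            if ev == nil {
                continue
            }
    
            switch e := ev.(type) {
            case *kafka.Message:
                fmt.Printf("Received message: %s\n", string(e.Value))
                uncommitted++
                if uncommitted >= commitEvery {
                    // Commit() commits the current offsets for all assigned partitions.
                    if _, err := c.Commit(); err != nil {
                        fmt.Printf("Commit failed: %v\n", err)
                    }
                    uncommitted = 0
                }
            case kafka.Error:
                fmt.Printf("Error: %v\n", e)
            }
        }
    }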
    
    
    
    
    