Uploading Logs via Kafka

Last updated: 2022-06-23 14:31:48

    CLS supports log uploads over the Kafka protocol, so you can send logs with Kafka producer SDKs or other Kafka-compatible agents.

    Use Cases

    Kafka is commonly used as a message pipeline in log applications. Typically, an open source collection client or a producer on the machine writes the collected logs to Kafka, and downstream systems such as Spark and Flink consume them from the pipeline. CLS provides both the upstream and downstream capabilities of a Kafka message pipeline. The following scenarios are well suited to uploading logs over the Kafka protocol. For consumption scenarios, see Kafka Real-Time Consumption.

    • Scenario 1: You already run a self-built system based on open source collection tools and want to avoid complex secondary development. In this case, you can upload logs to CLS by modifying configuration files only.
      For example, if your log system is built on ELK, you only need to change the output destination in the Filebeat or Logstash configuration file to CLS (see Filebeat configuration).
    • Scenario 2: You want to collect and upload logs with Kafka producers, without installing a collection agent.
      CLS lets you use various Kafka producer SDKs to collect logs and upload them via the Kafka protocol. For more information, see SDK call examples in this document.

    Use Limits

    • Supported Kafka protocol versions: 0.11.0.X, 1.0.X, 1.1.X, 2.0.X, 2.1.X, 2.2.X, 2.3.X, 2.4.X, 2.5.X, 2.6.X, 2.7.X, 2.8.X
    • Supported compression modes: Gzip, Snappy, LZ4
    • Current authentication mode: SASL_PLAINTEXT
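
The limits above can be checked on the client side before a producer is configured. The following is a minimal sketch, not part of any CLS SDK: the helper names `is_supported_version` and `is_supported_compression` are hypothetical, and the values are copied from the list above.

```python
# Version families and compression modes copied from the "Use Limits" list.
SUPPORTED_VERSION_PREFIXES = (
    "0.11.0", "1.0", "1.1", "2.0", "2.1", "2.2",
    "2.3", "2.4", "2.5", "2.6", "2.7", "2.8",
)
SUPPORTED_COMPRESSION = ("gzip", "snappy", "lz4")


def is_supported_version(version: str) -> bool:
    """Return True if `version` belongs to one of the listed families.

    "0.11.0.X" is treated as "any version whose leading components are
    0, 11, 0"; the other entries are matched the same way.
    """
    parts = version.split(".")
    for prefix in SUPPORTED_VERSION_PREFIXES:
        prefix_parts = prefix.split(".")
        if parts[:len(prefix_parts)] == prefix_parts:
            return True
    return False


def is_supported_compression(mode: str) -> bool:
    """Case-insensitive check against the listed compression modes."""
    return mode.lower() in SUPPORTED_COMPRESSION


if __name__ == "__main__":
    print(is_supported_version("0.11.0.2"))   # True
    print(is_supported_version("0.10.2.1"))   # False
    print(is_supported_compression("LZ4"))    # True
```

A check like this only catches configuration mistakes early; the server-side limits remain authoritative.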

    Configuration Methods

    To upload logs via Kafka, you need to set the following parameters:

    Parameter   Description
    LinkType    Currently, SASL_PLAINTEXT is supported.
    hosts       Address of the initially connected cluster. For more information, see Service Entries.
    topic       Log topic ID. Example: 76c63473-c496-466b-XXXX-XXXXXXXXXXXX
    username    Logset ID. Example: 0f8e4b82-8adb-47b1-XXXX-XXXXXXXXXXXX
    password    Password in the format of ${SecurityId}#${SecurityKey}. Example: XXXXXXXXXXXXXX#YYYYYYYY
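
To illustrate how these parameters map onto a client, here is a minimal sketch that assembles keyword arguments in the form accepted by kafka-python's `KafkaProducer`. The function name `cls_kafka_settings` and the sample credential values are hypothetical; the PLAIN mechanism is taken from the call examples later in this document. The topic is not a connection setting and is passed with each send instead.

```python
def cls_kafka_settings(hosts, logset_id, security_id, security_key):
    """Build connection settings from the parameters in the table above.

    hosts        -> list of service entries, e.g. ["<entry>:<port>"]
    logset_id    -> used as the SASL username
    security_id,
    security_key -> joined as "${SecurityId}#${SecurityKey}" for the password
    """
    return {
        "bootstrap_servers": hosts,
        "security_protocol": "SASL_PLAINTEXT",  # the only LinkType supported
        "sasl_mechanism": "PLAIN",
        "sasl_plain_username": logset_id,
        "sasl_plain_password": f"{security_id}#{security_key}",
    }


if __name__ == "__main__":
    # Placeholder values for illustration only.
    settings = cls_kafka_settings(
        hosts=["gz-producer.cls.tencentyun.com:9095"],
        logset_id="0f8e4b82-8adb-47b1-XXXX-XXXXXXXXXXXX",
        security_id="EXAMPLE_ID",
        security_key="EXAMPLE_KEY",
    )
    print(settings["sasl_plain_password"])  # EXAMPLE_ID#EXAMPLE_KEY
    # With kafka-python installed, the dict could be passed as
    # KafkaProducer(**settings).
```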

    Service Entries

    Region      Network Type      Port Number   Service Entry
    Guangzhou   Private network   9095          gz-producer.cls.tencentyun.com:9095
    Guangzhou   Public network    9096          gz-producer.cls.tencentcs.com:9096

    This document uses the Guangzhou region as an example. The private and public network entries use different domain names and ports. For other regions, replace the address prefix (gz in this example). For more information, see here.
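
The address pattern can be sketched as a small helper. This is an illustration only: the function name `service_entry` is hypothetical, and it assumes that other regions follow the Guangzhou pattern with only the prefix changed, as described above. Confirm the prefix for your region before use.

```python
# Domain and port per network type, copied from the Guangzhou rows above.
ENDPOINTS = {
    "private": ("cls.tencentyun.com", 9095),
    "public": ("cls.tencentcs.com", 9096),
}


def service_entry(region_prefix: str, network: str) -> str:
    """Return "<prefix>-producer.<domain>:<port>" for the given network type.

    Raises KeyError if `network` is neither "private" nor "public".
    """
    domain, port = ENDPOINTS[network]
    return f"{region_prefix}-producer.{domain}:{port}"


if __name__ == "__main__":
    print(service_entry("gz", "private"))  # gz-producer.cls.tencentyun.com:9095
    print(service_entry("gz", "public"))   # gz-producer.cls.tencentcs.com:9096
```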


    Agent call examples

    Filebeat/Winlogbeat configuration

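    output.kafka:
      enabled: true
      hosts: ["${region}-producer.cls.tencentyun.com:9095"] # TODO: service address. Private network: cls.tencentyun.com:9095; public network: cls.tencentcs.com:9096.
      topic: "${topicID}"   # TODO: log topic ID
      version: "${version}"   # TODO: Kafka protocol version; must be one listed under Use Limits
      compression: "${compress}"   # TODO: compression mode (gzip, snappy, or lz4)
      username: "${logsetID}"   # TODO: logset ID
      password: "${SecurityId}#${SecurityKey}"   # TODO: format ${SecurityId}#${SecurityKey}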

    Logstash example

    output {
      kafka {
        topic_id => "${topicID}"
        bootstrap_servers => "${region}-producer.cls.tencentyun.com:${port}"
        sasl_mechanism => "PLAIN"
        security_protocol => "SASL_PLAINTEXT"
        compression_type => "${compress}"
        sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${logsetID}' password='${SecurityId}#${SecurityKey}';"
      }
    }

    SDK call examples

    Golang SDK call example

    package main

    import (
      "fmt"

      // Newer sarama releases are published as github.com/IBM/sarama.
      "github.com/Shopify/sarama"
    )

    func main() {
      config := sarama.NewConfig()
      config.Net.SASL.Mechanism = "PLAIN"
      config.Net.SASL.Version = int16(1)
      config.Net.SASL.Enable = true
      config.Net.SASL.User = "${logsetID}"                        // TODO: logset ID
      config.Net.SASL.Password = "${SecurityId}#${SecurityKey}"   // TODO: format ${SecurityId}#${SecurityKey}
      config.Producer.Return.Successes = true                     // required by SyncProducer
      config.Producer.RequiredAcks = ${acks}                      // TODO: select the acks value according to the use case, e.g. sarama.WaitForAll
      config.Version = sarama.V0_11_0_0
      config.Producer.Compression = ${compress}                   // TODO: compression mode, e.g. sarama.CompressionGZIP
      // TODO: Service address. Private network: cls.tencentyun.com:9095; public network: cls.tencentcs.com:9096.
      producer, err := sarama.NewSyncProducer([]string{"${region}-producer.cls.tencentyun.com:9095"}, config)
      if err != nil {
        panic(err)
      }
      // Close the producer when main returns.
      defer func() { _ = producer.Close() }()

      msg := &sarama.ProducerMessage{
        Topic: "${topicID}", // TODO: topic ID
        Value: sarama.StringEncoder("golang sdk sender demo"),
      }
      // Send the message
      for i := 0; i <= 5; i++ {
        partition, offset, err := producer.SendMessage(msg)
        if err != nil {
          panic(err)
        }
        fmt.Printf("send response; partition:%d, offset:%d\n", partition, offset)
      }
    }

    Python SDK call example

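    from kafka import KafkaProducer

    if __name__ == '__main__':
        produce = KafkaProducer(
            # TODO: Service address. Private network: cls.tencentyun.com:9095; public network: cls.tencentcs.com:9096.
            bootstrap_servers=["${region}-producer.cls.tencentyun.com:9095"],
            security_protocol='SASL_PLAINTEXT',
            sasl_mechanism='PLAIN',
            # TODO: Logset ID
            sasl_plain_username='${logsetID}',
            # TODO: The format is ${SecurityId}#${SecurityKey}
            sasl_plain_password='${SecurityId}#${SecurityKey}',
            api_version=(0, 11, 0),
            # TODO: Compression mode (gzip, snappy, or lz4)
            compression_type="${compress_type}",
        )
        for i in range(0, 5):
            # TODO: Topic ID of the sent message
            future = produce.send(topic="${topicID}", value=b'python sdk sender demo')
            # Block until the send is acknowledged or the timeout expires.
            result = future.get(timeout=10)
            print(result)
        produce.close()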

    Java SDK call example

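    Maven dependencies: add the Kafka client library (groupId org.apache.kafka, artifactId kafka-clients) in a version listed under Use Limits.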


    Sample code:

    import org.apache.kafka.clients.producer.*;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    public class ProducerDemo {
        public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
            // 0. Set parameters.
            Properties props = new Properties();
            // TODO: Service address. Private network: cls.tencentyun.com:9095; public network: cls.tencentcs.com:9096.
            props.put("bootstrap.servers", "${region}-producer.cls.tencentyun.com:9095");
            // TODO: Set the following according to the actual business scenario.
            props.put("acks", ${acks});
            props.put("retries", ${retries});
            props.put("batch.size", ${batch.size});
            props.put("linger.ms", ${linger.ms});
            props.put("buffer.memory", ${buffer.memory});
            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "${compress_type}"); // TODO: compression mode
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "PLAIN");
            // TODO: The username is the logset ID, and the password is ${SecurityId}#${SecurityKey}.
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username='${logsetID}' password='${SecurityId}#${SecurityKey}';");
            // 1. Create a producer object.
            Producer<String, String> producer = new KafkaProducer<String, String>(props);
            // 2. Call the send method and wait for the result.
            Future<RecordMetadata> meta = producer.send(new ProducerRecord<String, String>("${topicID}", ${message}));
            RecordMetadata recordMetadata = meta.get(${timeout}, TimeUnit.MILLISECONDS);
            System.out.println("offset = " + recordMetadata.offset());
            // 3. Close the producer.
            producer.close();
        }
    }

    SDK for C call example

    // https://github.com/edenhill/librdkafka - master
    #include <iostream>
    #include <librdkafka/rdkafka.h>
    #include <string>
    #include <unistd.h>
    #define BOOTSTRAP_SERVER "${region}-producer.cls.tencentyun.com:${port}"
    #define USERNAME "${logsetID}"
    #define PASSWORD "${SecurityId}#${SecurityKey}"
    #define TOPIC "${topicID}"
    #define ACKS "${acks}"
    #define COMPRESS_TYPE "${compress_type}"
    // Delivery report callback, invoked from rd_kafka_poll() or rd_kafka_flush() once per message.
    static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
      if (rkmessage->err) {
          fprintf(stdout, "%% Message delivery failed : %s\n", rd_kafka_err2str(rkmessage->err));
      } else {
          fprintf(stdout, "%% Message delivery successful %zu:%d\n", rkmessage->len, rkmessage->partition);
      }
    }

    int main(int argc, char **argv) {
      // 1. Initialize the configuration.
      rd_kafka_conf_t *conf = rd_kafka_conf_new();
      rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
      char errstr[512];
      if (rd_kafka_conf_set(conf, "bootstrap.servers", BOOTSTRAP_SERVER, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
          fprintf(stdout, "%s\n", errstr);
          return -1;
      }
      if (rd_kafka_conf_set(conf, "acks", ACKS, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
          fprintf(stdout, "%s\n", errstr);
          return -1;
      }
      if (rd_kafka_conf_set(conf, "compression.codec", COMPRESS_TYPE, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
          fprintf(stdout, "%s\n", errstr);
          return -1;
      }
      // Set the authentication method.
      if (rd_kafka_conf_set(conf, "security.protocol", "sasl_plaintext", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
          fprintf(stdout, "%s\n", errstr);
          return -1;
      }
      if (rd_kafka_conf_set(conf, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
          fprintf(stdout, "%s\n", errstr);
          return -1;
      }
      if (rd_kafka_conf_set(conf, "sasl.username", USERNAME, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
          fprintf(stdout, "%s\n", errstr);
          return -1;
      }
      if (rd_kafka_conf_set(conf, "sasl.password", PASSWORD, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
          fprintf(stdout, "%s\n", errstr);
          return -1;
      }
      // 2. Create a handler. On success, rk takes ownership of conf.
      rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
      if (!rk) {
          fprintf(stdout, "create produce handler failed: %s\n", errstr);
          return -1;
      }
      // 3. Send data.
      std::string value = "test lib kafka ---- ";
      for (int i = 0; i < 100; ++i) {
      retry:
          rd_kafka_resp_err_t err = rd_kafka_producev(
                  rk, RD_KAFKA_V_TOPIC(TOPIC),
                  RD_KAFKA_V_VALUE((void *) value.c_str(), value.size()),
                  RD_KAFKA_V_OPAQUE(nullptr), RD_KAFKA_V_END);
          if (err) {
              fprintf(stdout, "Failed to produce to topic : %s, error : %s\n", TOPIC, rd_kafka_err2str(err));
              if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
                  // The local queue is full: serve delivery reports for up to 1 s, then retry.
                  rd_kafka_poll(rk, 1000);
                  goto retry;
              }
          } else {
              fprintf(stdout, "send message to topic successful : %s\n", TOPIC);
          }
          // Serve delivery report callbacks without blocking.
          rd_kafka_poll(rk, 0);
      }
      // Wait up to 10 s for outstanding messages to be delivered.
      std::cout << "message flush final" << std::endl;
      rd_kafka_flush(rk, 10 * 1000);
      if (rd_kafka_outq_len(rk) > 0) {
          fprintf(stdout, "%d message(s) were not delivered\n", rd_kafka_outq_len(rk));
      }
      rd_kafka_destroy(rk);
      return 0;
    }
