Connecting Logstash to CKafka

Last updated: 2021-08-13 18:09:04

    Logstash, an open-source log processing tool, collects data from multiple sources, filters the collected data, and stores it for other uses.

    Logstash is highly flexible and has powerful syntax analysis capabilities. With a wide variety of plugins, it supports many types of inputs and outputs. In addition, as a horizontally scalable data pipeline, it delivers powerful log collection and retrieval when used with Elasticsearch and Kibana.

    How Logstash Works

    The Logstash data processing pipeline can be divided into 3 stages: inputs → filters → outputs.

    1. Inputs: generate data. Inputs are data sources such as file, syslog, redis, and beats.
    2. Filters: modify and filter data. Filters are intermediate processing components in the Logstash data pipeline. They can modify events based on specific conditions. Some commonly used filters are grok, mutate, drop, and clone.
    3. Outputs: transfer data to other locations. An event can be transferred to multiple outputs, and the event ends when the transfer is completed. Elasticsearch is the most commonly used output.
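    The three stages above map directly to the three sections of a Logstash pipeline configuration file. The following minimal sketch is illustrative only (the file path, grok pattern, and Elasticsearch host are assumptions, not part of the directions below): it tails a log file, parses and trims each event, and indexes the results into Elasticsearch.

    input {
       file {
           path => "/var/log/app.log"  # tail a local log file (illustrative path)
       }
    }
    filter {
       grok {
           match => { "message" => "%{COMBINEDAPACHELOG}" }  # parse Apache-style access lines
       }
       mutate {
           remove_field => ["path"]  # drop a field not needed downstream
       }
    }
    output {
       elasticsearch {
           hosts => ["localhost:9200"]  # the most commonly used output
       }
    }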

    In addition, Logstash supports encoding and decoding data, so you can specify data formats on the input and output ends.
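    For example, the json codec can decode structured data on the input end, and the rubydebug codec can pretty-print events on the output end. A short sketch:

    input {
       stdin {
           codec => json  # parse each input line as a JSON object
       }
    }
    output {
       stdout {
           codec => rubydebug  # pretty-print the decoded event structure
       }
    }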

    Advantages of Connecting Logstash to Kafka

    • Data can be processed asynchronously, which buffers traffic spikes.
    • Components are decoupled, so when an exception occurs in Elasticsearch, the upstream work will not be affected.
    Note:

    Logstash consumes resources when processing data. If you deploy Logstash on a production server, the performance of the server may be affected.

    Directions

    Preparations

    Step 1. Get the CKafka instance access address

    1. Log in to the CKafka console.
    2. Select Instance List on the left sidebar and click the ID of an instance to enter the instance basic information page.
    3. On the instance basic information page, get the instance access address in the Access Mode module.

    Step 2. Create a topic

    1. On the instance basic information page, select the Topic Management tab on the top.
    2. On the topic management page, click Create to create a topic named logstash_test.

    Step 3. Connect to CKafka

    Note:

    The following directions use CKafka as the data input. Using CKafka as the output is configured similarly.

    1. Run bin/logstash-plugin list to check whether logstash-input-kafka is included in the supported plugins.

    2. Write the configuration file input.conf in the bin/ directory.
      In the following example, Kafka is used as the data source, and the standard output is taken as the data destination.

      input {
         kafka {
             bootstrap_servers => "xx.xx.xx.xx:xxxx"  # CKafka instance access address
             group_id => "logstash_group"  # CKafka consumer group ID
             topics => ["logstash_test"]  # CKafka topic name
             consumer_threads => 3  # Number of consumer threads, generally equal to the number of CKafka partitions
             auto_offset_reset => "earliest"
         }
      }
      output {
         stdout { codec => rubydebug }
      }
      

    3. Run the following command to start Logstash and consume messages.

      ./logstash -f input.conf
      

      If the startup succeeds, the consumed messages are printed to the standard output in rubydebug format, showing that the data in the topic above has been consumed.
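      CKafka can also serve as the output end. The following sketch is analogous to the input configuration above (replace the placeholder with your instance access address, and check that logstash-output-kafka appears in the plugin list): it reads events from the standard input and produces them to the logstash_test topic.

      input {
         stdin {}  # read events typed into the console
      }
      output {
         kafka {
             bootstrap_servers => "xx.xx.xx.xx:xxxx"  # CKafka instance access address
             topic_id => "logstash_test"  # CKafka topic name
         }
      }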