Connecting Flume to CKafka

Last updated: 2021-08-10 15:42:04

    Apache Flume is a distributed, reliable, and highly available log collection system that supports a wide variety of data sources, such as HTTP, log files, JMS, and listening ports. It can efficiently collect, aggregate, move, and store massive amounts of log data in a specified storage system, such as Kafka, a distributed file system, or a Solr search server.

    The structure of Flume is as follows:

    Flume uses the agent as its smallest independent unit of operation. An agent is a JVM process composed of three main components: source, channel, and sink.

    Flume and Kafka

    When you store data in a downstream storage module or computing module such as HDFS or HBase, many complex factors need to be taken into account, such as write concurrency, system load, and network latency. As a flexible distributed system, Flume provides various APIs and customizable data pipelines.
    In production, Kafka can act as a buffer when production and consumption proceed at different paces. Its partitioned structure and append-only writes give it excellent throughput, and its replication mechanism makes it highly fault-tolerant.
    Together, Flume and Kafka can therefore satisfy most requirements in production environments.

    Flume Connection to Open-Source Kafka

    Preparations

    • Download Apache Flume (v1.6.0 or higher is compatible with Kafka).
    • Download Kafka (v0.9.x or higher is required as v0.8 is no longer supported).
    • Confirm that the Kafka source and sink components are included in your Flume installation.

    Connection method

    Kafka can be used as a source or a sink to import or export messages.

    Configure Kafka as the message source, i.e., Flume acts as a consumer and pulls data from Kafka into a specified sink. The main configuration items are as follows:

    Configuration Item         Description
    channels                   The configured channel
    type                       Must be set to org.apache.flume.source.kafka.KafkaSource
    kafka.bootstrap.servers    Kafka broker server address
    kafka.consumer.group.id    Consumer group ID that Flume uses as a Kafka consumer
    kafka.topics               Kafka topics to read messages from
    batchSize                  Maximum number of messages written to the channel in one batch
    batchDurationMillis        Maximum time (in milliseconds) before a batch is written to the channel

    Sample:

    tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource 
    tier1.sources.source1.channels = channel1
    tier1.sources.source1.batchSize = 5000
    tier1.sources.source1.batchDurationMillis = 2000
    tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
    tier1.sources.source1.kafka.topics = test1, test2
    tier1.sources.source1.kafka.consumer.group.id = custom.g.id
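
    Conversely, Kafka can be configured as the sink, i.e., publishing data from the channel to Kafka as a producer. Below is a minimal sketch using Flume's KafkaSink with the property names of Flume 1.7 and later; the agent, sink, channel, and topic names (tier1, sink1, channel1, test1) are illustrative:

    tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
    tier1.sinks.sink1.channel = channel1
    tier1.sinks.sink1.kafka.topic = test1
    tier1.sinks.sink1.kafka.bootstrap.servers = localhost:9092
    tier1.sinks.sink1.kafka.flumeBatchSize = 100
    tier1.sinks.sink1.kafka.producer.acks = 1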
    

    For more information, please visit Apache Flume's official website.

    Flume Connection to CKafka

    Step 1. Get the CKafka instance access address

    1. Log in to the CKafka console.
    2. Select Instance List on the left sidebar and click the ID of an instance to enter the instance basic information page.
    3. On the instance basic information page, get the instance access address in the Access Mode module.

    Step 2. Create a topic

    1. On the instance basic information page, select the Topic Management tab at the top.
    2. On the topic management page, click Create to create a topic named flume_test.

    Step 3. Configure Flume

    1. Download the Apache Flume toolkit and decompress it.
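
      For example, a release can be downloaded from the Apache archive and decompressed as follows (version 1.9.0 is used here only as an illustration; substitute the version you need):

      wget https://archive.apache.org/dist/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
      tar -zxvf apache-flume-1.9.0-bin.tar.gz
      cd apache-flume-1.9.0-bin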

    2. Write the configuration file flume-kafka-sink.properties in the conf folder of the extracted directory. Below is a simple demo. If there is no special requirement, simply replace the instance IP and topic in the configuration file with your own. The source used in this example is tail -F flume-test, which collects content newly appended to the flume-test file.
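
      A minimal sketch of such a configuration is shown below. It assumes the agent is named agentckafka (matching the startup command in the next step) and uses a memory channel; the component names (exectail, kafkaSink, memoryChannel) are illustrative. Replace xx.xx.xx.xx:xxxx with the instance access address obtained in step 1.

      # Name the components of the agentckafka agent
      agentckafka.sources = exectail
      agentckafka.channels = memoryChannel
      agentckafka.sinks = kafkaSink

      # Source: tail the flume-test file and collect newly appended lines
      agentckafka.sources.exectail.type = exec
      agentckafka.sources.exectail.command = tail -F flume-test
      agentckafka.sources.exectail.channels = memoryChannel

      # Sink: write the collected data to the flume_test topic in CKafka
      agentckafka.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
      agentckafka.sinks.kafkaSink.kafka.bootstrap.servers = xx.xx.xx.xx:xxxx
      agentckafka.sinks.kafkaSink.kafka.topic = flume_test
      agentckafka.sinks.kafkaSink.channel = memoryChannel

      # Channel: buffer events in memory
      agentckafka.channels.memoryChannel.type = memory
      agentckafka.channels.memoryChannel.capacity = 1000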

    3. Run the following command to start Flume:

      ./bin/flume-ng agent -n agentckafka -c conf -f conf/flume-kafka-sink.properties
      

    4. Write messages to the flume-test file. Flume will then collect the new messages and write them to CKafka.
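
      For example, assuming flume-test is in the directory where the agent was started, append a test message like this:

      echo "hello ckafka" >> flume-test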

    5. Start the CKafka client for consumption.

      ./kafka-console-consumer.sh --bootstrap-server xx.xx.xx.xx:xxxx --topic flume_test --from-beginning --new-consumer
      

      Note

      Set bootstrap-server to the access address of the CKafka instance obtained in step 1, and topic to the name of the topic created in step 2 (flume_test in this example).


      You can see that the messages have been consumed.