Flume Connection to CKafka

Last updated: 2020-04-27 18:08:07

    Apache Flume Overview

    Apache Flume is a distributed, reliable, and highly available log collection system that supports a wide variety of data sources, such as HTTP, log files, JMS, and listening ports. It can efficiently collect, aggregate, and move massive amounts of log data into a specified storage system, such as Kafka, a distributed file system, or a Solr search server.

    The structure of Flume is as follows:

    Flume uses the agent as its smallest independent unit of operation. An agent is a JVM process composed of three main components: source, channel, and sink. Events collected by the source are buffered in the channel until the sink delivers them to the next destination.
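    As a minimal sketch of this wiring (the agent name a1 and the component names r1, c1, and k1 are arbitrary placeholders), an agent's components are declared and bound together in a properties file:

    # Declare the components of agent a1
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    # Bind the source and the sink to the channel (a source may feed multiple
    # channels, hence the plural "channels"; a sink reads from exactly one channel)
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1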

    Flume and Kafka

    When you store data in a downstream storage module or computing module such as HDFS or HBase, many complex factors must be taken into account, such as the number of concurrent writes, system load, and network latency. As a flexible distributed system, Flume offers various APIs and provides customizable pipelines.
    In the production process, Kafka can act as a cache when production and consumption proceed at different paces. Its partitioned, append-only log structure gives it excellent throughput, and its replication mechanism makes it highly fault-tolerant.
    Therefore, Flume and Kafka together can satisfy most requirements in production environments.

    How to Connect to Open-Source Kafka

    Preparations

    • Download Apache Flume (v1.6.0 or higher is compatible with Kafka).
    • Download Kafka (v0.9.x or higher is required, as v0.8 is no longer supported).
    • Confirm that the Kafka source and sink components are included in your Flume distribution (one way to check is shown below).
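    One way to confirm this, assuming the standard layout of the Flume binary distribution, is to list the Kafka-related jars under the lib directory of the extracted package:

    # Run from the extracted Flume directory; the Kafka source and sink jars should appear
    ls lib | grep -i kafka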

    Connection method

    Kafka can be used as a source or a sink to import or export messages.

    Kafka source
    Configure Kafka as the message source, i.e., pulling data from Kafka into a specified sink as a consumer. The main configuration items are as follows:

    Configuration Item        Description
    channels                  The channel(s) the source writes to
    type                      Must be org.apache.flume.source.kafka.KafkaSource
    kafka.bootstrap.servers   Address list of the Kafka broker servers
    kafka.consumer.group.id   Consumer group ID that Flume uses as a Kafka consumer
    kafka.topics              Kafka topic(s) to read data from
    batchSize                 Maximum number of messages written to the channel in one batch
    batchDurationMillis       Maximum interval (in ms) between batch writes to the channel

    Sample:

    # Kafka source: consume from topics test1 and test2 and write events to channel1
    tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
    tier1.sources.source1.channels = channel1
    # Write up to 5,000 events per batch, waiting at most 2,000 ms between batches
    tier1.sources.source1.batchSize = 5000
    tier1.sources.source1.batchDurationMillis = 2000
    tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
    tier1.sources.source1.kafka.topics = test1, test2
    tier1.sources.source1.kafka.consumer.group.id = custom.g.id

    Kafka sink
    Configure Kafka as the message receiver, i.e., pushing data into the Kafka server as a producer for subsequent operations. The main configuration items are as follows:

    Configuration Item        Description
    channel                   The channel the sink reads from
    type                      Must be org.apache.flume.sink.kafka.KafkaSink
    kafka.bootstrap.servers   Address list of the Kafka broker servers
    kafka.topic               Kafka topic that messages are published to
    kafka.flumeBatchSize      Number of messages processed in one batch
    kafka.producer.acks       Acknowledgment policy of the Kafka producer

    Sample:

    # Kafka sink: read events from channel c1 and publish them to topic mytopic
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = mytopic
    a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
    # Process 20 messages per batch; acks = 1 waits for the partition leader's acknowledgment
    a1.sinks.k1.kafka.flumeBatchSize = 20
    a1.sinks.k1.kafka.producer.acks = 1

    For more information, please visit Apache Flume's official website.

    Flume Connection to CKafka

    Preparations

    CKafka configuration

    1. In the CKafka Console, click the instance name to view the instance details.

      The private IP and port shown on the instance details page are used as the server address in the subsequent steps.

    2. Click Topic Management > Create to create a topic named flume_test.

    Flume configuration

    1. Decompress the downloaded Apache Flume package.

    2. Configure the Flume options.

    Use CKafka as a sink

    1. Write a configuration file.
      This section focuses on using CKafka as the sink, while the source and channel use default configurations. Create a simple configuration file in the conf folder of the extracted directory (see the sample flume-kafka-sink.properties sketch after these steps); if there is no special requirement, simply replace the instance IP and topic in the configuration file with your own. The source used in this example is tail -F flume-test, which captures lines newly appended to the file.
    2. Start Flume.
      ./bin/flume-ng agent -n agentckafka -c conf -f conf/flume-kafka-sink.properties
    3. Write messages to the flume-test file. Flume will then write these messages to CKafka.
    4. Start the CKafka client for consumption.
      ./kafka-console-consumer.sh --bootstrap-server 172.16.16.12:9092 --topic flume_test --from-beginning --new-consumer
      You can see that the messages have been consumed.
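    Below is a minimal sketch of such a flume-kafka-sink.properties file, assuming the private IP 172.16.16.12:9092 and the flume_test topic from the preparations above; replace these with your own values:

    # Sketch of flume-kafka-sink.properties: tail a local file and push new lines to CKafka
    agentckafka.sources = exectail
    agentckafka.channels = memoryChannel
    agentckafka.sinks = kafkaSink

    # Source: follow lines appended to the flume-test file
    agentckafka.sources.exectail.type = exec
    agentckafka.sources.exectail.command = tail -F flume-test
    agentckafka.sources.exectail.channels = memoryChannel

    # Sink: publish events to the flume_test topic on the CKafka instance
    agentckafka.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
    agentckafka.sinks.kafkaSink.kafka.topic = flume_test
    agentckafka.sinks.kafkaSink.kafka.bootstrap.servers = 172.16.16.12:9092
    agentckafka.sinks.kafkaSink.channel = memoryChannel

    # Channel: buffer events in memory
    agentckafka.channels.memoryChannel.type = memory
    agentckafka.channels.memoryChannel.capacity = 1000

    The agent name agentckafka must match the -n option passed to flume-ng in step 2. With the agent running, appending a line such as echo "hello ckafka" >> flume-test should make the message appear in the console consumer.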

    Use CKafka as a source

    1. Write a configuration file.
      This section focuses on using CKafka as the source, while the sink and channel use default configurations. Create a simple configuration file in the conf folder of the extracted directory (see the sample flume-kafka-source.properties sketch after these steps); if there is no special requirement, simply replace the instance IP and topic in the configuration file with your own. The sink used in this example is logger.
    2. Start Flume.
      ./bin/flume-ng agent -n agentckafka -c conf -f conf/flume-kafka-source.properties
    3. View the logger output (the default path is logs/flume.log).
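    A corresponding minimal sketch of flume-kafka-source.properties, again assuming the 172.16.16.12:9092 address and the flume_test topic (the consumer group ID flume-group is an arbitrary placeholder):

    # Sketch of flume-kafka-source.properties: consume from CKafka and print to the Flume log
    agentckafka.sources = kafkaSource
    agentckafka.channels = memoryChannel
    agentckafka.sinks = loggerSink

    # Source: consume the flume_test topic from the CKafka instance
    agentckafka.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
    agentckafka.sources.kafkaSource.kafka.bootstrap.servers = 172.16.16.12:9092
    agentckafka.sources.kafkaSource.kafka.topics = flume_test
    agentckafka.sources.kafkaSource.kafka.consumer.group.id = flume-group
    agentckafka.sources.kafkaSource.channels = memoryChannel

    # Sink: write events to the Flume log (logs/flume.log by default)
    agentckafka.sinks.loggerSink.type = logger
    agentckafka.sinks.loggerSink.channel = memoryChannel

    # Channel: buffer events in memory
    agentckafka.channels.memoryChannel.type = memory
    agentckafka.channels.memoryChannel.capacity = 1000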
