Storing Kafka Data in HDFS or COS Through Flume

Last updated: 2021-07-08 10:43:44

    Scenario Description

    This document describes how to use Flume to collect data from Kafka and save it to HDFS or COS.

    Development Preparations

    • This task requires access to CKafka, so you need to create a CKafka instance first. For more information, see Message Queue CKafka.
    • Create an EMR cluster. When creating the EMR cluster, you need to select the Spark component on the software configuration page and enable access to COS on the basic configuration page.

    Using the Kafka Toolkit in the EMR Cluster

    First, you need to check the private IP and port number of CKafka. Log in to the CKafka console, select the CKafka instance you want to use, and find the private IP ($kafkaIP) and port number (generally 9092) in the basic information section. Create a topic named kafka_test on the topic management page.
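Once you have the private IP and port, you may want to confirm that the EMR node can reach the CKafka endpoint. The following is a minimal sketch and not part of the original procedure: `can_reach` is a hypothetical helper name, and it only tests that a TCP connection opens. It does not verify that the listener is a Kafka broker or that the kafka_test topic exists.

```python
import socket


def can_reach(host, port=9092, timeout=3.0):
    """Return True if a TCP connection to host:port opens within `timeout` seconds."""
    try:
        # create_connection resolves the host name and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False
```

For example, calling `can_reach("10.0.0.1", 9092)` in a Python shell on an EMR node, with your instance's private IP in place of the placeholder 10.0.0.1, returns False when the connection is refused, times out, or the host name cannot be resolved.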

    Configuring Flume

    1. Create a Flume configuration file.
      agent.sources = kafka_source
      agent.channels = mem_channel
      agent.sinks = hdfs_sink
      # The following is for source configuration.
      agent.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource
      agent.sources.kafka_source.channels = mem_channel
      agent.sources.kafka_source.batchSize = 5000
      agent.sources.kafka_source.kafka.bootstrap.servers = $kafkaIP:9092
      agent.sources.kafka_source.kafka.topics = kafka_test
      # The following is for sink configuration.
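      agent.sinks.hdfs_sink.type = hdfs
      agent.sinks.hdfs_sink.channel = mem_channel
      # To write to COS instead of HDFS, use a path such as cosn://bucket/xxx.
      agent.sinks.hdfs_sink.hdfs.path = /data/flume/kafka/%Y%m%d
      # Disable size-based and count-based rolling; roll a new file every 3600 seconds.
      agent.sinks.hdfs_sink.hdfs.rollSize = 0
      agent.sinks.hdfs_sink.hdfs.rollCount = 0
      agent.sinks.hdfs_sink.hdfs.rollInterval = 3600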
      agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 30
      # The following is for channel configuration.
      agent.channels.mem_channel.type = memory
      agent.channels.mem_channel.capacity = 100000
      agent.channels.mem_channel.transactionCapacity = 10000
    2. Run Flume.
      ./bin/flume-ng agent --conf ./conf/ -f -n agent -Dflume.root.logger=INFO,console
    3. Run the Kafka producer.
      [hadoop@172 kafka]$ ./bin/ --broker-list $kafkaIP:9092 --topic kafka_test


    • Enter messages in the Kafka producer client, pressing Enter after each one.
    • Run hadoop fs -ls /data/flume/kafka/ to check whether the corresponding directory and file have been generated in HDFS.
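Because hdfs.path ends in %Y%m%d, the files land in a dated subdirectory rather than directly under /data/flume/kafka/. The sketch below shows which directory to expect for a given event time. It assumes the event carries a timestamp in epoch milliseconds and that only the %Y, %m, and %d escapes are used, which Python's strftime happens to expand the same way; `expected_dir` is a hypothetical helper name. By default the HDFS sink resolves the path in the agent's local time zone, so the default here is local time as well.

```python
from datetime import datetime, timedelta, timezone

# hdfs.path value from the Flume configuration in this document.
PATH_TEMPLATE = "/data/flume/kafka/%Y%m%d"


def expected_dir(timestamp_ms, tz=None):
    """Expand %Y, %m, and %d in PATH_TEMPLATE for an epoch-millisecond timestamp.

    tz=None uses the local time zone of the machine running this code.
    """
    event_time = datetime.fromtimestamp(timestamp_ms / 1000, tz=tz)
    return event_time.strftime(PATH_TEMPLATE)
```

An event just before midnight UTC can therefore fall into a different directory depending on the time zone: `expected_dir(1625698800000, timezone.utc)` gives /data/flume/kafka/20210707, while the same timestamp with `timezone(timedelta(hours=8))` gives /data/flume/kafka/20210708.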


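If no files appear, a wiring mistake in the configuration file from step 1 is a common cause. The following sketch checks a few rules before the agent is started: every sink names a declared channel, every source is bound to declared channels, a source's batchSize does not exceed the transactionCapacity of its channel, and transactionCapacity does not exceed capacity. The function names `parse_properties` and `check_agent` are hypothetical, the parser handles only simple key = value lines rather than the full Java properties syntax, and only values set explicitly in the file are compared.

```python
def parse_properties(text):
    """Parse 'key = value' lines, skipping blank lines and '#' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_agent(props, agent="agent"):
    """Return a list of wiring and capacity problems found for one agent."""
    problems = []
    channels = props.get(f"{agent}.channels", "").split()

    # Each sink reads from exactly one channel, set with the singular 'channel' key.
    for sink in props.get(f"{agent}.sinks", "").split():
        channel = props.get(f"{agent}.sinks.{sink}.channel")
        if channel not in channels:
            problems.append(f"sink {sink}: channel {channel!r} is not declared")

    # Each source writes to one or more channels, set with the plural 'channels' key.
    for source in props.get(f"{agent}.sources", "").split():
        prefix = f"{agent}.sources.{source}"
        bound = props.get(f"{prefix}.channels", "").split()
        if not bound:
            problems.append(f"source {source}: no channels configured")
        batch = int(props.get(f"{prefix}.batchSize", 0))
        for channel in bound:
            if channel not in channels:
                problems.append(f"source {source}: channel {channel!r} is not declared")
                continue
            tx = props.get(f"{agent}.channels.{channel}.transactionCapacity")
            if tx is not None and batch > int(tx):
                problems.append(
                    f"source {source}: batchSize {batch} exceeds "
                    f"transactionCapacity {tx} of {channel}"
                )

    # A channel cannot hand over more events per transaction than it can hold.
    for channel in channels:
        prefix = f"{agent}.channels.{channel}"
        cap = props.get(f"{prefix}.capacity")
        tx = props.get(f"{prefix}.transactionCapacity")
        if cap is not None and tx is not None and int(tx) > int(cap):
            problems.append(f"channel {channel}: transactionCapacity {tx} exceeds capacity {cap}")

    return problems
```

Reading the configuration file into a string and passing it through `check_agent(parse_properties(text))` returns an empty list when none of these rules is violated.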
    Kafka Source Configuration Description