Apache Flume is a distributed, reliable, and highly available log collection system that supports a wide variety of data sources, such as HTTP, log files, JMS, and listening ports. It can efficiently collect, aggregate, and move massive amounts of log data into a specified storage system, such as Kafka, a distributed file system, or a Solr search server.
The structure of Flume is as follows:
Flume uses the agent as its smallest independent unit of operation. An agent is a JVM process composed of three main components: source, channel, and sink.
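For orientation, here is a minimal sketch of a complete agent definition; the component names (a1, r1, c1, k1), the netcat source, and the logger sink are illustrative choices, not mandated by Flume:
# Declare the components of agent a1 (illustrative names)
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Source: read events from a listening socket port
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1
# Channel: buffer events in memory between source and sink
a1.channels.c1.type = memory
# Sink: write events to the Flume log
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1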
When data is stored in a downstream storage or computing module such as HDFS or HBase, many complex factors need to be taken into account, such as the number of concurrent writes, system load, and network latency. As a flexible distributed system, Flume offers various APIs and customizable pipelines.
During production, Kafka can act as a cache when production and consumption proceed at different paces. Its partitioned, append-only log structure gives it excellent throughput, and its replication mechanism makes it highly fault-tolerant.
Therefore, Flume and Kafka together can satisfy most requirements in production environments.
Kafka can be used as a source or a sink to import or export messages.
Kafka source
Configure Kafka as the message source, i.e., Flume acts as a consumer and pulls data from Kafka into the specified sink. The main configuration items are as follows:
| Configuration Item | Description |
|---|---|
| channels | Channel(s) to which the source writes |
| type | Must be org.apache.flume.source.kafka.KafkaSource |
| kafka.bootstrap.servers | Address of the Kafka broker server(s) |
| kafka.consumer.group.id | Consumer group ID the source uses when consuming from Kafka |
| kafka.topics | Kafka topic(s) used as the data source |
| batchSize | Maximum number of messages written to the channel in one batch |
| batchDurationMillis | Maximum time (in ms) the source waits before writing a batch to the channel |
Sample:
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id
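The sample above configures only the source. For a runnable tier1 agent it still needs the component declarations, the channel1 definition, and a sink; a minimal completion, assuming a memory channel and a logger sink (these lines are not part of the original sample):
# Declare the components of the tier1 agent
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
# Memory channel referenced by the Kafka source above;
# transactionCapacity must be at least the source batchSize (5000)
tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 5000
# Logger sink that prints consumed events to the Flume log
tier1.sinks.sink1.type = logger
tier1.sinks.sink1.channel = channel1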
Kafka sink
Configure Kafka as the message receiver, i.e., Flume acts as a producer and pushes data to the Kafka server for subsequent processing. The main configuration items are as follows:
| Configuration Item | Description |
|---|---|
| channel | Channel from which the sink reads |
| type | Must be org.apache.flume.sink.kafka.KafkaSink |
| kafka.bootstrap.servers | Address of the Kafka broker server(s) |
| kafka.topic | Kafka topic to which messages are written |
| kafka.flumeBatchSize | Number of messages processed in one batch |
| kafka.producer.acks | Acknowledgement policy of the Kafka producer |
Sample:
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
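Likewise, the sink sample omits the rest of the a1 agent. A minimal completion, assuming an exec source tailing a local file and a memory channel (the file path and component names are illustrative):
# Declare the components of the a1 agent
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Exec source: tail a local file for newly appended lines (path is illustrative)
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /tmp/test.log
a1.sources.r1.channels = c1
# Memory channel referenced by the Kafka sink above
a1.channels.c1.type = memory
For kafka.producer.acks, 0 means no acknowledgement, 1 waits for the partition leader, and all (or -1) waits for all in-sync replicas; higher settings trade throughput for durability.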
For more information, please visit Apache Flume's official website.
Use the private IP and port of your CKafka instance as the server address in the following steps, and create a topic named flume_test for the test.
Use CKafka as a sink
1. Write the configuration file flume-kafka-sink.properties (in the conf folder in the extracted directory). If there is no special requirement, simply replace the instance IP and topic in the configuration file with your own. The source used in this example is tail -F flume-test, which captures lines newly appended to the flume-test file; a sketch of this file is shown after these steps.
2. Start the Flume agent:
./bin/flume-ng agent -n agentckafka -c conf -f conf/flume-kafka-sink.properties
3. Append messages to the flume-test file. At this point, the messages will be written by Flume to CKafka.
4. Start a consumer to read the messages:
./kafka-console-consumer.sh --bootstrap-server 172.16.16.12:9092 --topic flume_test --from-beginning --new-consumer
You can see that the messages have been consumed.
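The steps above do not reproduce the configuration file itself. Below is a sketch of what flume-kafka-sink.properties might look like, assembled from the commands above; the component names exectail, memoryChannel, and kafkaSink are illustrative, and the address 172.16.16.12:9092 and topic flume_test (taken from the consumer command) should be replaced with your own:
# Components of the agentckafka agent (name matches the -n flag above)
agentckafka.sources = exectail
agentckafka.channels = memoryChannel
agentckafka.sinks = kafkaSink
# Source: tail the flume-test file for newly appended lines
agentckafka.sources.exectail.type = exec
agentckafka.sources.exectail.command = tail -F flume-test
agentckafka.sources.exectail.channels = memoryChannel
# Channel: buffer events in memory
agentckafka.channels.memoryChannel.type = memory
agentckafka.channels.memoryChannel.capacity = 10000
# Sink: write events to the flume_test topic in CKafka
agentckafka.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agentckafka.sinks.kafkaSink.kafka.topic = flume_test
agentckafka.sinks.kafkaSink.kafka.bootstrap.servers = 172.16.16.12:9092
agentckafka.sinks.kafkaSink.channel = memoryChannel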
Use CKafka as a source
1. Write the configuration file flume-kafka-source.properties (in the conf folder in the extracted directory). If there is no special requirement, simply replace the instance IP and topic in the configuration file with your own. The sink used in this example is logger, which prints consumed messages to the Flume log; a sketch of this file is shown after these steps.
2. Start the Flume agent:
./bin/flume-ng agent -n agentckafka -c conf -f conf/flume-kafka-source.properties
3. View the consumed messages in the Flume log (logs/flume.log).
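Likewise, a sketch of what flume-kafka-source.properties might look like, assuming the same instance address and topic; the component names and the consumer group ID flume-test-group are illustrative:
# Components of the agentckafka agent
agentckafka.sources = kafkaSource
agentckafka.channels = memoryChannel
agentckafka.sinks = loggerSink
# Source: consume messages from the flume_test topic in CKafka
agentckafka.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
agentckafka.sources.kafkaSource.kafka.bootstrap.servers = 172.16.16.12:9092
agentckafka.sources.kafkaSource.kafka.topics = flume_test
agentckafka.sources.kafkaSource.kafka.consumer.group.id = flume-test-group
agentckafka.sources.kafkaSource.channels = memoryChannel
# Channel: buffer events in memory
agentckafka.channels.memoryChannel.type = memory
# Sink: print consumed messages to the Flume log (logs/flume.log)
agentckafka.sinks.loggerSink.type = logger
agentckafka.sinks.loggerSink.channel = memoryChannel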