tencent cloud

文档反馈

Filebeat 接入 CKafka

最后更新时间:2024-01-09 14:56:36
    Beats 平台 集合了多种单一用途数据采集器。这些采集器安装后可用作轻量型代理,从成百上千或成千上万台机器向目标发送采集数据。
    
    
    Beats 有多种采集器,您可以根据自身的需求下载对应的采集器。本文以 Filebeat(轻量型日志采集器)为例,向您介绍 Filebeat 接入 CKafka 的操作方法,及接入后常见问题的解决方法。

    前提条件

    下载并安装 Filebeat(参见 Download Filebeat
    下载并安装JDK 8(参见 Download JDK 8

    操作步骤

    步骤1:获取 CKafka 实例接入地址

    1. 登录 CKafka 控制台
    2. 在左侧导航栏选择实例列表,单击实例的“ID”,进入实例基本信息页面。
    3. 在实例的基本信息页面的接入方式模块,可获取实例的接入地址。
    
    
    

    步骤2:创建 Topic

    1. 在实例基本信息页面,选择顶部Topic管理页签。
    2. 在 Topic 管理页面,单击新建,创建一个名为 test 的 Topic。
    
    
    

    步骤3:准备配置文件

    进入 Filebeat 的安装目录,创建配置监控文件 filebeat.yml。
    #======= Filebeat7.x之后的版本,将 filebeat.prospectors 修改为 filebeat.inputs 即可 =======
    filebeat.prospectors:
    
    - input_type: log
    
    # 此处为监听文件路径
    paths:
    - /var/log/messages
    
    #======= Outputs =========
    
    #------------------ kafka -------------------------------------
    output.kafka:
    version:0.10.2 // 根据不同 CKafka 实例开源版本配置
    # 设置为CKafka实例的接入地址
    hosts: ["xx.xx.xx.xx:xxxx"]
    # 设置目标topic的名称
    topic: 'test'
    partition.round_robin:
    reachable_only: false
    
    required_acks: 1
    compression: none
    max_message_bytes: 1000000
    
    # SASL 需要配置下列信息,如果不需要则下面两个选项可不配置
    username: "yourinstance#yourusername" //username 需要拼接实例ID和用户名
    password: "yourpassword"
    

    步骤4:Filebeat 发送消息

    1. 执行如下命令启动客户端。
    sudo ./filebeat -e -c filebeat.yml
    2. 为监控文件增加数据(示例为写入监听的 testlog 文件)。
    echo ckafka1 >> testlog
    echo ckafka2 >> testlog
    echo ckafka3 >> testlog
    3. 开启 Consumer 消费对应的 Topic,获得以下数据。
    {"@timestamp":"2017-09-29T10:01:27.936Z","beat":{"hostname":"10.193.9.26","name":"10.193.9.26","version":"5.6.2"},"input_type":"log","message":"ckafka1","offset":500,"source":"/data/ryanyyang/hcmq/beats/filebeat-5.6.2-linux-x86_64/testlog","type":"log"}
    {"@timestamp":"2017-09-29T10:01:30.936Z","beat":{"hostname":"10.193.9.26","name":"10.193.9.26","version":"5.6.2"},"input_type":"log","message":"ckafka2","offset":508,"source":"/data/ryanyyang/hcmq/beats/filebeat-5.6.2-linux-x86_64/testlog","type":"log"}
    {"@timestamp":"2017-09-29T10:01:33.937Z","beat":{"hostname":"10.193.9.26","name":"10.193.9.26","version":"5.6.2"},"input_type":"log","message":"ckafka3","offset":516,"source":"/data/ryanyyang/hcmq/beats/filebeat-5.6.2-linux-x86_64/testlog","type":"log"}

    SASL/PLAINTEXT 模式

    如果您需要进行 SALS/PLAINTEXT 配置,则需要配置用户名与密码。 在 Kafka 配置区域新增加 username 和 password 配置即可。
    # SASL 需要配置下列信息,如果不需要则下面两个选项可不配置
    username: "yourinstance#yourusername" //username 需要拼接实例ID和用户名
    password: "yourpassword"

    常见问题

    在 Filebeat 日志(默认路径/var/log/filebeat/filebeat)中,发现有大量 INFO 日志,例如:
    2019-03-20T08:55:02.198+0800 INFO kafka/log.go:53 producer/broker/544 starting up
    2019-03-20T08:55:02.198+0800 INFO kafka/log.go:53 producer/broker/544 state change to [open] on wp-news-filebeat/4
    2019-03-20T08:55:02.198+0800 INFO kafka/log.go:53 producer/leader/wp-news-filebeat/4 selected broker 544
    2019-03-20T08:55:02.198+0800 INFO kafka/log.go:53 producer/broker/478 state change to [closing] because EOF
    2019-03-20T08:55:02.199+0800 INFO kafka/log.go:53 Closed connection to broker bitar1d12:9092
    2019-03-20T08:55:02.199+0800 INFO kafka/log.go:53 producer/leader/wp-news-filebeat/5 state change to [retrying-3]
    2019-03-20T08:55:02.199+0800 INFO kafka/log.go:53 producer/leader/wp-news-filebeat/4 state change to [flushing-3]
    2019-03-20T08:55:02.199+0800 INFO kafka/log.go:53 producer/leader/wp-news-filebeat/5 abandoning broker 478
    2019-03-20T08:55:02.199+0800 INFO kafka/log.go:53 producer/leader/wp-news-filebeat/2 state change to [retrying-2]
    2019-03-20T08:55:02.199+0800 INFO kafka/log.go:53 producer/leader/wp-news-filebeat/2 abandoning broker 541
    2019-03-20T08:55:02.199+0800 INFO kafka/log.go:53 producer/leader/wp-news-filebeat/3 state change to [retrying-2]
    2019-03-20T08:55:02.199+0800 INFO kafka/log.go:53 producer/broker/478 shut down
    出现大量 INFO 可能是 Filebeat 版本有问题,因为 Elastic 家族的产品发版速度很频繁,而且不同大版本有很多不兼容。
    例如:6.5.x 默认支持 Kafka 的版本是 0.9、0.10、1.1.0、2.0.0,而 5.6.x 默认支持的是0.8.2.0。
    您需要检查配置文件中的版本配置:
    output.kafka:
    version:0.10.2 // 根据不同 CKafka 实例开源版本配置

    说明与注意

    发送数据到 CKafka,不能设置压缩 compression.codec。
    默认不支持 Gzip 压缩格式,如果需要支持,请 提交工单 申请。 Gzip 压缩对于 CPU 的消耗较高,使用 Gzip 会导致所有的消息都是 InValid 消息。
    使用 LZ4 压缩方法时,程序不能正常运行,可能的原因如下: 消息格式错误。CKafka 默认版本为0.10.2,您需要使用 V1 版本的消息格式。
    不同 Kafka Client 的 SDK 设置方式不同,您可以通过开源社区进行查询(例如 C/C++ Client 的说明),设置消息格式的版本。
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持