tencent cloud

消息队列 CKafka 版

动态与公告
新功能发布记录
Broker 版本升级记录
公告
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 CKafka 版
产品优势
应用场景
技术架构
产品系列介绍
开源 Kafka 版本支持说明
与开源 Kafka 对比
高可用
使用限制
地域和可用区
相关云服务
产品计费
计费概述
价格说明
计费示例
按小时付费转包年包月
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
VPC 网络接入
公网域名接入
用户指南
使用流程指引
配置账号权限
创建实例
配置 Topic
连接实例
管理消息
管理消费组
管理实例
变更实例规格
配置限流
配置弹性伸缩策略
配置高级特性
查看监控和配置告警
使用连接器同步数据
实践教程
集群资源评估
客户端实践教程
日志接入
开源生态对接
替换支撑路由(旧)
迁移指南
迁移方案概述
使用开源工具迁移集群
故障处理
Topic 相关
客户端相关
消息相关
API 参考
History
Introduction
API Category
Making API Requests
Other APIs
ACL APIs
Instance APIs
Routing APIs
DataHub APIs
Topic APIs
Data Types
Error Codes
SDK 参考
SDK 概述
Java SDK
Python SDK
Go SDK
PHP SDK
C++ SDK
Node.js SDK
连接器相关 SDK
安全与合规
权限管理
网络安全
删除保护
事件记录
云 API 审计
常见问题
实例相关
Topic 相关
Consumer Group 相关
客户端相关
网络问题
监控相关
消息相关
服务协议
服务等级协议
联系我们
词汇表

Logstash 接入 CKafka

PDF
聚焦模式
字号
最后更新时间: 2026-01-05 15:20:02
Logstash 是一个开源的日志处理工具,可以从多个源头收集数据、过滤收集的数据并对数据进行存储作为其他用途。
Logstash 灵活性强,拥有强大的语法分析功能,插件丰富,支持多种输入和输出源。Logstash 作为水平可伸缩的数据管道,与 Elasticsearch 和 Kibana 配合,在日志收集检索方面功能强大。

Logstash 工作原理

Logstash 数据处理可以分为三个阶段:inputs → filters → outputs。
1. inputs:产生数据来源,例如文件、syslog、redis 和 beats 此类来源。
2. filters:修改过滤数据, 在 Logstash 数据管道中属于中间环节,可以根据条件去对事件进行更改。一些常见的过滤器包括:grok、mutate、drop 和 clone 等。
3. outputs:将数据传输到其他地方,一个事件可以传输到多个 outputs,当传输完成后这个事件就结束。Elasticsearch 就是最常见的 outputs。
同时 Logstash 支持编码解码,可以在 inputs 和 outputs 端指定格式。




Logstash 接入 Kafka 的优势

可以异步处理数据:防止突发流量。
解耦:当 Elasticsearch 异常的时候不会影响上游工作。
注意
Logstash 过滤消耗资源,如果部署在生产 server 上会影响其性能。




前提条件

下载并安装 Logstash,参见 Download Logstash
下载并安装 JDK 8,参见 Download JDK 8

操作步骤

步骤1:准备工作

1. 在控制台的弹性 Topic 列表页面创建一个 Topic。



2. 单击 Topic 的 “ID” 进入基本信息页面,获取用户名、密码和地址信息。



3. 订阅关系页签,新建一个订阅关系(消费组)。




步骤2:接入 CKafka

说明
您可以单击以下页签,查看 CKafka 作为 inputs 或者 outputs 接入的具体步骤。
作为 inputs 接入
作为 outputs 接入
1. 执行 bin/logstash-plugin list,查看已经支持的插件是否含有 logstash-input-kafka。



2. 在 .bin/ 目录下编写配置文件 input.conf。 此处将标准输出作为数据终点,将 Kafka 作为数据来源。其中 kafka-client-jaas.conf 为 SASL-PLAINTEXT 的用户名和密码配置文件。
input {
kafka {
bootstrap_servers => "xx.xx.xx.xx:xxxx" // ckafka 接入地址
group_id => "logstash_group" // ckafka groupid 名称
topics => ["logstash_test"] // ckafka topic 名称
consumer_threads => 3 // 消费线程数,一般与 ckafka 分区数一致
auto_offset_reset => "earliest"
security_protocol => "SASL_PLAINTEXT"
sasl_mechanism => "PLAIN"
jaas_path => "xx/xx/kafka-client-jaas.conf"
}
}
output {
stdout{codec=>rubydebug}
}
kafka-client-jaas.conf 内容如下:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="username"
password="password";
};
3. 执行以下命令启动 Logstash,进行消息消费。
./logstash -f input.conf
返回结果如下:


可以看到刚才 Topic 中的数据被消费出来。
1. 执行 bin/logstash-plugin list,查看已经支持的插件是否含有 logstash-output-kafka。



2. 在.bin/目录下编写配置文件 output.conf。 此处将标准输入作为数据来源,将 Kafka 作为数据目的地。其中 kafka-client-jaas.conf 为 SASL-PLAINTEXT 的用户名和密码配置文件。
input {
stdin{}
}

output {
kafka {
bootstrap_servers => "xx.xx.xx.xx:xxxx" // ckafka 接入地址
topic_id => "logstash_test" // ckafka topic 名称
security_protocol => "SASL_PLAINTEXT"
sasl_mechanism => "PLAIN"
jaas_path => "xx/xx/kafka-client-jaas.conf"
}
}
kafka-client-jaas.conf 内容如下:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="username"
password="password";
};
3. 执行如下命令启动 Logstash,向创建的 Topic 发送消息。
./logstash -f output.conf



4. 启动 CKafka 消费者,检验上一步的生产数据。



参数
描述
bootstrapServers
接入地址,在控制台的弹性 Topic 基本信息页面获取。



username
用户名,在控制台的弹性 Topic 基本信息页面获取。
password
用户密码,在控制台的弹性 Topic 基本信息页面获取。
topic_id
Topic 名称,在控制台的弹性 Topic 基本信息页面获取。
group.id
消费组名称,在控制台的弹性 Topic 的订阅关系列表获取。





帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈