tencent cloud

Stream Compute Service

Log consumption CLS

다운로드
포커스 모드
폰트 크기
마지막 업데이트 시간: 2026-06-03 17:15:50

Introduction

Cloud Log Service (CLS) can serve as a data source (Source) for Stream Compute Service (SCS). You can export logs to SCS by CLS Log topics via the CLS Kafka protocol consumption feature for further stream computing.

Version Description

Flink Version
Description
1.11
Supported
1.13
Supported
1.14
Not supported
1.16
Supported

Applicable Scope

CLS can be used as a data source table (Source).

CLS Log Topics

Before creating a CLS source table, you need to create a CLS log topic by following these steps:
1. On the CLS Log Topic page, select Log Topic > Create Log Topic to create a log topic.

2. After the log topic is created successfully, click the log topic name/ID for the new log topic on the list page to go to the details page.

3. On the details page, switch to the Consumption Over Kafka tab and enable the feature.

4. Status after enabling the feature is as follows:

5. For any questions, see Kafka Protocol Consumption.

SCS Consumption CLS Logs

Create a job in the SCS console.



-- Table creation statements are as follows:
CREATE TABLE nginx_source (
-- Fields in the logs.
@metadata STRING,
@timestamp TIMESTAMP,
agent STRING,
ecs STRING,
host STRING,
input STRING,
log STRING,
message STRING,
partition_id BIGINT METADATA
FROM
'partition' VIRTUAL,
-- Kafka partition.
ts TIMESTAMP(3) METADATA
FROM
'timestamp'
) WITH (
'connector' = 'kafka',
-- The topic name provided by the CLS Kafka protocol consumption console, for example, XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX, can be copied from the console.
'topic' = 'Your consumption topics',
-- Service address + port, for example, public network port 9096 and private network port 9095, for private network consumption. Please enter according to your actual situation.
'properties.bootstrap.servers' = 'kafkaconsumer-${region}.cls.tencentyun.com:9095',
-- Please replace with your consumer group name.
properties.group.id' = 'Consumer group ID',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true',
-- The username is the logset ID, for example, ca5cXXXXdd2e-4ac0af12-92d4b677d2c6.
-- The password is a string combining the user's SecretId#SecretKey, for example, AKIDWrwkHYYHjvqhz1mHVS8YhXXXX#XXXXuXtymIXT0Lac. Be careful not to lose the #. We recommend that you use the sub-account keys. When authorizing a sub-account, follow the principle of least privilege, configuring the action and resource in the sub-account access policy to the minimum range to fulfill the operations. Note that jaas.config ends with a semicolon. An error will be reported if not filled in.
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN'
);
Note:
Flink versions and the corresponding packages for SASL authentication configurations:
Version 1.16 or earlier: org.apache.kafka.common.security.plain.PlainLoginModule
Version 1.16 or later: org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule

Must-Knows

For prerequisites and relevant restrictions, see Kafka Protocol Consumption.

도움말 및 지원

문제 해결에 도움이 되었나요?

피드백