Last updated: 2023-11-08 14:58:59

    Overview

    Kafka is one of the most widely used sources and sinks in stream computing. You can ingest stream data into a Kafka topic, process the data with Flink operators, and write the results to another Kafka topic on the same or a different instance.
    Kafka supports reading data from and writing data to multiple partitions of the same topic, which can increase throughput and reduce data skew.
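
    As a minimal sketch of this pattern, the statement below reads from the JSON source table defined later in this document, applies a simple filter operator, and writes the result to the JSON sink table (the table and column names are taken from the examples below; adjust them to your own schema):

    INSERT INTO `kafka_json_sink_table`
    SELECT `id`, `name`
    FROM `kafka_json_source_table`
    WHERE `id` > 0; -- Any Flink SQL operators (filters, joins, aggregations) can be applied here.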

    Versions

    Flink Version | Description
    1.11 | Supported
    1.13 | Supported
    1.14 | Supported
    1.16 | Supported

    Limits

    Kafka can be used as a source or a sink for tuple streams.
    Kafka can also be used together with Debezium and Canal to capture and subscribe to changes to databases such as MySQL and PostgreSQL and to process the changes.

    Defining a table in DDL

    As a source

    JSON format

    CREATE TABLE `kafka_json_source_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- Define Kafka parameters.
    'connector' = 'kafka',
    'topic' = 'Data-Input', -- Replace this with the topic you want to consume data from.
    'scan.startup.mode' = 'latest-offset', -- Valid values include latest-offset, earliest-offset, specific-offsets, group-offsets, and timestamp.
    'properties.bootstrap.servers' = '172.28.28.13:9092', -- Replace this with your Kafka connection address.
    'properties.group.id' = 'testGroup', -- (Required) The group ID.
    
    -- Define the data format (JSON).
    'format' = 'json',
    'json.fail-on-missing-field' = 'false', -- If this is 'false', no errors will occur even when fields are missing.
    'json.ignore-parse-errors' = 'true' -- If this is 'true', all parse errors will be ignored.
    );

    CSV format

    CREATE TABLE `kafka_csv_source_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- Define Kafka parameters.
    'connector' = 'kafka',
    'topic' = 'Data-Input', -- Replace this with the topic you want to consume data from.
    'scan.startup.mode' = 'latest-offset', -- Valid values include latest-offset, earliest-offset, specific-offsets, group-offsets, and timestamp.
    'properties.bootstrap.servers' = '172.28.28.13:9092', -- Replace this with your Kafka connection address.
    'properties.group.id' = 'testGroup', -- (Required) The group ID.
    
    -- Define the data format (CSV).
    'format' = 'csv'
    );

    Debezium format

    CREATE TABLE `kafka_debezium_source_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- Define Kafka parameters.
    'connector' = 'kafka',
    'topic' = 'Data-Input', -- Replace this with the topic you want to consume data from.
    'scan.startup.mode' = 'latest-offset', -- Valid values include latest-offset, earliest-offset, specific-offsets, group-offsets, and timestamp.
    'properties.bootstrap.servers' = '172.28.28.13:9092', -- Replace this with your Kafka connection address.
    'properties.group.id' = 'testGroup', -- (Required) The group ID.
    
    -- Define the data format (JSON data output by Debezium).
    'format' = 'debezium-json'
    );

    Canal format

    CREATE TABLE `kafka_source`
    (
    aid BIGINT COMMENT 'unique id',
    charname string,
    `ts` timestamp(6),
    origin_database STRING METADATA FROM 'value.database' VIRTUAL,
    origin_table STRING METADATA FROM 'value.table' VIRTUAL,
    origin_es TIMESTAMP(3) METADATA FROM 'value.event-timestamp' VIRTUAL,
    origin_type STRING METADATA FROM 'value.operation-type' VIRTUAL,
    `batch_id` bigint METADATA FROM 'value.batch-id' VIRTUAL,
    `is_ddl` boolean METADATA FROM 'value.is-ddl' VIRTUAL,
    origin_old ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update-before' VIRTUAL,
    `mysql_type` MAP<STRING, STRING> METADATA FROM 'value.mysql-type' VIRTUAL,
    origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,
    `sql` STRING METADATA FROM 'value.sql' VIRTUAL,
    origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,
    `ingestion_ts` TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL
    ) WITH (
    'connector' = 'kafka', -- Make sure you specify the corresponding connector.
    'topic' = '$TOPIC', -- Replace this with the topic you want to consume data from.
    'properties.bootstrap.servers' = '$IP:$PORT', -- Replace this with your Kafka connection address.
    'properties.group.id' = 'testGroup', -- (Required) The group ID.
    'scan.startup.mode' = 'latest-offset',
    'scan.topic-partition-discovery.interval' = '5s',
    'format' = 'canal-json',
    'canal-json.ignore-parse-errors' = 'false', -- Whether to ignore JSON parse errors.
    'canal-json.source.append-mode' = 'true' -- Only Flink 1.13 or later is supported.
    );

    As a sink

    JSON format

    CREATE TABLE `kafka_json_sink_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- Define Kafka parameters.
    'connector' = 'kafka',
    'topic' = 'Data-Output', -- Replace this with the topic you want to write to.
    'properties.bootstrap.servers' = '172.28.28.13:9092', -- Replace this with your Kafka connection address.
    
    -- Define the data format (JSON).
    'format' = 'json',
    'json.fail-on-missing-field' = 'false', -- If this is 'false', no errors will occur even when fields are missing.
    'json.ignore-parse-errors' = 'true' -- If this is 'true', all parse errors will be ignored.
    );

    CSV format

    CREATE TABLE `kafka_csv_sink_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- Define Kafka parameters.
    'connector' = 'kafka',
    'topic' = 'Data-Output', -- Replace this with the topic you want to write to.
    'properties.bootstrap.servers' = '172.28.28.13:9092', -- Replace this with your Kafka connection address.
    
    -- Define the data format (CSV).
    'format' = 'csv'
    );

    Canal format

    CREATE TABLE `kafka_canal_json_sink_table`
    (
    aid BIGINT COMMENT 'unique id',
    charname string,
    `ts` timestamp(6),
    origin_database STRING METADATA FROM 'value.database',
    origin_table STRING METADATA FROM 'value.table',
    origin_ts TIMESTAMP(3) METADATA FROM 'value.event-timestamp',
    `type` STRING METADATA FROM 'value.operation-type',
    `batch_id` bigint METADATA FROM 'value.batch-id',
    `isDdl` BOOLEAN METADATA FROM 'value.is-ddl',
    `old` ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update-before',
    `mysql_type` MAP<STRING, STRING> METADATA FROM 'value.mysql-type',
    `pk_names` ARRAY<STRING> METADATA FROM 'value.pk-names',
    `sql` STRING METADATA FROM 'value.sql',
    `sql_type` MAP<STRING, INT> METADATA FROM 'value.sql-type',
    `ingestion_ts` TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp'
    ) WITH (
    'connector' = 'kafka', -- Make sure you specify the corresponding connector.
    'topic' = '$TOPIC', -- Replace this with the topic you want to write to.
    'properties.bootstrap.servers' = '$IP:$PORT', -- Replace this with your Kafka connection address.
    'properties.group.id' = 'testGroup', -- (Required) The group ID.
    'format' = 'canal-json'
    );

    WITH parameters

    Option | Required | Default Value | Description
    connector | Yes | - | Here, it should be 'kafka'.
    topic | Yes | - | The name of the Kafka topic to read from or write to.
    properties.bootstrap.servers | Yes | - | The Kafka bootstrap server addresses, separated with commas.
    properties.group.id | Yes for source connectors | - | The Kafka consumer group ID.
    format | Yes | - | The input and output format of Kafka messages. Currently, valid values include csv, json, avro, debezium-json, and canal-json. For Flink 1.13, maxwell-json is also supported.
    scan.startup.mode | No | group-offsets | The Kafka consumer startup mode. Valid values include latest-offset, earliest-offset, specific-offsets, group-offsets, and timestamp. If specific-offsets is used, you also need to set scan.startup.specific-offsets; if timestamp is used, you also need to set scan.startup.timestamp-millis (see below).
    scan.startup.specific-offsets | No | - | If scan.startup.mode is 'specific-offsets', you need to use this option to specify the start offset of each partition, for example, 'partition:0,offset:42;partition:1,offset:300'.
    scan.startup.timestamp-millis | No | - | If scan.startup.mode is 'timestamp', you need to use this option to specify the start timestamp (Unix format in milliseconds), for example, '1631588815000'.
    sink.partitioner | No | - | The Kafka output partitioning. Currently, the following types are supported: fixed (one Flink partition corresponds to no more than one Kafka partition), round-robin (one Flink partition is distributed in turn to different Kafka partitions), and custom (you can implement your own partitioning logic by inheriting the FlinkKafkaPartitioner class).
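
    As an illustration of these options (a sketch only; the table names, topics, and broker address are placeholders), the source below starts from explicitly specified offsets, and the sink distributes data across Kafka partitions in round-robin fashion:

    CREATE TABLE `kafka_offsets_source_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'Data-Input',
    'properties.bootstrap.servers' = '172.28.28.13:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'specific-offsets', -- Start from the offsets listed below.
    'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',
    'format' = 'json'
    );

    CREATE TABLE `kafka_roundrobin_sink_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'Data-Output',
    'properties.bootstrap.servers' = '172.28.28.13:9092',
    'sink.partitioner' = 'round-robin', -- Each Flink partition writes to Kafka partitions in turn.
    'format' = 'json'
    );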

    WITH parameters for JSON

    Option | Required | Default Value | Description
    json.fail-on-missing-field | No | false | If this is true, the job will fail when a field is missing. If this is false (default), missing fields will be set to null and the job will continue to be executed.
    json.ignore-parse-errors | No | false | If this is true, fields will be set to null when a parse error occurs and the job will continue to be executed. If this is false, the job will fail in case of a parse error.
    json.timestamp-format.standard | No | SQL | The JSON timestamp format. The default value is SQL, in which case the format will be yyyy-MM-dd HH:mm:ss.s{precision}. You can also set it to ISO-8601, in which case the format will be yyyy-MM-ddTHH:mm:ss.s{precision}.
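
    For example, to consume JSON messages whose timestamps are written in ISO-8601 form, the source table from the earlier example could set json.timestamp-format.standard as shown below (a sketch only; the column, topic, and address are placeholders):

    CREATE TABLE `kafka_json_iso_source_table` (
    `id` INT,
    `name` STRING,
    `event_time` TIMESTAMP(3)
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'Data-Input',
    'properties.bootstrap.servers' = '172.28.28.13:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601', -- Timestamps are expected as yyyy-MM-ddTHH:mm:ss.s{precision}.
    'json.ignore-parse-errors' = 'true' -- Set fields with parse errors to null instead of failing the job.
    );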

    WITH parameters for CSV

    Option | Required | Default Value | Description
    csv.field-delimiter | No | , | The field delimiter, which is a comma by default.
    csv.line-delimiter | No | U&'\000A' | The line delimiter, which is \n by default (in SQL, you must use U&'\000A'). You can also set it to \r (in SQL, you need to use U&'\000D').
    csv.disable-quote-character | No | false | Whether to disable quote characters. If this is true, 'csv.quote-character' cannot be used.
    csv.quote-character | No | " | The quote character. Text inside quotes is treated as a whole. The default value is ".
    csv.ignore-parse-errors | No | false | Whether to ignore parse errors. If this is true, fields will be set to null in case of parse failure.
    csv.allow-comments | No | false | Whether to ignore comment lines that start with # and output them as empty lines (if this is true, make sure you set csv.ignore-parse-errors to true as well).
    csv.array-element-delimiter | No | ; | The array element delimiter, which is ; by default.
    csv.escape-character | No | - | The escape character. By default, escape characters are disabled.
    csv.null-literal | No | - | The string that will be treated as null.
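
    As an illustration (a sketch only; the topic and address are placeholders), the CSV source table from the earlier example could override the field delimiter and the null literal like this:

    CREATE TABLE `kafka_csv_custom_source_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'Data-Input',
    'properties.bootstrap.servers' = '172.28.28.13:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'csv',
    'csv.field-delimiter' = ';', -- Fields are separated with semicolons instead of commas.
    'csv.null-literal' = 'NULL', -- The string NULL is read as a SQL NULL value.
    'csv.ignore-parse-errors' = 'true' -- Set fields with parse errors to null instead of failing the job.
    );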

    WITH parameters for Debezium-json

    Option | Required | Default Value | Description
    debezium-json.schema-include | No | false | Whether to include schemas. If you specified 'value.converter.schemas.enable' when configuring Kafka Connect with Debezium, the JSON data sent by Debezium will include schema information, and you need to set this option to true.
    debezium-json.ignore-parse-errors | No | false | Whether to ignore parse errors. If this is true, fields will be set to null in case of parse failure.
    debezium-json.timestamp-format.standard | No | SQL | The JSON timestamp format. The default value is SQL, in which case the format will be yyyy-MM-dd HH:mm:ss.s{precision}. You can also set it to ISO-8601, in which case the format will be yyyy-MM-ddTHH:mm:ss.s{precision}.
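
    For example, if Kafka Connect was configured with 'value.converter.schemas.enable' set to true, the Debezium source table from the earlier example would additionally declare the schema-include option (a sketch only; the topic and address are placeholders):

    CREATE TABLE `kafka_debezium_schema_source_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'Data-Input',
    'properties.bootstrap.servers' = '172.28.28.13:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'debezium-json',
    'debezium-json.schema-include' = 'true', -- The Debezium messages embed schema information.
    'debezium-json.ignore-parse-errors' = 'true' -- Set fields with parse errors to null instead of failing the job.
    );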

    WITH parameters for Canal

    Option | Required | Default Value | Description
    canal-json.source.append-mode | No | false | Whether to support append streams. You can set this to true when, for example, writing Canal JSON data from Kafka to Hive. This option is only supported for Flink 1.13 clusters.
    canal-json.ignore-parse-errors | No | false | Whether to ignore parse errors. If this is true, fields will be set to null in case of parse failure.
    canal-json.* | No | - | -

    Metadata supported by Canal (only Flink 1.13 clusters)

    The metadata below can only be used as virtual columns. If a metadata column has the same name as a physical column, you can use the meta. prefix for the metadata column name.
    Column | Data Type | Description
    database | STRING NOT NULL | The name of the database to which the row belongs.
    table | STRING NOT NULL | The name of the table to which the row belongs.
    event-timestamp | TIMESTAMP_LTZ(3) NOT NULL | The time when the row was changed in the database.
    batch-id | BIGINT | The batch ID of the binlog.
    is-ddl | BOOLEAN | Whether it is a DDL statement.
    mysql-type | MAP | The database structure.
    update-before | ARRAY | The field value before it was modified.
    pk-names | ARRAY | The primary key field.
    sql | STRING | Null.
    sql-type | MAP | The mappings between the sql_type fields and Java data types.
    ingestion-timestamp | TIMESTAMP_LTZ(3) NOT NULL | The time when the row was received and processed.
    operation-type | STRING | The operation type, such as INSERT or DELETE.

    Example

    JSON

    CREATE TABLE `kafka_json_source_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- Define Kafka parameters.
    'connector' = 'kafka',
    'topic' = 'Data-Input', -- Replace this with the topic you want to consume data from.
    'scan.startup.mode' = 'latest-offset', -- Valid values include latest-offset, earliest-offset, specific-offsets, group-offsets, and timestamp.
    'properties.bootstrap.servers' = '172.28.28.13:9092', -- Replace this with your Kafka connection address.
    'properties.group.id' = 'testGroup', -- (Required) The group ID.
    
    -- Define the data format (JSON).
    'format' = 'json',
    'json.fail-on-missing-field' = 'false', -- If this is 'false', no errors will occur even when fields are missing.
    'json.ignore-parse-errors' = 'true' -- If this is 'true', all parse errors will be ignored.
    );
    CREATE TABLE `kafka_json_sink_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- Define Kafka parameters.
    'connector' = 'kafka',
    'topic' = 'Data-Output', -- Replace this with the topic you want to write to.
    'properties.bootstrap.servers' = '172.28.28.13:9092', -- Replace this with your Kafka connection address.
    
    -- Define the data format (JSON).
    'format' = 'json',
    'json.fail-on-missing-field' = 'false', -- If this is 'false', no errors will occur even when fields are missing.
    'json.ignore-parse-errors' = 'true' -- If this is 'true', all parse errors will be ignored.
    );
    insert into kafka_json_sink_table select * from kafka_json_source_table;

    Canal

    CREATE TABLE `source`
    (
    `aid` bigint,
    `charname` string,
    `ts` timestamp(6),
    `database_name` string METADATA FROM 'value.database_name',
    `table_name` string METADATA FROM 'value.table_name',
    `op_ts` timestamp(3) METADATA FROM 'value.op_ts',
    `op_type` string METADATA FROM 'value.op_type',
    `batch_id` bigint METADATA FROM 'value.batch_id',
    `is_ddl` boolean METADATA FROM 'value.is_ddl',
    `update_before` ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update_before',
    `mysql_type` MAP<STRING, STRING> METADATA FROM 'value.mysql_type',
    `pk_names` ARRAY<STRING> METADATA FROM 'value.pk_names',
    `sql` STRING METADATA FROM 'value.sql',
    `sql_type` MAP<STRING, INT> METADATA FROM 'value.sql_type',
    `ingestion_ts` TIMESTAMP(3) METADATA FROM 'value.ts',
    primary key (`aid`) not enforced
    ) WITH (
    'connector' = 'mysql-cdc' ,
    'append-mode' = 'true',
    'hostname' = '$IP',
    'port' = '$PORT',
    'username' = '$USERNAME',
    'password' = '$PASSWORD',
    'database-name' = 't_wr',
    'table-name' = 't1',
    'server-time-zone' = 'Asia/Shanghai',
    'server-id' = '5500-5510'
    );
    
    CREATE TABLE `kafka_canal_json_sink`
    (
    aid BIGINT COMMENT 'unique id',
    charname string,
    `ts` timestamp(6),
    origin_database STRING METADATA FROM 'value.database',
    origin_table STRING METADATA FROM 'value.table',
    origin_ts TIMESTAMP(3) METADATA FROM 'value.event-timestamp',
    `type` STRING METADATA FROM 'value.operation-type',
    `batch_id` bigint METADATA FROM 'value.batch-id',
    `isDdl` BOOLEAN METADATA FROM 'value.is-ddl',
    `old` ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update-before',
    `mysql_type` MAP<STRING, STRING> METADATA FROM 'value.mysql-type',
    `pk_names` ARRAY<STRING> METADATA FROM 'value.pk-names',
    `sql` STRING METADATA FROM 'value.sql',
    `sql_type` MAP<STRING, INT> METADATA FROM 'value.sql-type',
    `ingestion_ts` TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp'
    )
    WITH (
    'connector' = 'kafka', -- Valid values include 'kafka' and 'kafka-0.11'. Make sure you specify the corresponding built-in connector.
    'topic' = '$TOPIC', -- Replace this with the topic you want to write to.
    'properties.bootstrap.servers' = '$IP:$PORT', -- Replace this with your Kafka connection address.
    'properties.group.id' = 'testGroup', -- (Required) The group ID.
    'format' = 'canal-json'
    );
    
    insert into kafka_canal_json_sink select * from source;

    -- The second Canal example below consumes Canal JSON data from a Kafka topic and writes it to a partitioned Hive table.
    CREATE TABLE `source`
    (
    `aid` bigint,
    `charname` string,
    `ts` timestamp(3),
    origin_database STRING METADATA FROM 'value.database' VIRTUAL,
    origin_table STRING METADATA FROM 'value.table' VIRTUAL,
    origin_es TIMESTAMP(3) METADATA FROM 'value.event-timestamp' VIRTUAL,
    origin_type STRING METADATA FROM 'value.operation-type' VIRTUAL,
    `batch_id` bigint METADATA FROM 'value.batch-id' VIRTUAL,
    `is_ddl` boolean METADATA FROM 'value.is-ddl' VIRTUAL,
    origin_old ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update-before' VIRTUAL,
    `mysql_type` MAP<STRING, STRING> METADATA FROM 'value.mysql-type' VIRTUAL,
    origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,
    `sql` STRING METADATA FROM 'value.sql' VIRTUAL,
    origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,
    `ingestion_ts` TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
    WATERMARK FOR `origin_es` AS `origin_es` - INTERVAL '5' SECOND
    ) WITH (
    'connector' = 'kafka', -- Make sure you specify the corresponding connector.
    'topic' = '$TOPIC', -- Replace this with the topic you want to consume data from.
    'properties.bootstrap.servers' = '$IP:$PORT', -- Replace this with your Kafka connection address.
    'properties.group.id' = 'testGroup', -- (Required) The group ID.
    'scan.startup.mode' = 'latest-offset',
    'scan.topic-partition-discovery.interval' = '10s',
    
    'format' = 'canal-json',
    'canal-json.source.append-mode' = 'true', -- This is only supported for Flink 1.13.
    'canal-json.ignore-parse-errors' = 'false'
    );
    
    
    CREATE TABLE `kafka_canal_json` (
    `aid` bigint,
    `charname` string,
    `ts` timestamp(9),
    origin_database STRING,
    origin_table STRING,
    origin_es TIMESTAMP(9),
    origin_type STRING,
    `batch_id` bigint,
    `is_ddl` boolean,
    origin_old ARRAY<MAP<STRING, STRING>>,
    `mysql_type` MAP<STRING, STRING>,
    origin_pk_names ARRAY<STRING>,
    `sql` STRING,
    origin_sql_type MAP<STRING, INT>,
    `ingestion_ts` TIMESTAMP(9),
    dt STRING,
    hr STRING
    ) PARTITIONED BY (dt, hr)
    with (
    'connector' = 'hive',
    'hive-version' = '3.1.1',
    'hive-database' = 'testdb',
    'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
    'sink.partition-commit.trigger'='partition-time',
    'sink.partition-commit.delay'='30 min',
    'sink.partition-commit.policy.kind'='metastore,success-file'
    );
    
    insert into kafka_canal_json select *,DATE_FORMAT(`origin_es`,'yyyy-MM-dd'),DATE_FORMAT(`origin_es`,'HH')
    from `source`;

    SASL authentication

    SASL/PLAIN username and password authentication

    1. Follow the instructions in Message Queue CKafka - Configuring ACL Policy to configure username/password-based authentication (SASL_PLAINTEXT) for the topic.
    2. Select "SASL_PLAINTEXT" as the access mode when adding a routing policy and access the topic via the address of that mode.
    3. Configure WITH parameters for the job.
    CREATE TABLE `YourTable` (
    ...
    ) WITH (
    ...
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="ckafka-xxxxxxxx#YourUserName" password="YourPassword";',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'PLAIN',
    ...
    );
    Note
    The username should be the instance ID + # + the username you set, and the password is the password you configured.

    SASL/GSSAPI Kerberos authentication

    Currently, Tencent Cloud CKafka does not support Kerberos authentication. If your self-built Kafka supports Kerberos authentication, you can follow the steps below to configure it.
    1. Get the Kerberos configuration file for your self-built Kafka cluster. If your cluster is built on top of Tencent Cloud EMR, get the files krb5.conf and emr.keytab in the following paths.
    /etc/krb5.conf
    /var/krb5kdc/emr.keytab
    2. Package the files into a JAR file.
    jar cvf kafka-xxx.jar krb5.conf emr.keytab
    3. Check the JAR structure (run the Vim command vim kafka-xxx.jar). Make sure the JAR file includes the following information and has the correct structure.
    META-INF/
    META-INF/MANIFEST.MF
    emr.keytab
    krb5.conf
    4. Upload the JAR file to the Dependencies page of the Stream Compute Service console, and reference the package when configuring job parameters.
    5. Get Kerberos principals to configure advanced job parameters.
    klist -kt /var/krb5kdc/emr.keytab
    
    # The output is as follows (use the first): hadoop/172.28.28.51@EMR-OQPO48B9
    KVNO Timestamp Principal
    ---- ------------------- ------------------------------------------------------
    2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B9
    2 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B9
    2 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B9
    2 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
    6. Configure WITH parameters for the job.
    CREATE TABLE `YourTable` (
    ...
    ) WITH (
    ...
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'GSSAPI',
    'properties.sasl.kerberos.service.name' = 'hadoop',
    ...
    );
    Then set the following advanced job parameters:
    security.kerberos.login.principal: hadoop/172.28.2.13@EMR-4K3VR5FD
    security.kerberos.login.keytab: emr.keytab
    security.kerberos.login.conf: krb5.conf
    security.kerberos.login.contexts: KafkaClient
    fs.hdfs.hadoop.security.authentication: kerberos
    Note
    Kerberos authentication is not supported for historical Stream Compute Service clusters. To support the feature, please contact us to upgrade the cluster management service.
    
    
    