Upsert Kafka

Last updated: 2023-11-08 11:20:39

    Overview

    The Upsert Kafka connector allows for reading data from and writing data into Kafka topics in the upsert fashion.
    As a source, the Upsert Kafka connector produces a changelog stream, where each data record represents an update or delete event. More precisely, the value in a data record is interpreted as an UPDATE of the last value for the same key, if any (if a corresponding key doesn't exist yet, the update will be considered an INSERT). Using the table analogy, a data record in a changelog stream is interpreted as an UPSERT (INSERT/UPDATE) because any existing row with the same key is overwritten. Also, a record with a null value represents a DELETE.
    As a sink, the Upsert Kafka connector can consume a changelog stream. It will write INSERT/UPDATE_AFTER data as normal Kafka messages, and write DELETE data as Kafka messages with null values (which indicates the messages will be deleted). Flink will guarantee the message ordering on the primary key by partitioning data based on the values of the primary key columns, so the update/delete messages on the same key will fall into the same partition.
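    As an illustration (an assumed example, with a primary key column id and JSON key/value formats), the following sequence of records in a topic
        key: {"id": 1}    value: {"id": 1, "name": "a"}
        key: {"id": 1}    value: {"id": 1, "name": "b"}
        key: {"id": 1}    value: null
    is read by the source as an INSERT of (1, 'a'), then an UPDATE to (1, 'b'), and finally a DELETE of the row with id = 1; the sink produces records of the same shape when it consumes a changelog stream.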

    Versions

    Flink Version    Description
    1.11             Unsupported
    1.13             Supported
    1.14             Supported
    1.16             Supported

    Defining a table in DDL

    CREATE TABLE kafka_upsert_sink_table (
    id INT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    -- Define Upsert Kafka parameters.
    'connector' = 'upsert-kafka', -- Specify the connector.
    'topic' = 'topic', -- Replace this with the topic you want to read data from or write data to.
    'properties.bootstrap.servers' = '...', -- Replace this with your Kafka connection address.
    'key.format' = 'json', -- Define the data format of keys.
    'value.format' = 'json' -- Define the data format of values.
    );
    Note
    Make sure you define the primary key in the DDL.

    WITH parameters

    connector
    Required: Yes | Default value: - | Data type: String
    The connector to use. For Upsert Kafka, use 'upsert-kafka'.

    topic
    Required: Yes | Default value: - | Data type: String
    The name of the topic to read from and write to.

    properties.bootstrap.servers
    Required: Yes | Default value: - | Data type: String
    A list of Kafka brokers, separated with commas.

    properties.*
    Required: No | Default value: - | Data type: String
    Arbitrary Kafka configurations. The suffixes must match the configuration keys defined in the Kafka configuration document. Flink will remove the "properties." prefix and pass the remaining keys and values to the underlying KafkaClient. For example, you can use 'properties.allow.auto.create.topics' = 'false' to disable automatic topic creation. However, some configurations, such as 'key.deserializer' and 'value.deserializer', cannot be set using this option because Flink will override them.

    key.format
    Required: Yes | Default value: - | Data type: String
    The format used to deserialize and serialize the key part of Kafka messages. The key fields are defined by PRIMARY KEY. Valid values include 'csv', 'json', and 'avro'.

    key.fields-prefix
    Required: No | Default value: - | Data type: String
    A custom prefix for all fields of the key format to avoid name conflicts with fields of the value format. By default, the prefix is empty. If a custom prefix is defined, both the table schema and key.fields work with prefixed names. When the data type of the key format is constructed, the prefix is removed and the non-prefixed names are used within the key format. Note that this option requires value.fields-include to be set to EXCEPT_KEY (see the sketch after this table).

    value.format
    Required: Yes | Default value: - | Data type: String
    The format used to deserialize and serialize the value part of Kafka messages. Valid values include 'csv', 'json', and 'avro'.

    value.fields-include
    Required: No | Default value: 'ALL' | Data type: String
    A strategy specifying how to deal with key columns in the data type of the value format. Valid values:
    ALL: All physical columns of the table schema are included in the value format, including columns defined as primary keys.
    EXCEPT_KEY: All physical columns of the table schema are included in the value format except columns defined as primary keys.

    sink.parallelism
    Required: No | Default value: - | Data type: Integer
    The parallelism of the Upsert Kafka sink operator. By default, the parallelism is determined by the framework, using the same parallelism as the upstream chained operator.

    sink.buffer-flush.max-rows
    Required: No | Default value: 0 | Data type: Integer
    The maximum number of records to buffer before a flush. When the sink receives many updates on the same key, the buffer retains only the last record for each key. Buffering helps reduce data shuffling and avoids possible tombstone messages to Kafka topics. Set this parameter to 0 (the default) to disable buffering. Note that both sink.buffer-flush.max-rows and sink.buffer-flush.interval must be set to values greater than zero to enable sink buffer flushing.

    sink.buffer-flush.interval
    Required: No | Default value: 0 | Data type: Duration
    The interval at which asynchronous threads flush data. When the sink receives many updates on the same key, the buffer retains only the last record for each key. Buffering helps reduce data shuffling and avoids possible tombstone messages to Kafka topics. Set this parameter to 0 (the default) to disable buffering. Note that both sink.buffer-flush.max-rows and sink.buffer-flush.interval must be set to values greater than zero to enable sink buffer flushing.
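
    The following sketch is not part of the official example; it shows, under assumed values (the table name kafka_upsert_example_table, the 'key_' prefix, the topic name, and the flush thresholds are hypothetical), how properties.*, key.fields-prefix, value.fields-include, and the buffer flush options fit together in one WITH clause.
    CREATE TABLE kafka_upsert_example_table (
    key_id INT, -- Primary key column; the 'key_' prefix is stripped in the Kafka message key.
    name STRING,
    PRIMARY KEY (key_id) NOT ENFORCED
    ) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'topic', -- Hypothetical topic name; replace it with your own.
    'properties.bootstrap.servers' = '...', -- Replace this with your Kafka connection address.
    'properties.allow.auto.create.topics' = 'false', -- An arbitrary Kafka client property passed through via properties.*.
    'key.format' = 'json',
    'key.fields-prefix' = 'key_', -- The Kafka message key is then written as {"id": ...}.
    'value.format' = 'json',
    'value.fields-include' = 'EXCEPT_KEY', -- Required when key.fields-prefix is set; key columns are excluded from the value.
    'sink.buffer-flush.max-rows' = '1000', -- Assumed threshold: flush after 1,000 buffered rows...
    'sink.buffer-flush.interval' = '1s' -- ...or at least once per second, whichever comes first.
    );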

    Example

    CREATE TABLE `kafka_json_source_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- Define Kafka parameters.
    'connector' = 'kafka',
    'topic' = 'Data-Input', -- Replace this with the topic you want to consume data from.
    'scan.startup.mode' = 'latest-offset', -- Valid values include latest-offset, earliest-offset, specific-offsets, group-offsets, and timestamp.
    'properties.bootstrap.servers' = '172.28.28.13:9092', -- Replace this with your Kafka connection address.
    'properties.group.id' = 'testGroup', -- (Required) The group ID.
    
    -- Define the data format (JSON).
    'format' = 'json',
    'json.fail-on-missing-field' = 'false', -- If this is 'false', no error will be reported even when fields are missing.
    'json.ignore-parse-errors' = 'true' -- If this is 'true', all parse errors will be ignored.
    );
    
    CREATE TABLE kafka_upsert_sink_table (
    id INT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    -- Define Upsert Kafka parameters.
    'connector' = 'upsert-kafka', -- Specify the connector.
    'topic' = 'topic', -- Replace this with the topic you want to write data to.
    'properties.bootstrap.servers' = '...', -- Replace this with your Kafka connection address.
    'key.format' = 'json', -- Define the data format of keys.
    'value.format' = 'json' -- Define the data format of values.
    );
    
    -- Insert data from the source table into the Upsert Kafka sink table (see the PV/UV aggregation sketch after this example).
    INSERT INTO kafka_upsert_sink_table
    SELECT * FROM kafka_json_source_table;
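
    The query above simply forwards rows from the source table. A more typical Upsert Kafka use case is writing an updating aggregation result, such as a PV (page view) count per user. The following is a sketch only: the source table pageviews, its user_id column, and the topic pageviews-per-user are assumed names.
    CREATE TABLE pageviews_per_user (
    user_id INT,
    pv BIGINT,
    PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'pageviews-per-user', -- Hypothetical topic name.
    'properties.bootstrap.servers' = '...', -- Replace this with your Kafka connection address.
    'key.format' = 'json',
    'value.format' = 'json'
    );

    -- Each incoming page view updates the count for its user; the Upsert Kafka sink overwrites the previous value for the same key.
    INSERT INTO pageviews_per_user
    SELECT user_id, COUNT(*) AS pv
    FROM pageviews -- Assumed append-only Kafka source table with a user_id column.
    GROUP BY user_id;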

    SASL authentication

    SASL/PLAIN username and password authentication

    1. Follow the instructions in Message Queue CKafka - Configuring ACL Policy to configure username/password-based authentication (SASL_PLAINTEXT) for the topic.
    2. Select "SASL_PLAINTEXT" as the access mode when adding a routing policy, and then access the topic via the address provided for that access mode.
    3. Configure WITH parameters for the job.
    CREATE TABLE `YourTable` (
    ...
    ) WITH (
    ...
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="ckafka-xxxxxxxx#YourUserName" password="YourPassword";',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'PLAIN',
    ...
    );
    Note
    The username should be the instance ID + # + the username you set, and the password is the one you configured.
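    For example (assumed values only: instance ID ckafka-a1b2c3d4, username alice, password Abc123), the JAAS line would read:
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="ckafka-a1b2c3d4#alice" password="Abc123";',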

    SASL/GSSAPI Kerberos authentication

    Currently, Tencent Cloud CKafka does not support Kerberos authentication. If your self-built Kafka supports Kerberos authentication, you can follow the steps below to configure it.
    1. Get the Kerberos configuration file for your self-built Kafka cluster. If your cluster is built on top of Tencent Cloud EMR, get the files krb5.conf and emr.keytab in the following paths.
    /etc/krb5.conf
    /var/krb5kdc/emr.keytab
    2. Package the files into a JAR file.
    jar cvf kafka-xxx.jar krb5.conf emr.keytab
    3. Check the JAR structure (run the Vim command vim kafka-xxx.jar). Make sure the JAR file includes the following information and has the correct structure.
    META-INF/
    META-INF/MANIFEST.MF
    emr.keytab
    krb5.conf
    4. Upload the JAR file to the Dependencies page of the Stream Compute Service console, and reference the package when configuring job parameters.
    5. Get Kerberos principals to configure advanced job parameters.
    klist -kt /var/krb5kdc/emr.keytab
    
    # The output is as follows (use the first): hadoop/172.28.28.51@EMR-OQPO48B9
    KVNO Timestamp Principal
    ---- ------------------- ------------------------------------------------------
    2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B9
    2 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B9
    2 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B9
    2 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
    6. Configure WITH parameters for the job.
    CREATE TABLE `YourTable` (
    ...
    ) WITH (
    ...
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'GSSAPI',
    'properties.sasl.kerberos.service.name' = 'hadoop',
    ...
    );
    Note
    The value of properties.sasl.kerberos.service.name must match the principal you use. For example, if you use hadoop/${IP}@EMR-OQPO48B9, the parameter value should be hadoop.
    Example of the advanced job parameters mentioned in step 5:
    security.kerberos.login.principal: hadoop/172.28.2.13@EMR-4K3VR5FD
    security.kerberos.login.keytab: emr.keytab
    security.kerberos.login.conf: krb5.conf
    security.kerberos.login.contexts: KafkaClient
    fs.hdfs.hadoop.security.authentication: kerberos
    