Tencent Cloud

TDSQL for MySQL

Last updated: 2023-11-08 14:53:32

    Overview

    The tdsql-subscribe connector is dedicated to subscribing to TDSQL for MySQL data. It integrates the incremental binlog data of TDSQL for MySQL, as described in Creating TDSQL for MySQL Data Subscription. Before using this connector, make sure a data subscription task has been configured successfully.
    Note
    The tdsql-subscribe connector is under beta testing. If you need to try it, submit a ticket.

    Versions

    Flink Version | Description
    ------------- | -----------
    1.11          | Unsupported
    1.13          | Supported
    1.14          | Unsupported
    1.16          | Unsupported

    Limits

    The tdsql-subscribe connector can be used as a source but not a sink of a stream.

    Defining a table in DDL

    When the tdsql-subscribe connector is used as a source, most of its WITH parameters are similar to those of the Kafka connector, and all connection parameters can be found in the subscription task. Please note that when using the tdsql-subscribe connector, you must set format to protobuf, because the messages sent to Kafka by the subscription task are in protobuf format. Compared with the Kafka connector, the tdsql-subscribe connector requires additional authentication parameters, which are taken from the subscription task.

    As a source

    Input in protobuf

    CREATE TABLE `DataInput` (
    `id` INT,
    `name` VARCHAR,
    `age` INT
    ) WITH (
    'connector' = 'tdsql-subscribe', -- Make sure you specify the corresponding connector.
    'tdsql.database.name' = 'test_case_2022_06_0*', -- Filter subscription messages to consume only the subscription data of databases whose names match the regex `test_case_2022_06_0*`.
    'tdsql.table.name' = 'test_0*', -- Filter subscription messages to consume only the subscription data of tables whose names match the regex `test_0*`.
    'topic' = 'topic-subs-5xop97nffk-tdsqlshard-xxx', -- Replace it with the topic consumed by the subscription task.
    'scan.startup.mode' = 'earliest-offset', -- Valid values: `latest-offset`, `earliest-offset`, `specific-offsets`, `group-offsets`, and `timestamp`.
    'properties.bootstrap.servers' = 'guangzhou-kafka-2.cdb-dts.tencentcs.com.cn:3212', -- Replace it with the Kafka connection address of your subscription task.
    'properties.group.id' = 'consumer-grp-subs-xxx-kk',
    'format' = 'protobuf', -- Only protobuf is allowed.
    'properties.security.protocol'='SASL_PLAINTEXT', -- The authentication protocol.
    'properties.sasl.mechanism'='SCRAM-SHA-512', -- The authentication method.
    'properties.sasl.jaas.config'='org.apache.kafka.common.security.scram.ScramLoginModule required username="account-subs-xxx-username" password="psw";' -- The username and password.
    );
    CREATE TABLE `jdbc_upsert_sink_table` (
    id INT PRIMARY KEY NOT ENFORCED,
    name STRING,
    age INT
    ) WITH (
    -- Specify the parameters for database connection.
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://172.28.28.138:3306/testdb', -- Replace it with your MySQL database connection URL.
    'table-name' = 'sink', -- The table into which the data will be written.
    'username' = 'user', -- The username (with the INSERT permission required) for database access.
    'password' = 'psw' -- The password for database access.
    );
    INSERT INTO jdbc_upsert_sink_table SELECT * FROM DataInput;
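
    If you need to start consumption from a point in time rather than from the earliest offset, the source definition above can be adjusted as in the following sketch. The topic, broker address, and credentials are placeholders; reuse the values from your own subscription task. `scan.startup.timestamp-millis` takes a Unix timestamp in milliseconds.

```sql
-- A minimal variant of `DataInput` that starts consuming from a point in time.
-- The topic, broker address, and credentials below are placeholders.
CREATE TABLE `DataInputFromTimestamp` (
  `id` INT,
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'tdsql-subscribe',
  'topic' = 'topic-subs-5xop97nffk-tdsqlshard-xxx',
  'properties.bootstrap.servers' = 'guangzhou-kafka-2.cdb-dts.tencentcs.com.cn:3212',
  'properties.group.id' = 'consumer-grp-subs-xxx-kk',
  'format' = 'protobuf',
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1631588815000', -- Unix timestamp in ms.
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'SCRAM-SHA-512',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="account-subs-xxx-username" password="psw";'
);
```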

    WITH parameters

    Option | Required | Default Value | Description
    ------ | -------- | ------------- | -----------
    connector | Yes | None | Here, it should be 'tdsql-subscribe'.
    topic | Yes | None | The name of the Kafka topic to be read.
    properties.bootstrap.servers | Yes | None | The Kafka bootstrap addresses, separated by commas.
    properties.group.id | Yes | None | The ID of the Kafka consumer group.
    format | Yes | None | The input format of a Kafka message. Only protobuf is supported.
    scan.startup.mode | No | group-offsets | The Kafka consumer start mode. Valid values: latest-offset, earliest-offset, specific-offsets, group-offsets, and timestamp. If 'specific-offsets' is used, specify the offset of each partition, such as 'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'. If 'timestamp' is used, specify the startup timestamp (in ms), such as 'scan.startup.timestamp-millis' = '1631588815000'.
    scan.startup.specific-offsets | No | None | If scan.startup.mode is set to 'specific-offsets', this option must be used to specify the specific offset of the startup, such as 'partition:0,offset:42;partition:1,offset:300'.
    scan.startup.timestamp-millis | No | None | If scan.startup.mode is set to 'timestamp', this option must be used to specify the time point (Unix timestamp in ms) of the startup.
    tdsql.database.name | No | None | The name of the TDSQL database. If this option is set, this connector can consume the binlog data of the database specified here, provided that the subscription task contains the binlog data of this database. This option supports a regex, such as test_case_2022_06_0*.
    tdsql.table.name | No | None | The name of the TDSQL table. If this option is set, this connector can consume the binlog data of the table specified here, provided that the subscription task contains the binlog data of this table. This option supports a regex, such as test_0*, or a comma-separated list, such as test_1,test_2.
    Note
    To use tdsql.database.name or tdsql.table.name, we recommend you subscribe to all instances in the subscription task. If multiple Stream Compute Service tasks consume different TDSQL tables, each task must use a unique consumer group of the subscription task. You can create consumer groups in the subscription task.
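
    The note above can be sketched as follows. Two Stream Compute Service jobs consume different TDSQL tables from the same subscription task, each with its own consumer group; `consumer-grp-subs-xxx-a` and `consumer-grp-subs-xxx-b` are hypothetical group names created in the subscription task, and the other connection parameters are placeholders reused from the example above.

```sql
-- Job A: consumes only tables matching the regex `test_0*`.
CREATE TABLE `InputJobA` (
  `id` INT,
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'tdsql-subscribe',
  'tdsql.table.name' = 'test_0*',
  'properties.group.id' = 'consumer-grp-subs-xxx-a', -- Consumer group unique to job A.
  'topic' = 'topic-subs-5xop97nffk-tdsqlshard-xxx',
  'properties.bootstrap.servers' = 'guangzhou-kafka-2.cdb-dts.tencentcs.com.cn:3212',
  'format' = 'protobuf',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'SCRAM-SHA-512',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="account-subs-xxx-username" password="psw";'
);

-- Job B (defined in a separate Stream Compute Service task) would use the same
-- connection parameters but a different filter and its own consumer group, e.g.:
--   'tdsql.table.name' = 'test_1*',
--   'properties.group.id' = 'consumer-grp-subs-xxx-b'
```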

    Notes

    1. If a subscription task is configured with multiple databases and tables, or with one database and multiple tables, all the tables must share the same schema so that the subscription data can be integrated correctly.
    2. Chinese characters in a source table can be encoded only using utf8 or gbk.