MySQL CDC

Last updated: 2023-11-08 16:18:29

    Overview

    The MySQL CDC connector (as a source) allows reading snapshot data and incremental data from MySQL databases and guarantees exactly-once semantics. This connector uses Debezium as its underlying platform to implement change data capture (CDC).

    How MySQL CDC 1.x works

    1. Acquires a global read lock to prohibit writes by other database clients.
    2. Starts a repeatable read transaction to ensure that data is always read from the same checkpoint.
    3. Reads the current binlog position.
    4. Reads the schema of the database and table configured in the connector.
    5. Releases the global read lock to allow writes by other database clients.
    6. Scans the table and, after all data is read, captures the changes made after the binlog position in step 3.
    Flink periodically performs checkpoints to record the binlog position. In the case of failover, the job restarts and restores from the checkpointed binlog position, thereby guaranteeing exactly-once semantics.

    How MySQL CDC 2.x works

    1. A MySQL table must contain a primary key. If a composite key is used, the first field of the key will be selected as the partitioning key (splitKey) to divide the data into chunks during the snapshot phase.
    2. A lock-free algorithm is used during the snapshot phase, with no need to lock the table.
    3. The sync process consists of two phases. During the snapshot phase, chunks are read concurrently; after this phase ends, the incremental phase starts. The whole process supports checkpoints to guarantee exactly-once semantics.
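    The chunking behavior of the snapshot phase can be tuned through the WITH options documented below. A minimal sketch, assuming a composite key (`order_id`, `region`) where `region` is the preferred partitioning key; the table and field names are illustrative:
    CREATE TABLE `orders_cdc_source` (
    `order_id` INT,
    `region` STRING,
    `amount` DOUBLE,
    PRIMARY KEY (`order_id`, `region`) NOT ENFORCED
    ) WITH (
    'connector' = 'mysql-cdc',
    ...
    'scan.incremental.snapshot.enabled' = 'true', -- MySQL CDC 2.x lock-free snapshot (default).
    'scan.incremental.snapshot.chunk.size' = '8096', -- Rows per chunk in the snapshot phase (default).
    'scan.split-key.mode' = 'specific', -- Use a specific field of the composite key as the partitioning key.
    'scan.split-key.specific-column' = 'region'
    );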

    Versions

    Flink Version | Description
    1.11 | MySQL v5.6 supported.
    1.13 | MySQL v5.6, v5.7, and v8.x supported. The incremental snapshot feature is enabled by default, so the source must contain a primary key. If the source has no primary key, set 'scan.incremental.snapshot.enabled' to 'false' in the WITH parameters.
    1.14 | MySQL v5.6, v5.7, and v8.x supported. The incremental snapshot feature is enabled by default, so the source must contain a primary key. If the source has no primary key, set 'scan.incremental.snapshot.enabled' to 'false' in the WITH parameters.
    1.16 | MySQL v5.6, v5.7, and v8.x supported. The incremental snapshot feature is enabled by default, so the source must contain a primary key. If the source has no primary key, set 'scan.incremental.snapshot.enabled' to 'false' in the WITH parameters.
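    As noted above, a source without a primary key must fall back to MySQL CDC 1.x. A minimal sketch (the table and field names are illustrative):
    CREATE TABLE `no_pk_source_table` (
    `id` INT,
    `name` STRING
    -- No PRIMARY KEY clause, because the monitored table has no primary key.
    ) WITH (
    'connector' = 'mysql-cdc',
    ...
    'scan.incremental.snapshot.enabled' = 'false' -- Required when the source table has no primary key.
    );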

    Limits

    The MySQL CDC connector can be used only as a source.

    Defining a table in DDL

    CREATE TABLE `mysql_cdc_source_table` (
    `id` INT,
    `name` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED -- Define the primary key here if the database table to monitor has it defined.
    ) WITH (
    'connector' = 'mysql-cdc', -- Here, it should be 'mysql-cdc'.
    'hostname' = '192.168.10.22', -- IP of the MySQL database server.
    'port' = '3306', -- Integer port number of the MySQL database server.
    'username' = 'debezium', -- Username to use when connecting to the MySQL database server (SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, and RELOAD permissions are required).
    'password' = 'hello@world!', -- Password to use when connecting to the MySQL database server.
    'database-name' = 'YourDatabase', -- Database name of the MySQL server to monitor.
    'table-name' = 'YourTable' -- Table name of the MySQL database to monitor.
    );

    WITH parameters

    Option | Description | Required | Remarks
    connector | The connector to use. | Yes | Here, it should be 'mysql-cdc'.
    hostname | IP address or hostname of the MySQL database server. | Yes | -
    port | Integer port number of the MySQL database server. | No | Default value: 3306.
    username | Username to use when connecting to the MySQL database server. | Yes | A MySQL user with the required permissions (including SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT).
    password | Password to use when connecting to the MySQL database server. | Yes | -
    database-name | Database name of the MySQL server to monitor. | Yes | The database-name also supports regular expressions to monitor multiple databases matching the regular expression.
    table-name | Table name of the MySQL database to monitor. | Yes | The table-name also supports regular expressions to monitor multiple tables matching the regular expression.
    server-id | A numeric ID of this database client. | No | It must be unique across all currently running database processes in the MySQL cluster. We recommend setting a unique ID range for each job in a database, such as 5400-5405. By default, a random number between 6400 and Integer.MAX_VALUE is generated.
    server-time-zone | Session time zone in the database server. | No | An example is "Asia/Shanghai". It controls how the TIMESTAMP type in MySQL is converted to STRING.
    append-mode | Whether to enable the append mode. | No | This mode is available for Flink v1.13 or later. It allows, for example, syncing MySQL CDC data to Hive.
    filter-duplicate-pair-records | Filters out change records that involve only source fields not defined in the Flink DDL statement. | No | For example, a MySQL source contains four fields a, b, c, and d, but only a and b are defined in the table creation SQL statement. If this option is enabled, change records involving only field c or d will be ignored and not delivered to the downstream system, reducing the data to be processed.
    scan.lastchunk.optimize.enable | Repartitions the last chunk of the snapshot phase. | No | Numerous writes and changes continuously made to the source during the snapshot phase may cause the last chunk to be too large, resulting in the crash and restart of TaskManagers due to OOM. If this option is enabled (set to true), Flink automatically divides an oversized last chunk into smaller ones to make the job more stable.
    debezium.min.row.count.to.stream.results | When the number of records in the table is greater than this value, the records will be read in batches. | No | Default value: 1000. Flink reads data from a source with one of the following methods: full read (the data of the whole table is read into memory; reading is fast, but memory of the corresponding size is consumed, posing an OOM risk if the source is extremely large) or batch read (a certain number of rows is read each time until all the data is read; no OOM risk for large sources, but reading is slower).
    debezium.snapshot.fetch.size | The maximum number of rows to read from the MySQL source at a time during the snapshot phase. | No | This option applies only to the batch read mode.
    debezium.skipped.operations | The operations to be skipped, separated by commas. Operations include c (insert), u (update), and d (delete). By default, no operations are skipped. | No | -
    scan.incremental.snapshot.enabled | Whether to enable the incremental snapshot. | No | Default value: true.
    scan.incremental.snapshot.chunk.size | The chunk size (number of rows) of the table snapshot. Captured tables are split into multiple chunks when the table snapshot is read. | No | Default value: 8096.
    scan.lazy-calculate-splits.enabled | Whether to enable lazy chunk splitting on the JobManager during the snapshot phase to avoid JobManager OOM caused by too much data and too many chunks. | No | Default value: true.
    scan.newly-added-table.enabled | Whether to enable dynamic loading of newly added tables. | No | Default value: false.
    scan.split-key.mode | Specifies which field of the primary key is used as the partitioning key. | No | Two valid values are available: default and specific. default indicates that the first field of the composite key is used as the partitioning key; specific requires setting scan.split-key.specific-column to specify a field in the composite key as the partitioning key.
    scan.split-key.specific-column | Specifies a field in the composite key as the partitioning key. | No | This option is required when scan.split-key.mode is set to specific. Its value must be the name of a field in the composite key.
    connect.timeout | The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out. | No | Default value: 30s.
    connect.max-retries | The maximum number of retries for building a MySQL database server connection. | No | Default value: 3.
    connection.pool.size | Connection pool size. | No | Default value: 20.
    jdbc.properties.* | Option to pass custom JDBC URL parameters, such as 'jdbc.properties.useSSL' = 'false'. | No | -
    heartbeat.interval | Interval for sending heartbeat events to track the latest available binlog offsets, generally used to address slowly updated tables. | No | Default value: 20.
    debezium.* | Debezium properties. | No | Specifies Debezium properties for fine-grained control of client behaviors, such as 'debezium.snapshot.mode' = 'never'. For details, see Debezium's MySQL Connector properties.

    Available metadata (Flink v1.13 or later)

    Available metadata columns:
    Key | Data Type | Description
    database_name/meta.database_name | STRING NOT NULL | Name of the database that contains the row.
    table_name/meta.table_name | STRING NOT NULL | Name of the table that contains the row.
    op_ts/meta.op_ts | TIMESTAMP_LTZ(3) NOT NULL | When the change was made in the database.
    meta.batch_id | BIGINT | The batch ID of the binlog.
    meta.is_ddl | BOOLEAN | Whether the record is a DDL statement.
    meta.mysql_type | MAP | Table structure.
    meta.update_before | ARRAY | Field values before the modification.
    meta.pk_names | ARRAY | Names of the primary key fields.
    meta.sql | STRING | Null.
    meta.sql_type | MAP | Maps the fields in sql_type to the Java data type IDs.
    meta.ts | TIMESTAMP_LTZ(3) NOT NULL | When the row was received and processed.
    meta.op_type | STRING | Operation type, such as INSERT or DELETE.
    meta.file | STRING | Null for the snapshot phase, and the name of the binlog file to which the data belongs for the incremental phase, such as mysql-bin.000101.
    meta.pos | BIGINT | 0 for the snapshot phase, and the offset of the binlog file to which the data belongs for the incremental phase, such as 143127802.
    meta.gtid | STRING | Null for the snapshot phase, and the GTID of the data for the incremental phase, such as 3d3c4464-c320-11e9-8b3a-6c92bf62891a:66486240.

    Example

    CREATE TABLE `mysql_cdc_source_table` (
    `id` INT,
    `name` STRING,
    `database_name` string METADATA FROM 'database_name',
    `table_name` string METADATA FROM 'table_name',
    `op_ts` timestamp(3) METADATA FROM 'op_ts',
    `op_type` string METADATA FROM 'meta.op_type',
    `batch_id` bigint METADATA FROM 'meta.batch_id',
    `is_ddl` boolean METADATA FROM 'meta.is_ddl',
    `update_before` ARRAY<MAP<STRING, STRING>> METADATA FROM 'meta.update_before',
    `mysql_type` MAP<STRING, STRING> METADATA FROM 'meta.mysql_type',
    `pk_names` ARRAY<STRING> METADATA FROM 'meta.pk_names',
    `sql` STRING METADATA FROM 'meta.sql',
    `sql_type` MAP<STRING, INT> METADATA FROM 'meta.sql_type',
    `ingestion_ts` TIMESTAMP(3) METADATA FROM 'meta.ts',
    PRIMARY KEY (`id`) NOT ENFORCED -- Define the primary key here if the database table to monitor has it defined.
    ) WITH (
    'connector' = 'mysql-cdc', -- Here, it should be 'mysql-cdc'.
    'hostname' = '192.168.10.22', -- IP of the MySQL database server.
    'port' = '3306', -- Integer port number of the MySQL database server.
    'username' = 'debezium', -- Username to use when connecting to the MySQL database server (SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, and RELOAD permissions are required).
    'password' = 'hello@world!', -- Password to use when connecting to the MySQL database server.
    'database-name' = 'YourDatabase', -- Database name of the MySQL server to monitor.
    'table-name' = 'YourTable' -- Table name of the MySQL database to monitor.
    );

    Reading a MySQL database by sharding

    Stream Compute Service supports reading sharded MySQL databases.
    If a MySQL database is already sharded into multiple tables such as A_1, A_2, A_3, ..., and each table has the same schema, you can set the table-name option to a regular expression to match and read multiple tables. For example, set table-name to 'A_.*' to monitor all tables prefixed with A_. The database-name option supports regular expressions in the same way.
    Note
    If database-name or table-name is set to a regular expression, the regular expression needs to be enclosed in parentheses, for example '(A_.*)'.
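    A minimal sketch of a sharded read (the table name shard_merge_source is illustrative; the connection values reuse the earlier examples):
    CREATE TABLE `shard_merge_source` (
    `id` INT,
    `name` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.10.22',
    'port' = '3306',
    'username' = 'debezium',
    'password' = 'hello@world!',
    'database-name' = 'YourDatabase',
    'table-name' = '(A_.*)' -- Regular expression enclosed in parentheses; matches all tables prefixed with A_.
    );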

    Data type mapping

    MySQL CDC and Flink SQL data types are mapped as follows:
    MySQL Type | Flink SQL Type | Note
    TINYINT | TINYINT | -
    SMALLINT, TINYINT UNSIGNED, TINYINT UNSIGNED ZEROFILL | SMALLINT | -
    INT, MEDIUMINT, SMALLINT UNSIGNED, SMALLINT UNSIGNED ZEROFILL | INT | -
    BIGINT, INT UNSIGNED, INT UNSIGNED ZEROFILL, MEDIUMINT UNSIGNED, MEDIUMINT UNSIGNED ZEROFILL | BIGINT | -
    BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL | DECIMAL(20, 0) | -
    FLOAT, FLOAT UNSIGNED, FLOAT UNSIGNED ZEROFILL | FLOAT | -
    REAL, REAL UNSIGNED, REAL UNSIGNED ZEROFILL, DOUBLE, DOUBLE UNSIGNED, DOUBLE UNSIGNED ZEROFILL, DOUBLE PRECISION, DOUBLE PRECISION UNSIGNED, DOUBLE PRECISION UNSIGNED ZEROFILL | DOUBLE | -
    NUMERIC(p, s), NUMERIC(p, s) UNSIGNED, NUMERIC(p, s) UNSIGNED ZEROFILL, DECIMAL(p, s), DECIMAL(p, s) UNSIGNED, DECIMAL(p, s) UNSIGNED ZEROFILL, FIXED(p, s), FIXED(p, s) UNSIGNED, FIXED(p, s) UNSIGNED ZEROFILL, where p <= 38 | DECIMAL(p, s) | -
    NUMERIC(p, s), NUMERIC(p, s) UNSIGNED, NUMERIC(p, s) UNSIGNED ZEROFILL, DECIMAL(p, s), DECIMAL(p, s) UNSIGNED, DECIMAL(p, s) UNSIGNED ZEROFILL, FIXED(p, s), FIXED(p, s) UNSIGNED, FIXED(p, s) UNSIGNED ZEROFILL, where 38 < p <= 65 | STRING | The precision for the DECIMAL data type is up to 65 in MySQL, but the precision for DECIMAL is limited to 38 in Flink. So if you define a decimal column whose precision is greater than 38, you should map it to STRING to avoid precision loss.
    BOOLEAN, TINYINT(1), BIT(1) | BOOLEAN | -
    DATE | DATE | -
    TIME [(p)] | TIME [(p)] | -
    TIMESTAMP [(p)], DATETIME [(p)] | TIMESTAMP [(p)] | -
    CHAR(n) | CHAR(n) | -
    VARCHAR(n) | VARCHAR(n) | -
    BIT(n) | BINARY(⌈n/8⌉) | -
    BINARY(n) | BINARY(n) | -
    VARBINARY(N) | VARBINARY(N) | -
    TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT | STRING | -
    TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB | BYTES | Currently, for the BLOB data type in MySQL, only blobs whose length is not greater than 2,147,483,647 (2 ** 31 - 1) are supported.
    YEAR | INT | -
    ENUM | STRING | -
    JSON | STRING | The JSON data type will be converted into STRING with JSON format in Flink.
    SET | ARRAY<STRING> | As the SET data type in MySQL is a string object that can have zero or more values, it should always be mapped to an array of strings.
    GEOMETRY, POINT, LINESTRING, POLYGON, MULTIPOINT, MULTILINESTRING, MULTIPOLYGON, GEOMETRYCOLLECTION | STRING | The spatial data types in MySQL will be converted into STRING with a fixed JSON format.

    Example

    CREATE TABLE `mysql_cdc_source_table` (
    `id` INT,
    `name` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED -- Define the primary key here if the database table to monitor has it defined.
    ) WITH (
    'connector' = 'mysql-cdc', -- Here, it should be 'mysql-cdc'.
    'hostname' = '192.168.10.22', -- IP of the MySQL database server.
    'port' = '3306', -- Integer port number of the MySQL database server.
    'username' = 'debezium', -- Username to use when connecting to the MySQL database server (SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, and RELOAD permissions are required).
    'password' = 'hello@world!', -- Password to use when connecting to the MySQL database server.
    'database-name' = 'YourDatabase', -- Database name of the MySQL server to monitor.
    'table-name' = 'YourTable' -- Table name of the MySQL database to monitor.
    );
    
    CREATE TABLE `print_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    'connector' = 'print'
    );
    INSERT INTO print_table SELECT * FROM mysql_cdc_source_table;

    Notes

    Checkpoints

    For MySQL CDC 1.0 ('scan.incremental.snapshot.enabled' = 'false'), additional configuration is needed to use checkpoints. MySQL CDC 1.0 cannot take a checkpoint while it is reading snapshot data, so if many tables involving a large volume of data need to be synced, checkpoints may fail many times, causing the job to fail. You can set the allowed number of checkpoint failures (execution.checkpointing.tolerable-failed-checkpoints: 100) as instructed in Advanced Job Parameters.
    If you use MySQL CDC 2.0 and the default job parallelism is greater than 1, checkpointing must be enabled.
    After reading the snapshot data, the CDC connector will wait for the completion of a checkpoint before starting to read the incremental data.

    Risks of using MySQL CDC 1.0

    When a table has no primary key, you can only use the CDC 1.0 mode by setting 'scan.incremental.snapshot.enabled' = 'false', which poses the following risks:
    1. The FLUSH TABLES WITH READ LOCK (FTWRL) statement is executed by default.
    2. Although FTWRL only lasts for a short period of time, its mechanism may cause the database to be locked.
    3. FTWRL must wait for in-flight UPDATE/SELECT operations to complete, and during this period the database will be unavailable and new SELECT queries will be blocked. This is caused by the Query Cache mechanism in MySQL.
    If multiple MySQL CDC 1.0 sources run in parallel, the above situation is likely to occur.

    User permissions

    The user of the source database must have the following permissions: SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, and RELOAD.
    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
    FLUSH PRIVILEGES;

    Global read lock

    In the "How MySQL CDC 1.x works" section above, the first step is to acquire a global read lock to get the schema and binlog position. This will block writes by other database clients and may affect the online business. If the At Least Once semantic is acceptable, set 'debezium.snapshot.locking.mode' to 'none' to skip this step.

    Setting a composite key

    The following DDL statement sets index1, index2, index3, and index4 as the fields of the composite key. They must all be defined in PRIMARY KEY, and their order does not affect the sync.
    CREATE TABLE db_order_dim (
    `index1` STRING,
    `index2` STRING,
    `index3` STRING,
    `index4` STRING,
    `field5` STRING,
    `field6` STRING,
    PRIMARY KEY(`index1`, `index2`, `index3`, `index4`) NOT ENFORCED
    ) WITH (
    ...
    );

    Defining a server-id

    We do not recommend specifying an explicit server-id, because Stream Compute Service automatically generates a random server-id in the range of 6400 to 2147483647 to avoid the server-id conflicts that may occur when several jobs read the same database.
    If you do need to specify a server-id in MySQL CDC 2.x, we recommend setting it to a range, such as 5400-5405. Since each parallel reader must have a unique server ID, server-id must be a range like 5400-5405, and the range must cover at least as many IDs as the parallelism. For MySQL CDC 1.x, only a single server-id value, not a range, can be specified.
    Two methods are available for specifying server-id:
    1. Use the WITH parameters in the mysql-cdc DDL statements.
    2. Use SQL hints.
    SELECT * FROM source_table /*+ OPTIONS('server-id'='5401-5404') */ ;
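    For method 1, a minimal sketch (the range 5401-5404 is illustrative; it must not overlap with other jobs reading the same database and should cover at least as many IDs as the job parallelism):
    CREATE TABLE `mysql_cdc_source_table` (
    `id` INT,
    `name` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
    'connector' = 'mysql-cdc',
    ...
    'server-id' = '5401-5404' -- A dedicated server-id range for this job.
    );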

    Setting MySQL session timeouts

    When an initial consistent snapshot is made for large databases, your established connection could time out while the tables are being read. You can prevent this behavior by configuring interactive_timeout and wait_timeout in your MySQL configuration file.
    interactive_timeout indicates the number of seconds the server waits for activity on an interactive connection before closing it. For details, see MySQL documentation.
    wait_timeout indicates the number of seconds the server waits for activity on a noninteractive connection before closing it. For details, see MySQL documentation.
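    If editing the MySQL configuration file is not convenient, the same variables can also be raised at runtime. The values below are illustrative (in seconds) and require the privilege to set global variables:
    SET GLOBAL interactive_timeout = 86400;
    SET GLOBAL wait_timeout = 86400;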
    During incremental reading, heartbeats may fail due to an extremely high load on the TaskManagers, and the server may close the connection (EOFException). In this case, you can run the following SQL statements on the MySQL server to increase the timeouts:
    SET GLOBAL slave_net_timeout = 120;
    SET GLOBAL thread_pool_idle_timeout = 120;

    Critical logs of the JobManager

    For MySQL CDC 2.x, the sync of each table consists of the following stages: chunk splitting, snapshot reading, incremental correction, and incremental reading. Since the first three stages occupy more resources for a longer period, Stream Compute Service has improved its logs and metrics to help you gain insight into and analyze the running of your jobs.

    1. Splitting and assigning chunks

    Splitting start: Search for the keywords into chunks, such as Start splitting table cdc_basic_source.random_source_1 into chunks or Start lazily splitting table cdc_basic_source.random_source_1 into chunks.
    Splitting end: Search for the keywords chunks, time cost, such as Split table cdc_basic_source.random_source_1 into 14 chunks, time cost: 994ms.

    2. Snapshot reading

    Assignment of snapshot chunks: Enable DEBUG-level logging and search for the keywords Current assigned splits for to view the total number of chunks of each table and their assignment.
    Chunk assignment end: Search for the keywords finished. Total split number, such as Split assignment for cdc_basic_source.random_source_1 finished. Total split number: 14.
    Snapshot reading end: Search for the Assigner status changes from INITIAL_ASSIGNING to INITIAL_ASSIGNING_FINISHED log.

    3. Incremental correction

    Incremental correction start: Search for the Initial assigning finished as there are no more splits. Creating binlog split or Newly added assigning finished as there are no more splits. Waking up binlog reader log.

    4. Incremental reading

    Refer to the following section, "Critical logs of the TaskManagers".

    Critical logs of the TaskManagers

    Incremental reading start: Search for the keywords has entered pure binlog phase, such as Table cdc_basic_source.random_source_2 has entered pure binlog phase.
    Table schema change: Search for the keywords Received schema change event.
    EOFException: If a job restarts and the reported exception is EOFException, increase the timeouts on the MySQL server as described above. Alternatively, you can set a smaller total parallelism and increase the spec of each TaskManager to reduce the probability of timeouts caused by high memory and CPU pressure.

    Monitoring metrics

    Stream Compute Service adds many practical metrics in the MySQL CDC connector. You can go to the Flink UI of a job as instructed in Flink UI, and click the MySQL CDC source operator in the execution graph to search for and view these metrics.
    logpos: Get the current binlog position, which can help identify consumption blocks and other issues.
    numberOfInsertRecords: Get the number of output +I messages.
    numberOfDeleteRecords: Get the number of output -D messages.
    numberOfUpdateBeforeRecords: Get the number of output -U messages.
    numberOfUpdateAfterRecords: Get the number of output +U messages.