tencent cloud

SQLServer CDC

Last updated: 2023-11-08 14:23:39

    Overview

    The SQLServer CDC source connector reads snapshot data and incremental data from a SQLServer database. This document describes how to set up the SQLServer CDC connector.

    Versions

    Flink Version    Description
    1.11             Unsupported
    1.13             Supported
    1.14             Unsupported
    1.16             Unsupported

    Limits

    The SQLServer CDC connector can be used only as a source.

    Setting up a SQLServer database

    Change data capture (CDC) must be enabled on the SQLServer source tables to capture data changes.
    1. Enable CDC on a database as instructed in Setting Change Data Capture (CDC).
    2. Enable CDC on a SQLServer source table as instructed in sys.sp_cdc_enable_table.
    USE MyDB
    GO
    
    EXEC sys.sp_cdc_enable_table
        @source_schema = N'dbo', -- Schema of the source table.
        @source_name = N'MyTable', -- Name of the table to capture.
        @role_name = NULL, -- Gating role (for example, MyRole) to which you add the users who are granted SELECT permission on the captured columns of the source table. Members of the sysadmin or db_owner role also have access to the change tables. NULL gives full access to the captured information only to members of sysadmin or db_owner.
        @filegroup_name = NULL, -- Filegroup in which SQL Server places the change table for the captured table. If set to a name, that filegroup must already be defined in the current database; if NULL, the default filegroup is used.
        @supports_net_changes = 0 -- Whether net-change queries are supported for the captured table. Defaults to 1 if the table has a primary key or a unique index identified by the @index_name parameter, and to 0 otherwise. With 0, only the function that queries all changes is generated; with 1, the function that queries net changes is also generated. Setting it to 1 requires a primary key on the source table or an index specified with @index_name.
    GO
    3. Check whether CDC is enabled on the source table as instructed in sys.sp_cdc_help_change_data_capture.
    USE MyDB
    GO
    
    EXEC sys.sp_cdc_help_change_data_capture
    GO
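    The database-level step and the verification step can also be expressed directly in T-SQL. The following is a minimal sketch that reuses the MyDB and MyTable placeholders from step 2: it enables CDC on the database with sys.sp_cdc_enable_db (which requires membership in the sysadmin fixed server role) and then reads the is_cdc_enabled and is_tracked_by_cdc catalog columns. Treat it as a convenience, not a replacement for the Microsoft documentation referenced in step 1.

    ```sql
    USE MyDB
    GO

    -- Step 1: enable CDC for the current database (requires sysadmin).
    EXEC sys.sp_cdc_enable_db
    GO

    -- Database level: is_cdc_enabled is 1 once step 1 has succeeded.
    SELECT name, is_cdc_enabled
    FROM sys.databases
    WHERE name = N'MyDB';

    -- Table level: is_tracked_by_cdc is 1 once step 2 has succeeded.
    SELECT s.name AS schema_name, t.name AS table_name, t.is_tracked_by_cdc
    FROM sys.tables AS t
    JOIN sys.schemas AS s ON s.schema_id = t.schema_id
    WHERE s.name = N'dbo' AND t.name = N'MyTable';
    GO
    ```
    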

    Defining a table in DDL

    -- register a SQL Server table 'orders' in Flink SQL
    CREATE TABLE orders (
        id INT,
        order_date DATE,
        purchaser INT,
        quantity INT,
        product_id INT,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'sqlserver-cdc',
        'hostname' = 'localhost',
        'port' = '1433',
        'username' = 'sa',
        'password' = 'Password!',
        'database-name' = 'inventory',
        'schema-name' = 'dbo',
        'table-name' = 'orders'
    );
    
    -- read the snapshot and subsequent change data from the orders table
    SELECT * FROM orders;

    WITH parameters

    Option            Required  Default  Type     Description
    connector         Yes       None     String   The value must be sqlserver-cdc.
    hostname          Yes       None     String   IP address or hostname of the SQLServer database.
    username          Yes       None     String   Username to use when connecting to the SQLServer database.
    password          Yes       None     String   Password to use when connecting to the SQLServer database.
    database-name     Yes       None     String   Name of the SQLServer database to monitor.
    schema-name       Yes       None     String   Schema name of the source table. Java regular expressions are supported; for example, dbo.* matches dbo, dbo1, and dbo_test. We recommend enclosing a regex in parentheses to prevent errors when it is combined with table-name.
    table-name        Yes       None     String   Name of the SQLServer table to monitor. Java regular expressions are supported. We recommend enclosing a regex in parentheses to prevent errors when it is combined with schema-name.
    port              No        1433     Integer  Port number of the SQLServer database.
    server-time-zone  No        UTC      String   Session time zone of the database server, for example "Asia/Shanghai".
    debezium.*        No        None     String   Debezium properties for fine-grained control of client behavior, for example 'debezium.snapshot.mode' = 'initial_only'. For details, see Debezium's SQL Server connector properties.
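    The following DDL is a sketch of the optional and regex-capable parameters above. The tables it reads (orders_2023, orders_2024, and so on, under the dbo and sales schemas) are hypothetical. Each regex is wrapped in parentheses, as recommended above; assuming the connector joins schema-name and table-name into a single schema.table pattern, an unwrapped dbo|sales would otherwise be read as "dbo" or "sales.<table pattern>". The debezium.snapshot.mode value is passed to Debezium unchanged; initial_only reads the snapshot without streaming subsequent changes.

    ```sql
    -- Hypothetical example: read orders_<year> tables from the dbo and sales schemas.
    CREATE TABLE all_orders (
        id INT,
        order_date DATE,
        purchaser INT,
        quantity INT,
        product_id INT
    ) WITH (
        'connector' = 'sqlserver-cdc',
        'hostname' = 'localhost',
        'port' = '1433',
        'username' = 'sa',
        'password' = 'Password!',
        'database-name' = 'inventory',
        -- Parentheses keep each regex intact when it is joined with the other name.
        'schema-name' = '(dbo|sales)',
        'table-name' = '(orders_[0-9]{4})',
        -- Session time zone of the database server (default: UTC).
        'server-time-zone' = 'Asia/Shanghai',
        -- Passed through to Debezium: take the snapshot only, no streaming afterwards.
        'debezium.snapshot.mode' = 'initial_only'
    );
    ```
    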

    Available metadata

    Key            Data Type                  Description
    table_name     STRING NOT NULL            Name of the table that contains the row.
    schema_name    STRING NOT NULL            Name of the schema that contains the row.
    database_name  STRING NOT NULL            Name of the database that contains the row.
    op_ts          TIMESTAMP_LTZ(3) NOT NULL  Time at which the change was made in the database. For rows read during the snapshot phase, the value is 0.
    Example:
    CREATE TABLE products (
        table_name STRING METADATA FROM 'table_name' VIRTUAL,
        schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
        db_name STRING METADATA FROM 'database_name' VIRTUAL,
        operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
        id INT NOT NULL,
        name STRING,
        description STRING,
        weight DECIMAL(10,3)
    ) WITH (
        'connector' = 'sqlserver-cdc',
        'hostname' = 'localhost',
        'port' = '1433',
        'username' = 'sa',
        'password' = 'Password!',
        'database-name' = 'inventory',
        'schema-name' = 'dbo',
        'table-name' = 'products'
    );
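    Because op_ts is 0 for rows read during the snapshot phase, the metadata columns can separate snapshot rows from later changes. The queries below are a sketch against the products table defined above. They assume that the value 0 corresponds to the epoch instant (1970-01-01 00:00:00 UTC) and use Flink's TO_TIMESTAMP_LTZ(numeric, precision) function to build that constant.

    ```sql
    -- Rows read during the snapshot phase (op_ts is 0, i.e. the epoch).
    SELECT db_name, operation_ts, id, name, weight
    FROM products
    WHERE operation_ts = TO_TIMESTAMP_LTZ(0, 3);

    -- Rows captured from the change stream after the snapshot.
    SELECT db_name, operation_ts, id, name, weight
    FROM products
    WHERE operation_ts > TO_TIMESTAMP_LTZ(0, 3);
    ```
    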

    Data type mapping

    SQLServer Type                      Flink SQL Type
    char(n)                             CHAR(n)
    varchar(n), nvarchar(n), nchar(n)   VARCHAR(n)
    text, ntext, xml                    STRING
    decimal(p, s), money, smallmoney    DECIMAL(p, s)
    numeric                             NUMERIC
    float, real                         DOUBLE
    bit                                 BOOLEAN
    int                                 INT
    tinyint                             SMALLINT
    smallint                            SMALLINT
    bigint                              BIGINT
    date                                DATE
    time(n)                             TIME(n)
    datetime2, datetime, smalldatetime  TIMESTAMP(n)
    datetimeoffset                      TIMESTAMP_LTZ(3)
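    To illustrate the mapping, the sketch below pairs a hypothetical SQL Server table, dbo.payments, with a Flink DDL that declares each column using the Flink type from the table above. The table and column names are assumptions made for the example. MONEY has a fixed precision and scale of (19, 4) in SQL Server, so it is declared as DECIMAL(19, 4).

    ```sql
    -- SQL Server side (T-SQL): hypothetical source table in the inventory database.
    CREATE TABLE dbo.payments (
        id          INT            NOT NULL PRIMARY KEY,
        retry_count TINYINT        NOT NULL,
        amount      MONEY          NOT NULL,
        note        NVARCHAR(100)  NULL,
        is_refund   BIT            NOT NULL,
        paid_on     DATE           NOT NULL,
        created_at  DATETIME2(3)   NOT NULL,
        updated_at  DATETIMEOFFSET NOT NULL
    );

    -- Flink SQL side: one column per source column, typed per the mapping table.
    CREATE TABLE payments (
        id INT,                      -- int
        retry_count SMALLINT,        -- tinyint
        amount DECIMAL(19, 4),       -- money
        note VARCHAR(100),           -- nvarchar(100)
        is_refund BOOLEAN,           -- bit
        paid_on DATE,                -- date
        created_at TIMESTAMP(3),     -- datetime2(3)
        updated_at TIMESTAMP_LTZ(3), -- datetimeoffset
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'sqlserver-cdc',
        'hostname' = 'localhost',
        'port' = '1433',
        'username' = 'sa',
        'password' = 'Password!',
        'database-name' = 'inventory',
        'schema-name' = 'dbo',
        'table-name' = 'payments'
    );
    ```
    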

    Notes

    Checkpoints cannot be performed while a table snapshot is being scanned

    While the source is scanning the snapshot of a table, there is no recoverable position, so checkpoints cannot be performed. To avoid performing a checkpoint, the SQLServer CDC source keeps the pending checkpoint waiting until it times out. A timed-out checkpoint is treated as a failed checkpoint, which by default triggers a failover of the Flink job. If the table is large, we therefore recommend adding the following Flink configurations to avoid failovers caused by checkpoint timeouts. They set the checkpoint interval to 10 minutes, tolerate up to 100 failed checkpoints, and allow an effectively unlimited number of fixed-delay restarts (2147483647 is the maximum 32-bit integer value):
    execution.checkpointing.interval: 10min
    execution.checkpointing.tolerable-failed-checkpoints: 100
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 2147483647

    Single-threaded reading

    The SQLServer CDC source cannot read in parallel, because only one task can receive change events.