Tencent Cloud


PostgreSQL CDC

Last updated: 2023-11-08 14:51:44

    Overview

    The Postgres CDC source connector reads snapshot data and incremental data from PostgreSQL databases. It processes the data with exactly-once semantics, even when failures occur.

    Versions

    Flink Version    Description
    1.11             Supported
    1.13             Supported
    1.14             Unsupported
    1.16             Supported

    Limits

    The Postgres CDC connector can be used only as a source. It supports PostgreSQL v9.6 or later.

    Defining a table in DDL

    CREATE TABLE postgres_cdc_source_table (
        id INT,
        name STRING,
        PRIMARY KEY (`id`) NOT ENFORCED -- Define the primary key here if the database table to be synced has one.
    ) WITH (
        'connector' = 'postgres-cdc', -- Must be 'postgres-cdc'.
        'hostname' = 'yourHostname', -- IP address or hostname of the database server.
        'port' = '5432', -- Integer port number of the database server.
        'username' = 'yourUserName', -- Username for connecting to the PostgreSQL database server (REPLICATION, LOGIN, SCHEMA, DATABASE, and SELECT permissions are required).
        'password' = 'psw', -- Password for connecting to the PostgreSQL database server.
        'database-name' = 'yourDatabaseName', -- Name of the PostgreSQL database to monitor.
        'schema-name' = 'yourSchemaName', -- Name of the schema to monitor (regular expressions are supported).
        'table-name' = 'yourTableName', -- Name of the table to monitor (regular expressions are supported).
        'debezium.slot.name' = 'customslotname' -- A unique replication slot name; only lowercase letters, digits, and underscores are allowed.
    );

    WITH parameters

    Option                  Required  Description
    connector               Yes       The connector to use. Must be postgres-cdc.
    hostname                Yes       IP address or hostname of the PostgreSQL database server.
    username                Yes       Username to use when connecting to the PostgreSQL database server. The user must have specific permissions (REPLICATION, LOGIN, SCHEMA, DATABASE, and SELECT).
    password                Yes       Password to use when connecting to the PostgreSQL database server.
    database-name           Yes       Name of the PostgreSQL database to monitor.
    schema-name             Yes       Name of the PostgreSQL schema to monitor. Supports regular expressions, so multiple matching schemas can be monitored.
    table-name              Yes       Name of the PostgreSQL table to monitor. Supports regular expressions, so multiple matching tables can be monitored.
    port                    No        Integer port number of the PostgreSQL database server. Default: 5432.
    decoding.plugin.name    No        Name of the PostgreSQL logical decoding plugin; the right value depends on the plugin installed on the server. Supported values: decoderbufs (default), wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming, and pgoutput.
    debezium.*              No        Debezium properties for fine-grained control of client behavior. For example, set a unique slot name with 'debezium.slot.name' = 'xxxx' to avoid errors such as PSQLException: ERROR: replication slot "dl_test" is active for PID 19997. For details, see Connector configuration properties in the Debezium documentation.
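
    As a sketch of how the optional settings combine, the DDL below monitors every table whose name matches a pattern, decodes changes with the pgoutput plugin, and pins a dedicated replication slot. It assumes the server runs PostgreSQL 10 or later (pgoutput ships with PostgreSQL starting from version 10); the table name orders_cdc_source, the pattern orders_.*, and the slot name orders_job_slot are hypothetical. Depending on the Debezium version, pgoutput may also need a publication on the server, so check the Debezium documentation before relying on this setup.

    ```sql
    -- Sketch only: names, credentials, and the table pattern are placeholders.
    CREATE TABLE orders_cdc_source (
        id INT,
        name STRING,
        PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
        'connector' = 'postgres-cdc',
        'hostname' = 'yourHostname',
        'port' = '5432',
        'username' = 'yourUserName',
        'password' = 'yourPassword',
        'database-name' = 'yourDatabaseName',
        'schema-name' = 'public',
        'table-name' = 'orders_.*',               -- regex: matches e.g. orders_2023 and orders_2024
        'decoding.plugin.name' = 'pgoutput',      -- built into PostgreSQL 10 and later
        'debezium.slot.name' = 'orders_job_slot'  -- one unique slot per job avoids "slot is active" conflicts
    );
    ```

    Giving each job its own slot name matters because two jobs that share a slot cannot stream from it at the same time.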

    Data type mapping

    PostgreSQL and Flink data types are mapped as follows:

    PostgreSQL type                                                 Flink type
    SMALLINT, INT2, SMALLSERIAL, SERIAL2                            SMALLINT
    INTEGER, SERIAL                                                 INT
    BIGINT, BIGSERIAL                                               BIGINT
    REAL, FLOAT4                                                    FLOAT
    FLOAT8, DOUBLE PRECISION                                        DOUBLE
    NUMERIC(p, s), DECIMAL(p, s)                                    DECIMAL(p, s)
    BOOLEAN                                                         BOOLEAN
    DATE                                                            DATE
    TIME [(p)] [WITHOUT TIMEZONE]                                   TIME [(p)] [WITHOUT TIMEZONE]
    TIMESTAMP [(p)] [WITHOUT TIMEZONE]                              TIMESTAMP [(p)] [WITHOUT TIMEZONE]
    CHAR(n), CHARACTER(n), VARCHAR(n), CHARACTER VARYING(n), TEXT   STRING
    BYTEA                                                           BYTES
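
    To illustrate the mapping above, the following sketch pairs a hypothetical PostgreSQL table public.orders with a Flink source table whose column types follow the mapping. The first statement runs on PostgreSQL and the second in Flink SQL; the table and column names, connection values, and slot name are placeholders rather than part of the original example.

    ```sql
    -- PostgreSQL side: hypothetical source table public.orders.
    CREATE TABLE public.orders (
        order_id   BIGSERIAL PRIMARY KEY,
        quantity   SMALLINT,
        price      NUMERIC(10, 2),
        paid       BOOLEAN,
        created_at TIMESTAMP(3),
        note       VARCHAR(255),
        payload    BYTEA
    );

    -- Flink side: each column uses the Flink type mapped from its PostgreSQL type.
    CREATE TABLE orders_cdc_source (
        order_id   BIGINT,          -- BIGSERIAL      -> BIGINT
        quantity   SMALLINT,        -- SMALLINT       -> SMALLINT
        price      DECIMAL(10, 2),  -- NUMERIC(10, 2) -> DECIMAL(10, 2)
        paid       BOOLEAN,         -- BOOLEAN        -> BOOLEAN
        created_at TIMESTAMP(3),    -- TIMESTAMP(3)   -> TIMESTAMP(3)
        note       STRING,          -- VARCHAR(255)   -> STRING
        payload    BYTES,           -- BYTEA          -> BYTES
        PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
        'connector' = 'postgres-cdc',
        'hostname' = 'yourHostname',
        'port' = '5432',
        'username' = 'yourUserName',
        'password' = 'yourPassword',
        'database-name' = 'yourDatabaseName',
        'schema-name' = 'public',
        'table-name' = 'orders',
        'debezium.slot.name' = 'orders_slot'
    );
    ```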

    Example

    CREATE TABLE postgres_cdc_source_table (
        id INT,
        name STRING,
        PRIMARY KEY (`id`) NOT ENFORCED -- Define the primary key here if the database table to be synced has one.
    ) WITH (
        'connector' = 'postgres-cdc', -- Must be 'postgres-cdc'.
        'hostname' = 'yourHostname', -- IP address or hostname of the database server.
        'port' = '5432', -- Integer port number of the database server.
        'username' = 'yourUserName', -- Username for connecting to the PostgreSQL database server (REPLICATION, LOGIN, SCHEMA, DATABASE, and SELECT permissions are required).
        'password' = 'psw', -- Password for connecting to the PostgreSQL database server.
        'database-name' = 'yourDatabaseName', -- Name of the PostgreSQL database to monitor.
        'schema-name' = 'yourSchemaName', -- Name of the schema to monitor (regular expressions are supported).
        'table-name' = 'yourTableName', -- Name of the table to monitor (regular expressions are supported).
        'debezium.slot.name' = 'customslotname' -- A unique replication slot name; only lowercase letters, digits, and underscores are allowed.
    );

    CREATE TABLE `print_table` (
        `id` INT,
        `name` STRING
    ) WITH (
        'connector' = 'print'
    );

    INSERT INTO print_table SELECT * FROM postgres_cdc_source_table;

    Notes

    User permissions

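    The user must have at least the following permissions: the REPLICATION and LOGIN attributes, USAGE on the schema, CONNECT on the database, and SELECT on the monitored tables.

    CREATE ROLE debezium_user REPLICATION LOGIN PASSWORD 'yourPassword'; -- The connector authenticates with a password.
    GRANT USAGE ON SCHEMA schema_name TO debezium_user;
    GRANT CONNECT ON DATABASE database_name TO debezium_user; -- USAGE is not a valid privilege on a database.
    GRANT SELECT ON schema_name.table_name1, schema_name.table_name2 TO debezium_user;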
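
    To confirm the setup, the following PostgreSQL queries check the role attributes, the granted privileges, and the existing replication slots. Run them on the database server; debezium_user, database_name, schema_name, and schema_name.table_name1 are placeholders to replace with your own names. The last query can help diagnose the "replication slot ... is active for PID ..." error, since it lists each slot with its plugin and, when a client currently holds the slot, that client's PID.

    ```sql
    -- 1. Role attributes: rolreplication and rolcanlogin should both be true.
    SELECT rolname, rolreplication, rolcanlogin
    FROM pg_roles
    WHERE rolname = 'debezium_user';

    -- 2. Privileges: each column should be true (placeholder object names).
    SELECT has_database_privilege('debezium_user', 'database_name', 'CONNECT') AS can_connect,
           has_schema_privilege('debezium_user', 'schema_name', 'USAGE') AS schema_usage,
           has_table_privilege('debezium_user', 'schema_name.table_name1', 'SELECT') AS table_select;

    -- 3. Replication slots: active = true with a non-null active_pid means
    --    another client is currently streaming from that slot.
    SELECT slot_name, plugin, database, active, active_pid
    FROM pg_replication_slots;
    ```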
    