JDBC Connector for Tencent Cloud House-P

Last updated: 2023-11-08 14:27:20

    Overview

    The JDBC-PG connector writes data to a Tencent Cloud House-P database.

    Versions

    | Flink Version | Description |
    | --- | --- |
    | 1.11 | Unsupported |
    | 1.13 | Supported |
    | 1.14 | Unsupported |
    | 1.16 | Supported |

    Use cases

    The JDBC-PG connector can be used as a sink for tuple streams and for upsert streams (which require a primary key).

    Defining a table in DDL

    As a sink (for tuple streams)

    CREATE TABLE `jdbc_sink_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- Specify the options for database connection.
    'connector' = 'jdbcPG',
    'url' = 'jdbc:postgresql://10.1.28.93:3306/CDB?reWriteBatchedInserts=true&serverTimezone=Asia/Shanghai', -- Replace it with your PostgreSQL database URL.
    'table-name' = 'my-table', -- The name of the JDBC table to connect to.
    'username' = 'admin', -- The username (with the INSERT permission required) for database access.
    'password' = 'MyPa$$w0rd', -- The password for database access.
    'sink.buffer-flush.max-rows' = '200', -- The max size of buffered records before flush.
    'sink.buffer-flush.interval' = '2s' -- The flush interval.
    );
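
To try the tuple sink above end to end, a job needs a source that emits append-only rows. The following is a minimal sketch, assuming Flink's built-in datagen connector is available in your environment. The table name datagen_source_table and all option values are illustrative and not part of the original example.

```sql
-- Hypothetical append-only source: rows with a sequential id and a random name.
CREATE TABLE datagen_source_table (
  id INT,
  name STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5',        -- Emit five rows per second.
  'fields.id.kind' = 'sequence',  -- Generate ids 1, 2, 3, ...
  'fields.id.start' = '1',
  'fields.id.end' = '1000',       -- The source finishes after id 1000.
  'fields.name.length' = '8'      -- Random 8-character strings.
);

-- Each generated row is appended to the sink; no primary key is involved.
INSERT INTO jdbc_sink_table
SELECT id, name FROM datagen_source_table;
```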

    As a sink (for upsert streams)

    CREATE TABLE `jdbc_upsert_sink_table` (
    `id` INT PRIMARY KEY NOT ENFORCED,
    `name` STRING
    ) WITH (
    -- Specify the options for database connection.
    'connector' = 'jdbcPG',
    'url' = 'jdbc:postgresql://10.1.28.93:3306/CDB?reWriteBatchedInserts=true&serverTimezone=Asia/Shanghai', -- Replace it with your PostgreSQL connection URL.
    'table-name' = 'my-upsert-table', -- The name of the JDBC table to connect to.
    'username' = 'admin', -- The username (with the INSERT permission required) for database access.
    'password' = 'MyPa$$w0rd', -- The password for database access.
    'sink.buffer-flush.max-rows' = '200', -- The max size of buffered records before flush.
    'sink.buffer-flush.interval' = '2s' -- The flush interval.
    );
    Note
    For an upsert stream, a primary key is required.
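
An upsert stream typically arises when a query keeps revising earlier results, for example a GROUP BY over an unbounded source. The following is a minimal sketch, assuming Flink's built-in datagen connector is available. The source table user_events and its option values are hypothetical and chosen so that the same id occurs repeatedly.

```sql
-- Hypothetical source in which the same id appears many times.
CREATE TABLE user_events (
  id INT,
  name STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5',
  'fields.id.kind' = 'random',
  'fields.id.min' = '1',
  'fields.id.max' = '10',
  'fields.name.length' = '8'
);

-- Grouping by id yields one continuously updated row per id. The sink's
-- primary key (id) identifies which database row each update replaces.
INSERT INTO jdbc_upsert_sink_table
SELECT id, MAX(name) AS name
FROM user_events
GROUP BY id;
```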

    WITH parameters

    | Option | Required | Default Value | Description |
    | --- | --- | --- | --- |
    | connector | Yes | None | The connector to use. Set it to 'jdbcPG'. |
    | url | Yes | None | The JDBC database URL. |
    | table-name | Yes | None | The name of the JDBC table to connect to. |
    | driver | No | None | The class name of the JDBC driver used to connect to the URL above. If no value is set, it is derived automatically from the URL. |
    | username | No | None | The JDBC username. 'username' and 'password' must both be specified if either of them is specified. |
    | password | No | None | The JDBC password. |
    | scan.partition.column | No | None | The column name used for partitioning the input. The column type must be numeric, date, or timestamp. For details, see Partitioned scan. |
    | scan.partition.num | No | None | The number of partitions. |
    | scan.partition.lower-bound | No | None | The smallest value of the first partition. |
    | scan.partition.upper-bound | No | None | The largest value of the last partition. |
    | scan.fetch-size | No | 0 | The number of rows fetched from the database per round trip when reading. If the value is 0, the data is read row by row, which results in low throughput. |
    | lookup.cache.max-rows | No | None | The maximum number of data records cached in the Lookup Cache. |
    | lookup.cache.ttl | No | None | The maximum cache time of each data record in the Lookup Cache. |
    | lookup.max-retries | No | 3 | The maximum number of retries upon database lookup failure. |
    | sink.buffer-flush.max-rows | No | 100 | The maximum number of buffered records before a flush. If this is set to 0, records are not buffered. |
    | sink.buffer-flush.interval | No | 1s | The maximum interval between flushes, given as a duration (for example, 2s). If sink.buffer-flush.max-rows is 0 and this parameter is not, buffered actions are processed asynchronously. |
    | sink.max-retries | No | 3 | The maximum number of retries if writing records to the database fails. |
    | write-mode | No | insert | The write mode. Valid values: insert and copy. |
    | copy-delimiter | No | &#x399 | The field delimiter in the copy mode. |
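
The write-mode and copy-delimiter options listed above can be combined in a sink definition. This is a sketch only: the table name jdbc_copy_sink_table and the delimiter '|' are assumptions chosen for illustration, and the connection values are copied from the earlier examples. It is probably safest to pick a delimiter that is unlikely to appear in your data.

```sql
CREATE TABLE jdbc_copy_sink_table (
  id INT,
  name STRING
) WITH (
  'connector' = 'jdbcPG',
  'url' = 'jdbc:postgresql://10.1.28.93:3306/CDB',
  'table-name' = 'my-table',
  'username' = 'admin',
  'password' = 'MyPa$$w0rd',
  'write-mode' = 'copy',     -- Switch from the default insert mode to copy mode.
  'copy-delimiter' = '|'     -- Assumed delimiter; replace it as needed.
);
```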

    Sample

    CREATE TABLE jdbc_upsert_sink_table (
    id INT ,
    age INT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED -- Define the primary key here if the database table to monitor has it defined.
    ) WITH (
    -- Specify the options for database connection.
    'connector' = 'jdbcPG',
    'url' = 'jdbc:postgresql://10.0.0.2:5436/postgres',
    'table-name' = 'my-upsert-table', -- The name of JDBC table to connect.
    'username' = 'admin', -- The username (with the INSERT permission required) for database access.
    'password' = 'password', -- The password for database access.
    'sink.buffer-flush.max-rows' = '300', -- The max size of buffered records before flush.
    'sink.buffer-flush.interval' = '100s' -- The flush interval.
    );
    
    
    -- MySQL CDC connector as the source. Use "flink-connector-mysql-cdc" and this connector together. The MySQL version must be v5.7 or later.
    -- For more information, see: https://www.tencentcloud.com/document/product/849/52698?from_cn_redirect=1
    CREATE TABLE mysql_cdc_source_table (
    id INT,
    age INT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED -- Define the primary key here if the database table to monitor has it defined.
    ) WITH (
    'connector' = 'mysql-cdc', -- Here, it should be 'mysql-cdc'.
    'hostname' = '10.0.0.6', -- IP of the MySQL database server.
    'port' = '3306', -- Integer port number of the MySQL database server.
    'username' = 'root', -- Name of the MySQL user to use when connecting to the MySQL database server (SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, and RELOAD permissions are required).
    'password' = 'password', -- Password to use when connecting to the MySQL database server.
    -- 'scan.incremental.snapshot.enabled' = 'false' -- This option is required if the source table has no primary key.
    'database-name' = 'database', -- Database name of the MySQL server to monitor.
    'table-name' = 'table' -- Table name of the MySQL database to monitor.
    );
    
    
    INSERT INTO jdbc_upsert_sink_table SELECT * FROM mysql_cdc_source_table;

    Primary key

    For append (tuple) data, you do not need to set a primary key in the DDL statements of the job or define one on the table in the JDBC database. Defining a primary key is in fact not recommended, because duplicate data may cause writes to fail.
    For upsert data, a primary key must be defined on the table in the JDBC database, and the PRIMARY KEY NOT ENFORCED constraint must also be set on the corresponding column in the CREATE TABLE statement of the job's DDL.
    Note
    The upsert operation is not implemented with the INSERT .. ON CONFLICT .. DO UPDATE SET .. syntax, but with upsert + delete operations executed in batches. Because INSERT .. ON CONFLICT is available only in PostgreSQL v9.5 and later, this approach also works with earlier versions.
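
On the database side, the two cases differ only in whether the target table has a primary key. The statements below sketch matching target tables for the DDL examples in this document, assuming plain PostgreSQL syntax; House-P may need additional clauses. The column types INTEGER and TEXT are assumed counterparts of Flink's INT and STRING, and the hyphenated table names have to be double-quoted.

```sql
-- Target for the tuple (append) sink: no primary key, so duplicate rows are accepted.
CREATE TABLE "my-table" (
  id   INTEGER,
  name TEXT
);

-- Target for the upsert sink: the primary key matches the column declared
-- as PRIMARY KEY NOT ENFORCED in the Flink DDL.
CREATE TABLE "my-upsert-table" (
  id   INTEGER PRIMARY KEY,
  name TEXT
);
```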

    Flush optimization

    Setting the two sink.buffer-flush options (sink.buffer-flush.max-rows and sink.buffer-flush.interval) lets the connector flush data in batches. We recommend that you also set the relevant parameters of the underlying database for better flush performance. Otherwise, the underlying system may still flush the data record by record, which lowers efficiency.
    For PostgreSQL, we recommend appending reWriteBatchedInserts=true to the value of the URL option, as shown below.
    jdbc:postgresql://10.1.28.93:3306/PG?reWriteBatchedInserts=true&currentSchema=Database schema
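
To see why this parameter matters: with reWriteBatchedInserts=true, the PostgreSQL JDBC driver combines the single-row INSERT statements of a batch into multi-row INSERT statements, so fewer statements reach the server. The sketch below illustrates the effect with a hypothetical table my_table and made-up values; the exact statements the driver sends may differ.

```sql
-- Without reWriteBatchedInserts: each buffered record is a separate statement.
INSERT INTO my_table (id, name) VALUES (1, 'a');
INSERT INTO my_table (id, name) VALUES (2, 'b');
INSERT INTO my_table (id, name) VALUES (3, 'c');

-- With reWriteBatchedInserts=true: the driver rewrites the batch into a
-- single multi-row statement.
INSERT INTO my_table (id, name) VALUES (1, 'a'), (2, 'b'), (3, 'c');
```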
    