Last updated: 2023-11-08 16:06:23

    Overview

    The JDBC connector allows for reading data from and writing data to MySQL, PostgreSQL, Oracle, and other common databases.
    The flink-connector-jdbc connector component provided by Stream Compute Service has built-in MySQL and PostgreSQL drivers. To connect to Oracle or other databases, upload the corresponding JDBC driver packages as custom packages.
    Note
    You can use only the PostgreSQL connector to connect to a Tencent Cloud TCHouse-P database.
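
    For example, after uploading the Oracle JDBC driver as a custom package, a connection could be declared roughly as follows. This is a minimal sketch; the URL, service name, table, and credentials are placeholders, and the driver option names the class shipped in the uploaded package.

    CREATE TABLE `jdbc_oracle_source_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:oracle:thin:@10.1.28.93:1521/ORCL', -- Placeholder Oracle thin-driver URL.
    'driver' = 'oracle.jdbc.OracleDriver', -- The driver class from the uploaded custom package.
    'table-name' = 'MY_TABLE', -- Placeholder table name.
    'username' = 'admin',
    'password' = 'MyPa$$w0rd'
    );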

    Versions

    Flink Version | Description
    1.11 | Supported
    1.13 | Supported
    1.14 | Supported
    1.16 | Supported

    Use cases

    The JDBC connector can be used as a source table scanned by a fixed column or as a dimension table (the right table of a lookup join); it can also be used as a sink for tuple (append-only) or upsert streams (a primary key is required for upsert).
    To use the change records of a JDBC database as stream tables for further consumption, use the built-in CDC source. If the built-in CDC source cannot meet your needs, you can capture and subscribe to the changes of the JDBC database using Debezium, Canal, or other available methods, and Stream Compute Service will then process the captured change events. For more information, see Kafka.
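
    For example, Debezium change events captured into a Kafka topic can be consumed as a stream table roughly as follows. This is a minimal sketch; the topic, broker address, and consumer group are placeholders.

    CREATE TABLE `mysql_changelog_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'my-debezium-topic', -- Placeholder topic that carries the captured changes.
    'properties.bootstrap.servers' = '10.1.28.93:9092', -- Placeholder broker address.
    'properties.group.id' = 'my-consumer-group', -- Placeholder consumer group.
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'debezium-json' -- Interpret each Kafka record as a Debezium change event.
    );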

    Defining a table in DDL

    As a source (scanned by a fixed column)

    CREATE TABLE `jdbc_source_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- Specify the options for database connection.
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- Replace it with your MySQL database URL.
    'table-name' = 'my-table', -- The name of the table to connect to.
    'username' = 'admin', -- The username (with the SELECT permission required) for database access.
    'password' = 'MyPa$$w0rd', -- The password for database access.
    'scan.partition.column' = 'id', -- The name of the partitioned scan column.
    'scan.partition.num' = '2', -- The number of partitions.
    'scan.partition.lower-bound' = '0', -- The minimum value of the first partition.
    'scan.partition.upper-bound' = '100', -- The maximum value of the last partition.
    'scan.fetch-size' = '1' -- The number of rows fetched from the database per round trip when reading.
    );

    As a source dimension table

    CREATE TABLE `jdbc_dim_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- Specify the options for database connection.
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- Replace it with your MySQL database URL.
    'table-name' = 'my-table', -- The name of the table to connect to.
    'username' = 'admin', -- The username (with the SELECT permission required) for database access.
    'password' = 'MyPa$$w0rd', -- The password for database access.
    'lookup.cache.max-rows' = '100', -- The maximum number of rows cached in the Lookup Cache.
    'lookup.cache.ttl' = '5000' -- The maximum time to live of each row in the Lookup Cache.
    );
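
    A dimension table defined this way is typically used as the right table of a processing-time temporal (lookup) join. A minimal sketch, assuming a stream table named orders_table that declares a processing-time attribute proc_time:

    SELECT o.id, d.name
    FROM orders_table AS o
    JOIN jdbc_dim_table FOR SYSTEM_TIME AS OF o.proc_time AS d
    ON o.id = d.id;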

    As a sink (for tuple streams)

    CREATE TABLE `jdbc_sink_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- Specify the options for database connection.
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- Replace it with your MySQL database URL.
    'table-name' = 'my-table', -- The name of the JDBC table to connect to.
    'username' = 'admin', -- The username (with the INSERT permission required) for database access.
    'password' = 'MyPa$$w0rd', -- The password for database access.
    'sink.buffer-flush.max-rows' = '200', -- The maximum size of buffered records before flush.
    'sink.buffer-flush.interval' = '2s' -- The flush interval.
    );

    As a sink (for upsert streams)

    CREATE TABLE `jdbc_upsert_sink_table` (
    `id` INT PRIMARY KEY NOT ENFORCED,
    `name` STRING
    ) WITH (
    -- Specify the options for database connection.
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- Replace it with your MySQL database URL.
    'table-name' = 'my-upsert-table', -- The name of the JDBC table to connect to.
    'username' = 'admin', -- The username (with the INSERT and UPDATE permissions required) for database access.
    'password' = 'MyPa$$w0rd', -- The password for database access.
    'sink.buffer-flush.max-rows' = '200', -- The maximum size of buffered records before flush.
    'sink.buffer-flush.interval' = '2s' -- The flush interval.
    );
    Note
    For an upsert stream, a primary key is required.

    WITH parameters

    Option | Required | Default Value | Description
    connector | Yes | None | The connector to use. For a JDBC database, set it to 'jdbc'.
    url | Yes | None | The JDBC database URL.
    table-name | Yes | None | The name of the JDBC table to connect to.
    driver | No | None | The class name of the JDBC driver used to connect to the above URL. If it is not set, it is automatically derived from the URL.
    username | No | None | The JDBC username. 'username' and 'password' must both be specified if either of them is specified.
    password | No | None | The JDBC password.
    scan.partition.column | No | None | The column name used for partitioning the input. The column type must be numeric, date, or timestamp. For details, see Partitioned scan.
    scan.partition.num | No | None | The number of partitions.
    scan.partition.lower-bound | No | None | The smallest value of the first partition.
    scan.partition.upper-bound | No | None | The largest value of the last partition.
    scan.fetch-size | No | 0 | The number of rows fetched from the database per round trip when reading. A value of 0 means the data is read row by row, which results in low throughput.
    lookup.cache.max-rows | No | None | The maximum number of rows cached in the Lookup Cache.
    lookup.cache.ttl | No | None | The maximum time to live of each row in the Lookup Cache.
    lookup.max-retries | No | 3 | The maximum number of retries upon a database lookup failure.
    sink.buffer-flush.max-rows | No | 100 | The maximum number of records buffered before a flush. A value of 0 means records are not buffered.
    sink.buffer-flush.interval | No | 1s | The maximum interval between flushes. If sink.buffer-flush.max-rows is set to 0 and this option is not, buffered actions are processed fully asynchronously.
    sink.max-retries | No | 3 | The maximum number of retries if writing records to the database fails.
    sink.ignore-delete | No | false | Whether to ignore DELETE operations.

    Example

    CREATE TABLE `jdbc_source_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- Specify the options for database connection.
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- Replace it with your MySQL database URL.
    'table-name' = 'my-table', -- The name of the table to connect to.
    'username' = 'admin', -- The username (with the SELECT permission required) for database access.
    'password' = 'MyPa$$w0rd', -- The password for database access.
    'scan.partition.column' = 'id', -- The name of the partitioned scan column.
    'scan.partition.num' = '2', -- The number of partitions.
    'scan.partition.lower-bound' = '0', -- The minimum value of the first partition.
    'scan.partition.upper-bound' = '100', -- The maximum value of the last partition.
    'scan.fetch-size' = '1' -- The number of rows fetched from the database per round trip when reading.
    );
    CREATE TABLE `jdbc_upsert_sink_table` (
    `id` INT PRIMARY KEY NOT ENFORCED,
    `name` STRING
    ) WITH (
    -- Specify the options for database connection.
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- Replace it with your MySQL database URL.
    'table-name' = 'my-upsert-table', -- The name of the JDBC table to connect to.
    'username' = 'admin', -- The username (with the INSERT and UPDATE permissions required) for database access.
    'password' = 'MyPa$$w0rd', -- The password for database access.
    'sink.buffer-flush.max-rows' = '200', -- The maximum size of buffered records before flush.
    'sink.buffer-flush.interval' = '2s' -- The flush interval.
    );
    INSERT INTO jdbc_upsert_sink_table SELECT * FROM jdbc_source_table;

    Primary key

    For append (tuple) data, you do not need to set the primary key option in the job's DDL statements or define a primary key for the JDBC database table. In fact, we do not recommend defining one, because duplicate data may cause writes to fail.
    For upsert data, a primary key must be defined for the JDBC database table, and the PRIMARY KEY NOT ENFORCED constraint must also be set on the corresponding column in the CREATE TABLE clause of the DDL statements.
    Note
    For MySQL tables, the upsert operation relies on the INSERT .. ON DUPLICATE KEY UPDATE .. syntax, which is supported by all common MySQL versions.
    For PostgreSQL tables, the upsert operation relies on the INSERT .. ON CONFLICT .. DO UPDATE SET .. syntax, which is supported by PostgreSQL v9.5 or later.
    For Oracle tables, the upsert operation relies on the MERGE INTO .. USING .. ON .. WHEN MATCHED THEN UPDATE .. WHEN NOT MATCHED THEN INSERT .. syntax, which is supported by Oracle 9i or later.
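
    For example, for the jdbc_upsert_sink_table above, the statement issued to MySQL is roughly equivalent to the following (a sketch; the actual generated statement may list the columns differently):

    INSERT INTO `my-upsert-table` (`id`, `name`)
    VALUES (?, ?)
    ON DUPLICATE KEY UPDATE `name` = VALUES(`name`);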

    Partitioned scan

    Partitioned scan can accelerate reading from a JDBC table by parallel source operators (tasks), with each task reading only the partitions assigned to it. To use this feature, all four options starting with scan.partition must be specified; otherwise, an error is raised.
    Note
    The values of scan.partition.upper-bound and scan.partition.lower-bound only determine the partition stride; they do not filter the rows that are read, so they do not affect the number or accuracy of the records read.
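
    With the source table defined earlier (lower bound 0, upper bound 100, 2 partitions), the two parallel tasks issue queries roughly equivalent to the following (a sketch; the exact boundaries are derived internally from the stride):

    -- Task 1:
    SELECT `id`, `name` FROM `my-table` WHERE `id` BETWEEN 0 AND 49;
    -- Task 2:
    SELECT `id`, `name` FROM `my-table` WHERE `id` BETWEEN 50 AND 100;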

    Lookup Cache

    Flink provides the Lookup Cache feature to improve the performance of lookups in a dimension table. The Lookup Cache works synchronously and is not enabled by default, which means every request reads from the database and the throughput is very low. To enable it, you must set both lookup.cache.max-rows and lookup.cache.ttl.
    Note
    If the lookup cache TTL is too long or the number of cached rows is too large, the Flink job may keep reading stale data from the cache after the data in the database has been updated. Therefore, for a job involving a frequently changing database, use this feature with caution.

    Flush optimization

    Setting the sink.buffer-flush.max-rows and sink.buffer-flush.interval options allows data to be flushed in batches. We recommend you also set the relevant underlying database parameters for better flush performance; otherwise, the underlying system may still flush the data record by record, lowering efficiency.
    For MySQL, it is recommended to add rewriteBatchedStatements=true following the URL option as shown below.
    jdbc:mysql://10.1.28.93:3306/CDB?rewriteBatchedStatements=true
    For PostgreSQL, it is recommended to add reWriteBatchedInserts=true following the URL option as shown below.
    jdbc:postgresql://10.1.28.93:5432/PG?reWriteBatchedInserts=true&currentSchema=Database schema

    Descriptions of monitoring metrics

    Stream Compute Service adds many practical metrics to the JDBC connector. Click the JDBC sink operator in the execution graph in the Flink UI and search for one of the following metrics:
    numberOfInsertRecords: The number of output +I messages.
    numberOfDeleteRecords: The number of output -D messages.
    numberOfUpdateBeforeRecords: The number of output -U messages.
    numberOfUpdateAfterRecords: The number of output +U messages.