flink-connector-oceanbase-jdbc Connector component currently provided by Stream Compute Service (SCS) supports MySQL mode and Oracle mode in Oceanbase.Flink Version | Description |
1.11 | Not supported |
1.13 | Supported |
1.14 | Supported |
1.16 | Supported |
CREATE TABLEjdbc_source_table(idINT,nameSTRING) WITH (-- Specify database connection parameters.'connector' = 'jdbc-oceabase','compatible-mode'= 'mysql','url' = 'jdbc:oceanbase://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- Replace with your actual MySQL connection parameters.'table-name' = 'my-table', -- Data table to write data to.'username' = 'admin', -- Username for database access (INSERT permission is required).password' = 'MyPa$$w0rd', -- Password for database access.'scan.partition.column = 'id', -- Column name for partitioned scan.'scan.partition.num' = '2', -- Number of partitions.'scan.partition.lower-bound' = '0', -- The lower bound of the first partition.'scan.partition.upper-bound' = '100', -- The upper bound of the last partition.'scan.fetch-size' = '1' -- Number of rows obtained in batches on each database read.);
CREATE TABLEjdbc_dim_table(idINT,nameSTRING) WITH (-- Specify database connection parameters.'connector' = 'jdbc-oceanbase','compatible-mode'= 'mysql','url' = 'jdbc:oceanbase://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- Replace with your actual MySQL connection parameters.'table-name' = 'my-table', -- The data table to write data to.'username' = 'admin', -- Username for database access (INSERT permission is required).'password' = 'MyPa$$w0rd', -- Password for database access.'lookup.cache.max-rows' = '100', -- Size of the read cache.'lookup.cache.ttl' = '5000' -- TTL for the read cache.);
CREATE TABLEjdbc_sink_table(idINT,nameSTRING) WITH (#NAME?'connector' = 'jdbc-oceanbase','compatible-mode'= 'mysql','url' = 'jdbc:oceanbase://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- Replace with your actual MySQL connection parameters.'table-name' = 'my-table', -- The data table to write data to.'username' = 'admin', -- Username for database access (INSERT permission is required).'password' = 'MyPa$$w0rd', -- Password for database access.'sink.buffer-flush.max-rows' = '200', -- Number of rows output in batches.'sink.buffer-flush.interval' = '2s' -- Batch output interval.);
CREATE TABLEjdbc_upsert_sink_table(idINT PRIMARY KEY NOT ENFORCED,nameSTRING) WITH (-- Specify database connection parameters.'connector' = 'jdbc-oceanbase','compatible-mode'= 'mysql','url' = 'jdbc:oceanbase://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- Replace with your actual MySQL connection parameters.'table-name' = 'my-upsert-table', -- The data table to write data to.'username' = 'admin', -- Username for database access (INSERT permission is required).'password' = 'MyPa$$w0rd', -- Password for database access.'sink.buffer-flush.max-rows' = '200', -- Number of rows in batch output.'sink.buffer-flush.interval' = '2s' -- Batch output interval.);
Parameter Value | Required | Default Value | Description |
connector | Yes | None | When connecting to the database, enter 'jdbc-oceanbase'. |
compatible-mode | Yes | None | When connecting to the database, fill in MySQL or Oracle according to the actual situation. |
url | Yes | None | JDBC database connection URL. |
table-name | Yes | None | Database table name. |
driver | No | None | The class name of the JDBC driver. If not entered, automatically use the drive class name for each mode by default. |
username | No | None | Database username. 'username' and 'password' must be used together. |
password | No | None | Database password. |
scan.partition.column | No | None | Specify the column name for partitioned scan of the input. The column must be of numerical type, date type, or timestamp type. |
scan.partition.num | No | None | When the partitioned scan is enabled, specify the number of partitions. |
scan.partition.lower-bound | No | None | When the partitioned scan is enabled, specify the lower bound of the first partition. |
scan.partition.upper-bound | No | None | When the partitioned scan is enabled, specify the upper bound of the last partition. |
scan.fetch-size | No | 0 | The number of rows obtained from the database on each loop read. If the specified value is '0', this configuration item will be ignored. |
lookup.cache.max-rows | No | None | The maximum number of rows in the lookup cache. When the value is exceeded, the oldest row records will expire. The lookup cache is not enabled by default. |
lookup.cache.ttl | No | None | The maximum lifetime for each row record in the lookup cache. When this time limit is exceeded, the oldest row records will expire. The lookup cache is not enabled by default. |
lookup.max-retries | No | 3 | The maximum number of retries when a database query fails. |
sink.buffer-flush.max-rows | No | 100 | The maximum value of cached records before flush. You can set it to '0' to disable it. |
sink.buffer-flush.interval | No | 1s | The maximum interval (ms) for each batch during batch output. If 'sink.buffer-flush.max-rows' is set to '0', but this option is not set to 0, this indicates that the asynchronous output feature is enabled, that is, the two threads "outputting data to operators" and "writing these data from operators to the database finally" are completely decoupled. |
sink.max-retries | No | 3 | The maximum number of retries when writing to the database fails. |
CREATE TABLEoceanbase_source_table(idINT,nameSTRING) WITH (#NAME?'connector' = 'jdbc-oceanbase','url' = 'jdbc:oceanbase://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- Replace with your actual MySQL connection parameters.'table-name' = 'my-table', -- The data table to write data to.'username' = 'admin', -- Username for database access (INSERT permission is required).'password' = 'MyPa$$w0rd', -- Password for database access.'scan.partition.column = 'id', -- Column name for partitioned scan.'scan.partition.num' = '2', -- Number of partitions.'scan.partition.lower-bound' = '0', -- The lower bound of the first partition.'scan.partition.upper-bound' = '100', -- The upper bound of the last partition.'scan.fetch-size' = '1' -- Number of rows to obtain in batch on each database read.);CREATE TABLEoceanbase_upsert_sink_table(idINT PRIMARY KEY NOT ENFORCED,nameSTRING) WITH (-- Specify database connection parameters.'connector' = 'jdbc-oceanbase','url' = 'jdbc:oceanbase://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- Replace with your actual MySQL connection parameters.'table-name' = 'my-upsert-table', -- The data table to write data to.'username' = 'admin', -- Username for database access (INSERT permission is required).'password' = 'MyPa$$w0rd', -- Password for database access.'sink.buffer-flush.max-rows' = '200', -- Number of rows in batch output.'sink.buffer-flush.interval' = '2s' -- Batch output interval.);insert into oceanbase_upsert_sink_table select * from oceanbase_source_table;
PRIMARY KEY NOT ENFORCED constraint in the corresponding column name in the CREATE TABLE DDL statement.scan.partition must be specified. Otherwise, an error will be reported.scan.partition.upper-bound and the lower bound specified with scan.partition.lower-bound refers to the step size of each partition, which will not affect the final rows of records read and data accuracy.lookup.cache.max-rows and lookup.cache.ttl manually to enable this feature.jdbc:oceanbase://10.1.28.93:3306/CDB?rewriteBatchedStatements=true
피드백