Oceanbase

Last updated: 2026-06-04 10:43:08

Introduction

The Oceanbase connector provides read and write support for the Oceanbase database.
The flink-connector-oceanbase-jdbc connector component currently provided by Stream Compute Service (SCS) supports both the MySQL mode and the Oracle mode of Oceanbase.

Version Description

Flink Version | Description
1.11 | Not supported
1.13 | Supported
1.14 | Supported
1.16 | Supported

Applicable Scope

jdbc-oceanbase can be used as a data source table (Source) for column scans and as the right table (dimension table) in JOIN operations. It can also be used as a data sink table (Sink) for Tuple data streams and for Upsert data streams (the latter requires a primary key).

DDL Definition

Serving as a Data Source for Column Scan (Source)

CREATE TABLE jdbc_source_table (
id INT,
name STRING
) WITH (
-- Specify database connection parameters.
'connector' = 'jdbc-oceanbase',
'compatible-mode' = 'mysql',
'url' = 'jdbc:oceanbase://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- Replace with your actual connection parameters.
'table-name' = 'my-table', -- Data table to read data from.
'username' = 'admin', -- Username for database access (SELECT permission is required).
'password' = 'MyPa$$w0rd', -- Password for database access.
'scan.partition.column' = 'id', -- Column name for partitioned scan.
'scan.partition.num' = '2', -- Number of partitions.
'scan.partition.lower-bound' = '0', -- The lower bound of the first partition.
'scan.partition.upper-bound' = '100', -- The upper bound of the last partition.
'scan.fetch-size' = '1' -- Number of rows obtained in batches on each database read.
);

Serving as a Data Source for Dimension Tables (Source)

CREATE TABLE jdbc_dim_table (
id INT,
name STRING
) WITH (
-- Specify database connection parameters.
'connector' = 'jdbc-oceanbase',
'compatible-mode'= 'mysql',
'url' = 'jdbc:oceanbase://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- Replace with your actual MySQL connection parameters.
'table-name' = 'my-table', -- The data table to read data from.
'username' = 'admin', -- Username for database access (SELECT permission is required).
'password' = 'MyPa$$w0rd', -- Password for database access.
'lookup.cache.max-rows' = '100', -- Size of the read cache.
'lookup.cache.ttl' = '5000' -- TTL for the read cache.
);
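
A dimension table such as jdbc_dim_table is typically queried through a processing-time lookup join. The following is a minimal sketch in standard Flink SQL. The orders_stream table, its datagen settings, and the proc_time column are hypothetical names introduced only for illustration; they are not part of the connector.

```sql
-- Hypothetical driving stream; 'datagen' is Flink's built-in test source.
CREATE TABLE orders_stream (
  id INT,
  proc_time AS PROCTIME()  -- Processing-time attribute used by the lookup join.
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.id.min' = '0',
  'fields.id.max' = '100'
);

-- Each incoming row looks up the matching row in the dimension table by id.
SELECT o.id, d.name
FROM orders_stream AS o
JOIN jdbc_dim_table FOR SYSTEM_TIME AS OF o.proc_time AS d
  ON o.id = d.id;
```

With the lookup cache options set as in the DDL above, repeated lookups for the same id can be served from the cache until the entry expires.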

Serving as a Data Sink (Tuple Sink)

CREATE TABLE jdbc_sink_table (
id INT,
name STRING
) WITH (
-- Specify database connection parameters.
'connector' = 'jdbc-oceanbase',
'compatible-mode'= 'mysql',
'url' = 'jdbc:oceanbase://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- Replace with your actual MySQL connection parameters.
'table-name' = 'my-table', -- The data table to write data to.
'username' = 'admin', -- Username for database access (INSERT permission is required).
'password' = 'MyPa$$w0rd', -- Password for database access.
'sink.buffer-flush.max-rows' = '200', -- Number of rows output in batches.
'sink.buffer-flush.interval' = '2s' -- Batch output interval.
);

Serving as a Data Sink (Upsert Sink)

CREATE TABLE jdbc_upsert_sink_table (
id INT PRIMARY KEY NOT ENFORCED,
name STRING
) WITH (
-- Specify database connection parameters.
'connector' = 'jdbc-oceanbase',
'compatible-mode'= 'mysql',
'url' = 'jdbc:oceanbase://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- Replace with your actual MySQL connection parameters.
'table-name' = 'my-upsert-table', -- The data table to write data to.
'username' = 'admin', -- Username for database access (INSERT permission is required).
'password' = 'MyPa$$w0rd', -- Password for database access.
'sink.buffer-flush.max-rows' = '200', -- Number of rows in batch output.
'sink.buffer-flush.interval' = '2s' -- Batch output interval.
);
Note:
You need to define a primary key for Upsert.

WITH Parameters

Parameter | Required | Default Value | Description
connector | Yes | None | The connector to use. Enter 'jdbc-oceanbase'.
compatible-mode | Yes | None | The compatibility mode of the database. Enter MySQL or Oracle according to your actual setup (for example, 'mysql').
url | Yes | None | JDBC database connection URL.
table-name | Yes | None | Database table name.
driver | No | None | The class name of the JDBC driver. If not specified, the default driver class name for the selected mode is used.
username | No | None | Database username. 'username' and 'password' must be used together.
password | No | None | Database password.
scan.partition.column | No | None | The column name used for partitioned scan of the input. The column must be of a numeric, date, or timestamp type.
scan.partition.num | No | None | The number of partitions when partitioned scan is enabled.
scan.partition.lower-bound | No | None | The lower bound of the first partition when partitioned scan is enabled.
scan.partition.upper-bound | No | None | The upper bound of the last partition when partitioned scan is enabled.
scan.fetch-size | No | 0 | The number of rows fetched from the database on each read. If the value is '0', this configuration item is ignored.
lookup.cache.max-rows | No | None | The maximum number of rows in the lookup cache. When this value is exceeded, the oldest rows expire. The lookup cache is not enabled by default.
lookup.cache.ttl | No | None | The maximum lifetime of each row in the lookup cache. Rows older than this limit expire. The lookup cache is not enabled by default.
lookup.max-retries | No | 3 | The maximum number of retries when a database query fails.
sink.buffer-flush.max-rows | No | 100 | The maximum number of records buffered before a flush. Set it to '0' to disable it.
sink.buffer-flush.interval | No | 1s | The maximum interval (ms) between batch flushes. If 'sink.buffer-flush.max-rows' is set to '0' and this option is not set to 0, asynchronous output is enabled: the thread that outputs data to the operator and the thread that finally writes that data from the operator to the database are completely decoupled.
sink.max-retries | No | 3 | The maximum number of retries when writing to the database fails.

Sample Code

CREATE TABLE oceanbase_source_table (
id INT,
name STRING
) WITH (
-- Specify database connection parameters.
'connector' = 'jdbc-oceanbase',
'compatible-mode' = 'mysql',
'url' = 'jdbc:oceanbase://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- Replace with your actual connection parameters.
'table-name' = 'my-table', -- The data table to read data from.
'username' = 'admin', -- Username for database access (SELECT permission is required).
'password' = 'MyPa$$w0rd', -- Password for database access.
'scan.partition.column' = 'id', -- Column name for partitioned scan.
'scan.partition.num' = '2', -- Number of partitions.
'scan.partition.lower-bound' = '0', -- The lower bound of the first partition.
'scan.partition.upper-bound' = '100', -- The upper bound of the last partition.
'scan.fetch-size' = '1' -- Number of rows to obtain in batch on each database read.
);
CREATE TABLE oceanbase_upsert_sink_table (
id INT PRIMARY KEY NOT ENFORCED,
name STRING
) WITH (
-- Specify database connection parameters.
'connector' = 'jdbc-oceanbase',
'compatible-mode' = 'mysql',
'url' = 'jdbc:oceanbase://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- Replace with your actual MySQL connection parameters.
'table-name' = 'my-upsert-table', -- The data table to write data to.
'username' = 'admin', -- Username for database access (INSERT permission is required).
'password' = 'MyPa$$w0rd', -- Password for database access.
'sink.buffer-flush.max-rows' = '200', -- Number of rows in batch output.
'sink.buffer-flush.interval' = '2s' -- Batch output interval.
);
INSERT INTO oceanbase_upsert_sink_table SELECT * FROM oceanbase_source_table;

Primary Key Description

For Append (Tuple) data, you do not need to set a primary key in the job DDL or define one on the Oceanbase database table. Defining a primary key is in fact not recommended, because duplicate data may then cause writes to fail.
For Upsert data, you must define a primary key on the Oceanbase database table and add the PRIMARY KEY NOT ENFORCED constraint to the corresponding column in the CREATE TABLE DDL statement.
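
For the Upsert case, the database-side table needs a primary key that matches the key column declared in the Flink DDL. The following is a minimal sketch of such a table in MySQL mode. It is run against Oceanbase directly, not in the Flink job, and the VARCHAR(255) length is an assumption chosen for illustration.

```sql
-- Run against the Oceanbase database (MySQL mode), not in the Flink job.
-- The VARCHAR length is a hypothetical choice; adjust it to your data.
CREATE TABLE `my-upsert-table` (
  id   INT NOT NULL,
  name VARCHAR(255),
  PRIMARY KEY (id)  -- Matches 'id INT PRIMARY KEY NOT ENFORCED' in the Flink DDL.
);
```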

Partitioned Scan

Partitioned scan speeds up the reading of Oceanbase data tables by Source operators running with high parallelism, because each subtask reads only its own assigned partitions. To use this feature, you must specify all four parameters that start with scan.partition. Otherwise, an error is reported.
Note:
The bounds specified with scan.partition.upper-bound and scan.partition.lower-bound only determine the step size of each partition. They do not affect the number of rows finally read or the accuracy of the data.
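
One way to pick reasonable bounds is to inspect the actual range of the partition column before writing the DDL. The query below is a small sketch that assumes a MySQL-mode table named my-table with a numeric id column, as in the examples above. It is run against Oceanbase directly, not in the Flink job.

```sql
-- Run against Oceanbase (MySQL mode) to inspect the range of the partition column.
SELECT MIN(id) AS lower_bound, MAX(id) AS upper_bound
FROM `my-table`;
```

The two results can then be used as starting values for scan.partition.lower-bound and scan.partition.upper-bound.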

Lookup Cache

Flink provides a lookup cache to improve lookup performance for dimension tables. The current cache implementation is synchronous and is disabled by default, so every request reads the database and throughput is low. To enable the cache, you must set both lookup.cache.max-rows and lookup.cache.ttl manually.
Note:
If the cache TTL is too long or the cache holds too many rows, Flink jobs may keep reading stale data from the cache after the database has been updated. For jobs that are sensitive to database changes, use the cache with caution.

Batch Write Optimization

Setting the two parameters that start with sink.buffer-flush enables batch writing to the database. We recommend combining them with the corresponding batch parameters of the underlying database. Otherwise, the database still writes records one row at a time, which is inefficient.
For Oceanbase, we recommend adding the rewriteBatchedStatements=true parameter to the connection URL:
jdbc:oceanbase://10.1.28.93:3306/CDB?rewriteBatchedStatements=true
