StarRocks

Last updated: 2023-11-08 14:24:26

    Overview

    The flink-connector-starrocks connector is based on the open-source starrocks-connector-for-apache-flink v1.2.4. It lets Flink read data from and write data to StarRocks.

    Versions

    | Flink Version | Description |
    |---------------|-------------|
    | 1.11 | Unsupported |
    | 1.13 | Supported (as a batch source, sink, and dimension table) |
    | 1.14 | Supported (as a batch source, sink, and dimension table) |
    | 1.16 | Unsupported |

    As a sink

    The flink-connector-starrocks connector can be used as a sink to load data into StarRocks. It delivers higher and more stable performance than the flink-connector-jdbc connector provided by Apache Flink. It caches data and loads it into StarRocks in batches using the Stream Load transaction interface. Two data formats are supported: CSV and JSON.
    The following example shows how to load data from a MySQL CDC source table into StarRocks.
    CREATE TABLE `mysql_cdc` (
    `user_id` bigint,
    `item_id` bigint,
    `behavior` STRING,
    PRIMARY KEY (`user_id`) NOT ENFORCED
    ) WITH (
    'connector' = 'mysql-cdc', -- Must be 'mysql-cdc'.
    'hostname' = '9.134.34.15', -- IP address of the MySQL database server.
    'port' = '3306', -- Integer port number of the MySQL database server.
    'username' = 'root', -- Username to use when connecting to the MySQL database server (SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, and RELOAD permissions are required).
    'password' = 'xxx', -- Password to use when connecting to the MySQL database server.
    'database-name' = 'test', -- Name of the MySQL database to monitor.
    'table-name' = 'user_behavior' -- Name of the MySQL table to monitor.
    );
    
    CREATE TABLE `pk_starrocks`(
    `user_id` bigint,
    `item_id` bigint,
    `behavior` STRING,
    PRIMARY KEY (`user_id`) NOT ENFORCED
    ) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030',
    'load-url' = '172.28.28.98:8030',
    'database-name' = 'oceanus',
    'table-name' = 'pk_user_behavior',
    'username' = 'root',
    'password' = 'xxx',
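    'sink.buffer-flush.interval-ms' = '15000', -- Per the WITH parameters below, this has no effect while 'sink.semantic' is 'exactly-once'.
    'sink.properties.format' = 'json',
    'sink.properties.strip_outer_array' = 'true', -- Should be 'true' when the format is JSON.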
    -- 'sink.parallelism' = '1',
    'sink.max-retries' = '3',
    'sink.semantic' = 'exactly-once'
    );
    
    INSERT INTO `pk_starrocks` SELECT * FROM `mysql_cdc`;
    Note
    The StarRocks table needs to use the Primary Key model. Otherwise, data deletion from the source table cannot be synced to StarRocks.
    You can use the StarRocks Migration Tools (SMT) provided by StarRocks to sync the database and table schema to StarRocks as instructed in Synchronize database & table schema.
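
    For reference, the target table used in the example above could be created in StarRocks as a Primary Key table with a statement along the following lines. This is a minimal sketch that is run in StarRocks rather than in Flink. The bucket count and the replication_num value are illustrative assumptions that do not come from this document, so adapt them to your cluster.

    ```sql
    -- Run in StarRocks, for example through a MySQL client connected to the FE query port.
    -- Assumption: 4 buckets and a single replica, which only suits a small test cluster.
    CREATE TABLE `oceanus`.`pk_user_behavior` (
        `user_id`  BIGINT NOT NULL,
        `item_id`  BIGINT,
        `behavior` STRING
    )
    PRIMARY KEY (`user_id`)
    DISTRIBUTED BY HASH (`user_id`) BUCKETS 4
    PROPERTIES (
        "replication_num" = "1"
    );
    ```

    The primary key column matches the PRIMARY KEY declared in the Flink DDL, so that update and delete changes from the CDC source are applied to the same row.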

    WITH parameters

    | Option | Required | Default Value | Data Type | Description |
    |--------|----------|---------------|-----------|-------------|
    | connector | Yes | None | String | Must be "starrocks". |
    | jdbc-url | Yes | None | String | The address used to connect to the MySQL server of the FE, in the format jdbc:mysql://fe_ip1:query_port,fe_ip2:query_port..., for example jdbc:mysql://172.28.28.98:9030. |
    | load-url | Yes | None | String | The HTTP URL of the FE in your StarRocks cluster, in the format fe_ip1:http_port,fe_ip2:http_port..., for example 172.28.28.98:8030. |
    | database-name | Yes | None | String | The name of the StarRocks database into which you want to load data. |
    | table-name | Yes | None | String | The name of the StarRocks table into which you want to load data. |
    | username | Yes | None | String | The username of the account that you want to use to load data into StarRocks. |
    | password | Yes | None | String | The password of the preceding account. |
    | sink.semantic | No | at-least-once | String | Valid values: at-least-once; exactly-once, which means that data is flushed only when a checkpoint is performed. With exactly-once, the sink.buffer-flush.* options have no effect. |
    | sink.version | No | AUTO | String | Valid values: V1, which uses the Stream Load interface for exactly-once processing (relatively low performance, but supported by all StarRocks versions); V2, which uses the Stream Load transaction interface for exactly-once processing (requires StarRocks 2.4 or later); AUTO, which chooses the sink version automatically depending on whether the StarRocks version supports the Stream Load transaction interface. |
    | sink.buffer-flush.max-bytes | No | 94371840 (90 MB) | String | A flush option that takes effect only when sink.semantic is set to at-least-once. A flush to StarRocks is triggered when the size of the data in the buffer exceeds this value. Value range: [64MB, 10GB]. |
    | sink.buffer-flush.max-rows | No | 500000 | String | A flush option that takes effect only when sink.semantic is set to at-least-once. A flush to StarRocks is triggered when the number of rows in the buffer exceeds this value. Value range: [64,000, 5,000,000]. |
    | sink.buffer-flush.interval-ms | No | 300000 | String | A flush option that takes effect only when sink.semantic is set to at-least-once. It specifies the interval (in milliseconds) between two flushes to StarRocks. Value range: [1000, 3600000]. |
    | sink.max-retries | No | 3 | String | The number of times the system retries the Stream Load job. Value range: [0, 1000]. |
    | sink.parallelism | No | NULL | String | The parallelism of the connector. If no value is set, the global parallelism is used. |
    | sink.connect.timeout-ms | No | 1000 | String | The timeout (in milliseconds) for waiting for a response from the load-url. Value range: [100, 60000]. |
    | sink.label-prefix | No | None | String | The label prefix used by Stream Load. Supported characters: [-_A-Za-z0-9]. For more details about labels, see Optional parameters of Stream Load. |
    | sink.properties.format | No | CSV | String | The format of the data to be loaded into StarRocks. Valid values: CSV (default) and JSON. |
    | sink.properties.column_separator | No | \t | String | The column separator for CSV-formatted data. For details, see CSV parameters. |
    | sink.properties.row_delimiter | No | \n | String | The row delimiter for CSV-formatted data. For details, see CSV parameters. |
    | sink.properties.strip_outer_array | No | false | String | Specifies whether to strip the outermost array structure of JSON-formatted data. Valid values: true and false (default). For batch loads from Flink, the JSON data may have an outermost array structure, indicated by a pair of square brackets []. In this situation, we recommend that you set this parameter to true, so that StarRocks removes the outermost square brackets and loads each inner element as a separate data record. If you set this parameter to false, StarRocks parses the entire JSON data into one array and loads the array as a single data record. For example, if the JSON data to be loaded is [ {"category" : 1, "author" : 2}, {"category" : 3, "author" : 4} ] and this parameter is set to true, StarRocks parses {"category" : 1, "author" : 2} and {"category" : 3, "author" : 4} into separate data records that are loaded into separate StarRocks table rows. For details, see JSON parameters. |
    | sink.properties.* | No | None | String | Parameters that control Stream Load behavior, such as 'sink.properties.columns' = 'k1, v1'. From StarRocks v2.4 onwards, the Primary Key model supports updating specified columns. For more parameters, see STREAM LOAD. |
    Note
    If you set sink.semantic to at-least-once, a flush to StarRocks is triggered as soon as any one of the sink.buffer-flush.max-bytes, sink.buffer-flush.max-rows, or sink.buffer-flush.interval-ms thresholds is reached.
    If you set sink.semantic to exactly-once, the sink relies on Flink checkpointing: the data and its labels are saved at each checkpoint. After a checkpoint completes, the first subsequent invocation blocks further writes until all the data cached in state has been flushed to StarRocks, which guarantees exactly-once semantics. In this situation, the sink.buffer-flush.* options have no effect.
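
    To make the at-least-once case concrete, the following sketch configures all three flush thresholds explicitly. The table name starrocks_sink_at_least_once and the threshold values are illustrative assumptions, chosen to fall inside the value ranges listed in the WITH parameters above. The connection settings are copied from the earlier sink example.

    ```sql
    -- Hypothetical sink table for illustration; only the option names come from this document.
    CREATE TABLE `starrocks_sink_at_least_once` (
        `user_id` BIGINT,
        `item_id` BIGINT,
        `behavior` STRING,
        PRIMARY KEY (`user_id`) NOT ENFORCED
    ) WITH (
        'connector' = 'starrocks',
        'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030',
        'load-url' = '172.28.28.98:8030',
        'database-name' = 'oceanus',
        'table-name' = 'pk_user_behavior',
        'username' = 'root',
        'password' = 'xxx',
        'sink.semantic' = 'at-least-once',
        -- A flush is triggered when any one of the three thresholds below is reached.
        'sink.buffer-flush.max-bytes' = '134217728',  -- 128 MB, within [64MB, 10GB]
        'sink.buffer-flush.max-rows' = '1000000',     -- within [64,000, 5,000,000]
        'sink.buffer-flush.interval-ms' = '10000',    -- 10 seconds, within [1000, 3600000]
        'sink.properties.format' = 'json',
        'sink.properties.strip_outer_array' = 'true'
    );
    ```

    Lower thresholds make data visible in StarRocks sooner at the cost of more frequent, smaller Stream Load jobs.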

    Data type mapping (as a sink)

    | Flink Type | StarRocks Type |
    |------------|----------------|
    | BOOLEAN | BOOLEAN |
    | TINYINT | TINYINT |
    | SMALLINT | SMALLINT |
    | INTEGER | INTEGER |
    | BIGINT | BIGINT |
    | FLOAT | FLOAT |
    | DOUBLE | DOUBLE |
    | DECIMAL | DECIMAL |
    | BINARY | INT |
    | CHAR | STRING |
    | VARCHAR | STRING |
    | STRING | STRING |
    | DATE | DATE |
    | TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME |
    | TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME |
    | ARRAY<T> | ARRAY<T> |
    | MAP<KT,VT> | JSON STRING |
    | ROW<arg T...> | JSON STRING |
    Note: BYTES, VARBINARY, TIME, INTERVAL, MULTISET, and RAW in Flink are not supported. For details, see Data Types.
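
    As an illustration of the sink mapping above, the following sketch declares a Flink sink table that uses several of the listed types, with the corresponding StarRocks type from the mapping noted next to each column. The table name typed_events and its columns are hypothetical, and the StarRocks table is assumed to already exist with matching column types.

    ```sql
    -- Hypothetical table; the per-column comments restate the sink type mapping.
    CREATE TABLE `typed_events` (
        `id` BIGINT,                  -- StarRocks: BIGINT
        `is_active` BOOLEAN,          -- StarRocks: BOOLEAN
        `price` DECIMAL(10, 2),       -- StarRocks: DECIMAL
        `name` VARCHAR(64),           -- StarRocks: STRING
        `event_date` DATE,            -- StarRocks: DATE
        `created_at` TIMESTAMP(3),    -- TIMESTAMP_WITHOUT_TIME_ZONE(3); StarRocks: DATETIME
        `attrs` MAP<STRING, STRING>,  -- StarRocks: JSON STRING
        PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
        'connector' = 'starrocks',
        'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030',
        'load-url' = '172.28.28.98:8030',
        'database-name' = 'oceanus',
        'table-name' = 'typed_events',
        'username' = 'root',
        'password' = 'xxx',
        'sink.properties.format' = 'json',
        'sink.properties.strip_outer_array' = 'true'
    );
    ```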

    Notes

    StarRocks data models

    StarRocks provides four data models, as described in Data Models: Duplicate Key, Aggregate Key, Unique Key, and Primary Key. The Primary Key model supports the pushdown of predicates and indexes, so it can deliver high query performance even with frequent, real-time data updates. Unless you have special requirements, the Primary Key model is recommended.
    For upsert streams, the Primary Key model must be used. Otherwise, DELETE messages cannot be flushed to StarRocks.
    Compared with the Unique Key model based on the Merge-On-Read policy, the Primary Key model improves the query performance by 3 to 10 times.
    The Primary Key model can join multiple streams by performing update operations on individual columns.
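
    The column-level updates mentioned above could be configured roughly as follows. This sketch rests on assumptions: partial_update is the Stream Load parameter that enables partial updates on Primary Key tables in StarRocks v2.4 and later, and it is passed through the sink.properties.* prefix described in the sink WITH parameters. The table name pk_starrocks_behavior_only is hypothetical. Verify the parameter against the Stream Load documentation for your StarRocks version before relying on it.

    ```sql
    -- Hypothetical sink that writes only `user_id` and `behavior`,
    -- leaving `item_id` in the StarRocks table untouched.
    CREATE TABLE `pk_starrocks_behavior_only` (
        `user_id` BIGINT,
        `behavior` STRING,
        PRIMARY KEY (`user_id`) NOT ENFORCED
    ) WITH (
        'connector' = 'starrocks',
        'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030',
        'load-url' = '172.28.28.98:8030',
        'database-name' = 'oceanus',
        'table-name' = 'pk_user_behavior',
        'username' = 'root',
        'password' = 'xxx',
        'sink.properties.format' = 'json',
        'sink.properties.strip_outer_array' = 'true',
        'sink.properties.partial_update' = 'true',        -- assumption: Stream Load partial update switch
        'sink.properties.columns' = 'user_id, behavior'   -- columns written by this sink
    );
    ```

    A second job can write the remaining columns of the same StarRocks table in the same way, which is how several streams end up merged into one row per key.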

    As a source

    WITH parameters

    | Option | Required | Default Value | Data Type | Description |
    |--------|----------|---------------|-----------|-------------|
    | connector | Yes | None | String | Must be "starrocks". |
    | scan-url | Yes | None | String | The HTTP URL of the FE in your StarRocks cluster, in the format fe_ip1:http_port,fe_ip2:http_port..., for example 172.28.28.98:8030. |
    | jdbc-url | Yes | None | String | The address used to connect to the MySQL server of the FE, in the format jdbc:mysql://fe_ip1:query_port,fe_ip2:query_port..., for example jdbc:mysql://172.28.28.98:9030. |
    | username | Yes | None | String | The username of your StarRocks cluster account. |
    | password | Yes | None | String | The password of your StarRocks cluster account. |
    | database-name | Yes | None | String | The name of the StarRocks database that contains the table you want to read. |
    | table-name | Yes | None | String | The name of the StarRocks table you want to read. |
    | scan.connect.timeout-ms | No | 1000 | String | The maximum amount of time (in milliseconds) after which the connection from the Flink connector to your StarRocks cluster times out. |
    | scan.params.keep-alive-min | No | 10 | String | The maximum amount of time (in minutes) for which the read task is kept alive. |
    | scan.params.query-timeout-s | No | 600 (10 min) | String | The maximum amount of time (in seconds) after which the read task times out. |
    | scan.params.mem-limit-byte | No | 1073741824 (1 GB) | String | The maximum amount of memory (in bytes) allowed per query on each BE. |
    | scan.max-retries | No | 1 | String | The maximum number of times the read task can be retried upon failure. |
    | lookup.cache.ttl-ms | No | 5000 | Long | The maximum amount of time (in milliseconds) after which the query cache of the dimension table times out. |

    As a batch source

    CREATE TABLE `starrocks` (
    `user_id` bigint,
    `item_id` bigint,
    `behavior` STRING,
    PRIMARY KEY (`user_id`) NOT ENFORCED
    ) WITH (
    'connector' = 'starrocks' ,
    'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030', -- query_port, FE mysql server port
    'scan-url' = '172.28.28.98:8030', -- http_port
    'database-name' = 'oceanus',
    'table-name' = 'pk_user_behavior',
    'username' = 'root',
    'password' = 'xxx'
    );
    
    CREATE TABLE `print_sink` (
    `user_id` BIGINT,
    `item_id` BIGINT,
    `behavior` STRING,
    PRIMARY KEY (`user_id`) NOT ENFORCED
    ) WITH (
    'connector' = 'logger'
    );
    
    INSERT INTO `print_sink`
    SELECT * FROM starrocks;
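
    The batch read above relies on the default scan settings. For large tables, the optional scan options from the source WITH parameters can be set explicitly, as in the following sketch. The table name starrocks_tuned and the chosen values are illustrative assumptions rather than recommendations.

    ```sql
    -- Hypothetical variant of the batch source with explicit scan options.
    CREATE TABLE `starrocks_tuned` (
        `user_id` BIGINT,
        `item_id` BIGINT,
        `behavior` STRING,
        PRIMARY KEY (`user_id`) NOT ENFORCED
    ) WITH (
        'connector' = 'starrocks',
        'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030',
        'scan-url' = '172.28.28.98:8030',
        'database-name' = 'oceanus',
        'table-name' = 'pk_user_behavior',
        'username' = 'root',
        'password' = 'xxx',
        'scan.connect.timeout-ms' = '3000',           -- connection timeout: 3 seconds
        'scan.params.query-timeout-s' = '1200',       -- read task timeout: 20 minutes
        'scan.params.mem-limit-byte' = '2147483648',  -- 2 GB per query on each BE
        'scan.max-retries' = '3'                      -- retry a failed read up to 3 times
    );
    ```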

    As a dimension table

    CREATE TABLE `starrocks` (
    `user_id` bigint,
    `item_id` bigint,
    `behavior` STRING,
    PRIMARY KEY (`user_id`) NOT ENFORCED
    ) WITH (
    'connector' = 'starrocks' ,
    'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030', -- query_port, FE mysql server port
    'scan-url' = '172.28.28.98:8030', -- http_port
    'database-name' = 'oceanus',
    'table-name' = 'pk_user_behavior',
    'username' = 'root',
    'password' = 'xxx'
    );
    
    CREATE TABLE `datagen` (
    `user_id` BIGINT,
    `ts` as PROCTIME(),
    PRIMARY KEY (`user_id`) NOT ENFORCED
    ) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.user_id.min' = '1',
    'fields.user_id.max' = '20'
    );
    
    CREATE TABLE `print_sink` (
    `user_id` BIGINT,
    `item_id` BIGINT,
    `behavior` STRING,
    `ts` TIMESTAMP,
    PRIMARY KEY (`user_id`) NOT ENFORCED
    ) WITH (
    'connector' = 'logger'
    );
    
    INSERT INTO `print_sink`
    SELECT a.user_id,b.item_id,b.behavior,a.ts
    FROM `datagen` a LEFT JOIN `starrocks` FOR SYSTEM_TIME AS OF a.ts as b
    ON a.user_id = b.user_id;
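
    In the join above, lookups against the dimension table are served from a query cache whose lifetime is controlled by lookup.cache.ttl-ms (5000 ms by default, per the source WITH parameters). The sketch below raises it to one minute for a slowly changing dimension. The table name starrocks_dim and the value are illustrative assumptions.

    ```sql
    -- Hypothetical dimension table with a longer lookup cache lifetime.
    CREATE TABLE `starrocks_dim` (
        `user_id` BIGINT,
        `item_id` BIGINT,
        `behavior` STRING,
        PRIMARY KEY (`user_id`) NOT ENFORCED
    ) WITH (
        'connector' = 'starrocks',
        'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030',
        'scan-url' = '172.28.28.98:8030',
        'database-name' = 'oceanus',
        'table-name' = 'pk_user_behavior',
        'username' = 'root',
        'password' = 'xxx',
        'lookup.cache.ttl-ms' = '60000'  -- cached rows may be up to 60 seconds stale
    );
    ```

    A longer lifetime reduces the number of queries sent to StarRocks, at the cost of joining against older dimension data.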

    Data type mapping (as a source)

    | StarRocks | Flink |
    |-----------|-------|
    | NULL | NULL |
    | BOOLEAN | BOOLEAN |
    | TINYINT | TINYINT |
    | SMALLINT | SMALLINT |
    | INT | INT |
    | BIGINT | BIGINT |
    | LARGEINT | STRING |
    | FLOAT | FLOAT |
    | DOUBLE | DOUBLE |
    | DATE | DATE |
    | DATETIME | TIMESTAMP |
    | DECIMAL | DECIMAL |
    | DECIMALV2 | DECIMAL |
    | DECIMAL32 | DECIMAL |
    | DECIMAL64 | DECIMAL |
    | DECIMAL128 | DECIMAL |
    | CHAR | CHAR |
    | VARCHAR | STRING |
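
    Because LARGEINT is read as STRING in the mapping above, numeric use of such a column needs an explicit cast on the Flink side. The sketch below assumes a hypothetical StarRocks table wide_ids with a LARGEINT column big_id. Flink's DECIMAL type holds at most 38 digits, so LARGEINT values that need 39 digits cannot be represented this way.

    ```sql
    -- Hypothetical source table; `big_id` is LARGEINT in StarRocks and STRING in Flink.
    CREATE TABLE `starrocks_wide_ids` (
        `user_id` BIGINT,
        `big_id` STRING
    ) WITH (
        'connector' = 'starrocks',
        'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030',
        'scan-url' = '172.28.28.98:8030',
        'database-name' = 'oceanus',
        'table-name' = 'wide_ids',
        'username' = 'root',
        'password' = 'xxx'
    );

    -- Convert the string representation to a numeric type for arithmetic or comparison.
    SELECT `user_id`, CAST(`big_id` AS DECIMAL(38, 0)) AS `big_id_num`
    FROM `starrocks_wide_ids`;
    ```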
    
    
    
    