ClickHouse

Last updated: 2023-11-08 16:16:02

    Overview

    The ClickHouse sink connector supports writing data to ClickHouse data warehouses. The ClickHouse source connector allows ClickHouse to be used as a batch source and as a dimension table.

    Versions

    | Flink Version | Description |
    | --- | --- |
    | 1.11 | Supported (use as sink) |
    | 1.13 | Supported (use as source and sink) |
    | 1.14 | Supported (use as source and sink) |
    | 1.16 | Supported (use as source and sink) |

    Limits

    The ClickHouse connector does not support standard UPDATE and DELETE operations. If you need UPDATE and DELETE semantics with a ClickHouse sink connector, see CollapsingMergeTree.
    For JAR jobs written in Java/Scala, data can be written to ClickHouse over JDBC; this is not covered here.

    Defining a table in DDL

    As a sink with insert only

    CREATE TABLE clickhouse_sink_table (
    `id` INT,
    `name` STRING
    ) WITH (
    -- Specify the parameters for database connection.
    'connector' = 'clickhouse', -- Specify the connector to use. Here, it should be `clickhouse`.
    'url' = 'clickhouse://172.28.28.160:8123', -- Specify the cluster address, which can be viewed on the ClickHouse cluster page.
    -- You don't need to specify it if the ClickHouse cluster is not configured with an account and password.
    --'username' = 'root', -- The ClickHouse cluster username.
    --'password' = 'root', -- The ClickHouse cluster password.
    'database-name' = 'db', -- The database to write data to.
    'table-name' = 'table', -- The table to write data to.
    'sink.batch-size' = '1000' -- The number of data records that trigger a batch write.
    );

    As a sink with upsert

    CREATE TABLE clickhouse_upsert_sink_table (
    `id` INT,
    `name` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED -- Define the primary key here; it is required for upsert.
    ) WITH (
    -- Specify the parameters for database connection.
    'connector' = 'clickhouse', -- Specify the connector to use. Here, it should be `clickhouse`.
    'url' = 'clickhouse://172.28.28.160:8123', -- Specify the cluster address, which can be viewed on the ClickHouse cluster page.
    -- You don't need to specify it if the ClickHouse cluster is not configured with an account and password.
    --'username' = 'root', -- The ClickHouse cluster username.
    --'password' = 'root', -- The ClickHouse cluster password.
    'database-name' = 'db', -- The database to write data to.
    'table-name' = 'table', -- The table to write data to.
    'table.collapsing.field' = 'Sign', -- The name of the Sign column of the CollapsingMergeTree table.
    'sink.batch-size' = '1000' -- The number of data records that trigger a batch write.
    );
    Note
    You must define a primary key and set the table.collapsing.field option to support upsert. For the ClickHouse table creation statement, see FAQs.
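    For instance, the following is a minimal sketch of driving the upsert sink from a changelog source, assuming a hypothetical CDC source table `mysql_cdc_source` with the same schema; for UPDATE and DELETE messages, the connector writes pairs of rows with opposite Sign values (see the FAQs):

    INSERT INTO clickhouse_upsert_sink_table SELECT `id`, `name` FROM mysql_cdc_source;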

    As a batch source

    CREATE TABLE `clickhouse_batch_source` (
    `when` TIMESTAMP,
    `userid` BIGINT,
    `bytes` FLOAT
    ) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://172.28.1.21:8123',
    'database-name' = 'dts',
    'table-name' = 'download_dist'
    -- 'scan.by-part.enabled' = 'false', -- Whether to read the ClickHouse table by part. To enable this, you must first stop the backend merges and TTL-based data deletions of the table using the commands 'STOP MERGES' and 'STOP TTL MERGES' on all nodes; otherwise, an error will occur in data reading.
    -- 'scan.part.modification-time.lower-bound' = '2021-09-24 16:00:00', -- The minimum `modification_time` (inclusive) for filtering ClickHouse table parts, in the format of `yyyy-MM-dd HH:mm:ss`.
    -- 'scan.part.modification-time.upper-bound' = '2021-09-17 19:16:26', -- The maximum `modification_time` (exclusive) for filtering ClickHouse table parts, in the format of `yyyy-MM-dd HH:mm:ss`.
    -- 'local.read-write' = 'false', -- Whether to read the local table. Default value: false.
    -- 'table.local-nodes' = '172.28.1.24:8123,172.28.1.226:8123,172.28.1.109:8123,172.28.1.36:8123' -- The list of local nodes (the HTTP port number is required). Note that only one replica node address can be configured for each shard; otherwise, duplicate data may be read.
    );
    Note
    Only MergeTree engines support reading by part, and you must stop backend merges and TTL-based data deletions on all nodes that hold the table being read; otherwise, part changes can make the read inaccurate. For local reading, only one replica node address can be configured for each shard; otherwise, duplicate data may be read.
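    For reference, the sketch below shows how merges might be stopped and restarted around a by-part batch read, using the dts.download_dist table from the example above. In recent ClickHouse versions these commands take the SYSTEM prefix; verify the exact syntax for your version:

    -- Run on every node before the by-part read.
    SYSTEM STOP MERGES dts.download_dist;
    SYSTEM STOP TTL MERGES dts.download_dist;
    -- Re-enable merges after the batch read completes.
    SYSTEM START MERGES dts.download_dist;
    SYSTEM START TTL MERGES dts.download_dist;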

    As a dimension table

    CREATE TABLE `clickhouse_dimension` (
    `userid` BIGINT,
    `comment` STRING
    ) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://172.28.1.21:8123',
    'database-name' = 'dimension',
    'table-name' = 'download_dist',
    'lookup.cache.max-rows' = '500', -- The maximum number of data records allowed in the Lookup Cache.
    'lookup.cache.ttl' = '10min', -- The maximum cache time of each record.
    'lookup.max-retries' = '10' -- The maximum number of retries upon database query failure.
    );
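    A dimension table is typically used in a lookup join. The following is a minimal sketch, assuming a hypothetical probe-side table `user_events` whose DDL declares a processing-time attribute with `proctime AS PROCTIME()`:

    SELECT e.`userid`, e.`event`, d.`comment`
    FROM user_events AS e
    JOIN clickhouse_dimension FOR SYSTEM_TIME AS OF e.`proctime` AS d
    ON e.`userid` = d.`userid`;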

    WITH parameters

    | Option | Required | Default Value | Description |
    | --- | --- | --- | --- |
    | connector | Yes | - | The connector to use. For ClickHouse, it should be clickhouse. |
    | url | Yes | - | The ClickHouse cluster connection URL, such as 'clickhouse://127.1.1.1:8123', which can be viewed on the cluster page. |
    | username | No | - | The ClickHouse cluster username. |
    | password | No | - | The ClickHouse cluster password. |
    | database-name | Yes | - | The ClickHouse cluster database. |
    | table-name | Yes | - | The ClickHouse cluster table. |
    | sink.batch-size | No | 1000 | The number of buffered records that triggers a flush (batch write) to ClickHouse. |
    | sink.flush-interval | No | 1000 (ms) | The interval at which the async thread flushes buffered data to ClickHouse. |
    | table.collapsing.field | No | - | The name of the Sign column of the CollapsingMergeTree table. |
    | sink.max-retries | No | 3 | The number of retries upon write failure. |
    | local.read-write | No | false | Whether to enable writing to local tables. Note: This feature is available only to advanced users and must be used together with the options described in the "Writing to local tables" section of this document. |
    | table.local-nodes | No | - | The list of local nodes when local.read-write is set to true, for example '127.1.1.10:8123,127.1.2.13:8123' (the HTTP port number is required). |
    | sink.partition-strategy | No | balanced | The partition strategy when local.read-write is set to true. Valid values: balanced (round-robin), shuffle (random), and hash (by sink.partition-key). For dynamic updates with the CollapsingMergeTree engine, the value must be hash and sink.partition-key must be set. |
    | sink.partition-key | No | - | Required when local.read-write is set to true and sink.partition-strategy to hash. Set it to the primary key defined in the table; if the primary key consists of multiple fields, use the first field. |
    | sink.ignore-delete | No | false | Whether to ignore all DELETE messages written to ClickHouse. Suitable for scenarios where the ReplacingMergeTree table engine is used and dynamic data updates are expected. |
    | sink.backpressure-aware | No | false | If the "Too many parts" error frequently appears in Flink logs and causes job crashes, enable this option to significantly reduce the server load and improve overall throughput and stability. |
    | sink.reduce-batch-by-key | No | false | For sink tables with a defined primary key, whether to merge records with the same key within a flush interval, retaining only the last record. |
    | sink.max-partitions-per-insert | No | 20 | When the ClickHouse table is partitioned and its partitioning function is intHash32, toYYYYMM, or toYYYYMMDD, the sink buffers data by partition; when the number of accumulated partitions reaches this value, a write to ClickHouse is triggered (unless sink.flush-interval or sink.batch-size triggers it first). This greatly improves write throughput. Set it to -1 to disable partition-aware buffering. |
    | scan.fetch-size | No | 100 | The number of rows fetched per batch when reading from the database. |
    | scan.by-part.enabled | No | false | Whether to read the ClickHouse table by part. If enabled, you must first stop the backend merges and TTL-based data deletions of the table using the commands 'STOP MERGES' and 'STOP TTL MERGES' on all nodes; otherwise, an error will occur in data reading. |
    | scan.part.modification-time.lower-bound | No | - | The minimum modification_time (inclusive) for filtering ClickHouse table parts, in the format yyyy-MM-dd HH:mm:ss. |
    | scan.part.modification-time.upper-bound | No | - | The maximum modification_time (exclusive) for filtering ClickHouse table parts, in the format yyyy-MM-dd HH:mm:ss. |
    | lookup.cache.max-rows | No | - | The maximum number of data records cached in the Lookup Cache. |
    | lookup.cache.ttl | No | - | The maximum cache time of each data record in the Lookup Cache. |
    | lookup.max-retries | No | 3 | The maximum number of retries upon database lookup failure. |
    Note
    When defining WITH parameters, you usually only need to specify the required ones. Before enabling optional parameters, make sure you understand their meanings and how they may affect data writing.

    Data type mapping

    For the data types supported by ClickHouse, see ClickHouse Data Types. The table below lists common data types and their counterparts in Flink. We recommend mapping Flink data types to ClickHouse data types according to this table to avoid unexpected results. Pay special attention to the following:
    DateTime: ClickHouse supports a time precision of 1 second, and its default date_time_input_format setting is basic, so it can only parse time in the format YYYY-MM-DD HH:MM:SS or YYYY-MM-DD. If your job encounters time parsing exceptions (e.g., java.sql.SQLException: Code: 6. DB::Exception: Cannot parse string '2023-05-24 14:34:55.166' as DateTime), refer to the table below and change the corresponding Flink data type to TIMESTAMP(0), or change the date_time_input_format setting of the ClickHouse cluster to best_effort. ClickHouse also supports inserting DateTime data in integer format, so you could map the type as INTEGER in Flink, but this is not recommended.
    DateTime64: ClickHouse supports inserting DateTime64 data in integer and DECIMAL formats, so you may choose to map it as BIGINT or DECIMAL in Flink. If you map it as BIGINT, ensure that the BIGINT value to write matches the precision of the DateTime64 column. For example, the default precision of DateTime64 is milliseconds, that is, DateTime64(3) during table creation in ClickHouse, so the BIGINT value to write should also be in milliseconds, for example, 1672542671000 (2023-01-01 11:11:11.000). To avoid issues, we recommend mapping it as TIMESTAMP according to the table below; a sketch follows the table.
    | ClickHouse Type | Flink Type | Java Type |
    | --- | --- | --- |
    | String | VARCHAR/STRING | String |
    | FixedString(N) | VARCHAR/STRING | String |
    | Bool | BOOLEAN | Byte |
    | Int8 | TINYINT | Byte |
    | UInt8 | SMALLINT | Short |
    | Int16 | SMALLINT | Short |
    | UInt16 | INTEGER | Integer |
    | Int32 | INTEGER | Integer |
    | UInt32 | BIGINT | Long |
    | Int64 | BIGINT | Long |
    | UInt64 | BIGINT | Long |
    | Int128 | DECIMAL | BigInteger |
    | UInt128 | DECIMAL | BigInteger |
    | Int256 | DECIMAL | BigInteger |
    | UInt256 | DECIMAL | BigInteger |
    | Float32 | FLOAT | Float |
    | Float64 | DOUBLE | Double |
    | Decimal(P,S)/Decimal32(S)/Decimal64(S)/Decimal128(S)/Decimal256(S) | DECIMAL | BigDecimal |
    | Date | DATE | LocalDateTime |
    | DateTime([timezone]) | TIMESTAMP(0) | LocalDateTime |
    | DateTime64(precision, [timezone]) | TIMESTAMP(precision) | LocalDateTime |
    | Array(T) | ARRAY<T> | T[] |
    | Map(K, V) | MAP<K, V> | Map<?, ?> |
    | Tuple(T1, T2, ...) | ROW<f1 T1, f2 T2, ...> | List<Object> |
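    As an illustration of the recommended DateTime64 mapping, the sketch below assumes a ClickHouse table db.events with a `ts` DateTime64(3) column; on the Flink side the column is declared as TIMESTAMP(3) rather than BIGINT:

    CREATE TABLE clickhouse_dt64_sink (
    `ts` TIMESTAMP(3),
    `id` INT
    ) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://172.28.28.160:8123',
    'database-name' = 'db',
    'table-name' = 'events'
    );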

    Example

    CREATE TABLE datagen_source_table (
    id INT,
    name STRING
    ) WITH (
    'connector' = 'datagen',
    'rows-per-second'='1' -- The number of data records generated per second.
    );
    CREATE TABLE clickhouse_sink_table (
    `id` INT,
    `name` STRING
    ) WITH (
    -- Specify the parameters for database connection.
    'connector' = 'clickhouse', -- Specify the use of the ClickHouse connector.
    'url' = 'clickhouse://172.28.28.160:8123', -- Specify the cluster address, which can be viewed on the ClickHouse cluster page.
    -- You don't need to specify it if the ClickHouse cluster is not configured with an account and password.
    --'username' = 'root', -- The ClickHouse cluster username.
    --'password' = 'root', -- The ClickHouse cluster password.
    'database-name' = 'db', -- The database to write data to.
    'table-name' = 'table', -- The table to write data to.
    'sink.batch-size' = '1000' -- The number of data records that trigger a batch write.
    );
    INSERT INTO clickhouse_sink_table SELECT * FROM datagen_source_table;

    FAQs

    Data upsert and delete

    ClickHouse does not support upsert. So for scenarios that require dynamic data updates and deletions, ReplacingMergeTree or CollapsingMergeTree is usually used to simulate update or delete operations. These table engines are suitable for different scenarios.

    ReplacingMergeTree table engine

    If the ClickHouse table engine is ReplacingMergeTree, you can set sink.ignore-delete to true so that Flink will automatically ignore DELETE messages and convert INSERT and UPDATE_AFTER messages into INSERT messages. Then, ClickHouse will automatically use the latest record to overwrite the previous record with the same primary key, achieving data updates.
    Note that this mode supports only insert and update operations, not deletes. For scenarios such as CDC-style ETL that require precise synchronization including deletes, use the CollapsingMergeTree table engine described below for better results.
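    The following is a minimal sketch of this setup (database, table, and cluster names are illustrative). On the ClickHouse side, the table uses the ReplacingMergeTree family; on the Flink side, sink.ignore-delete is enabled so that updates are written as plain INSERT messages:

    -- ClickHouse side
    CREATE TABLE db.users ON CLUSTER default_cluster (`id` Int32, `name` Nullable(String)) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{layer}-{shard}/db/users', '{replica}') ORDER BY id;

    -- Flink side
    CREATE TABLE clickhouse_replacing_sink (
    `id` INT,
    `name` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://172.28.28.160:8123',
    'database-name' = 'db',
    'table-name' = 'users',
    'sink.ignore-delete' = 'true'
    );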

    CollapsingMergeTree table engine

    If the ClickHouse table engine is CollapsingMergeTree, you can specify the Sign column using the table.collapsing.field option. The engine cancels (deletes) old data and inserts new data by writing pairs of rows with the same content but opposite Sign values.
    In production environments, ReplicatedCollapsingMergeTree is commonly used. However, the automatic deduplication feature of the ReplicatedMergeTree family may cause multiple identical records written to ClickHouse within a short time to be identified as duplicates and dropped, leading to data loss. In such cases, you can specify replicated_deduplication_window=0 when creating or modifying the table to disable automatic deduplication.
    Example:
    CREATE TABLE testdb.testtable on cluster default_cluster (`id` Int32,`name` Nullable(String),`age` Nullable(Int32),`weight` Nullable(Float64),`Sign` Int8) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/testdb/testtable', '{replica}', Sign) ORDER BY id SETTINGS replicated_deduplication_window = 0;
    For more information about automatic deduplication, see Data Replication.

    sharding_key of a ClickHouse distributed table

    When creating a distributed table, set sharding_key in the statement ENGINE = Distributed(cluster_name, database_name, table_name[, sharding_key]); to the primary key of the sink table in Flink SQL. This ensures that records with the same primary key are written to the same node.
    The parameters in the statement are described as follows:
    cluster_name: The cluster name, which corresponds to the custom name specified in the cluster configuration.
    database_name: The database name.
    table_name: The table name.
    sharding_key: (Optional) The key used for sharding. During the data writing process, the distributed table will distribute data to local tables on different nodes based on the rules defined by the sharding key.
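    For example, if the primary key of the Flink sink table is id, a distributed table sharded by id might look like the sketch below (names are illustrative; complete statements are given in the table creation examples later in this document):

    CREATE TABLE db.orders_all ON CLUSTER default_cluster AS db.orders ENGINE = Distributed(default_cluster, db, orders, id);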

    Writing to local tables (feature for advanced users)

    If the local.read-write option is set to true, Flink can directly write to local tables.
    Note
    Enabling writes to local tables can significantly improve throughput. However, if the ClickHouse cluster is later scaled or has nodes replaced, this may result in uneven data distribution, write failures, or data update failures. Therefore, this feature is intended only for advanced users.
    If a table has a primary key and uses UPDATE and DELETE semantics, we recommend you use the CollapsingMergeTree table engine when creating the table. You can specify the Sign field using the table.collapsing.field option and set sink.partition-strategy to hash to ensure that data with the same primary key is distributed to the same shard. In addition, set sink.partition-key to the primary key field (for composite primary keys, set it to the first field).
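    The following WITH clause is a minimal sketch of such a local-write sink (addresses, database, and table names are illustrative; configure one replica node per shard in table.local-nodes):

    CREATE TABLE clickhouse_local_sink (
    `id` INT,
    `name` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://172.28.1.21:8123',
    'database-name' = 'db',
    'table-name' = 'table_local', -- The local table on each node.
    'table.collapsing.field' = 'Sign',
    'local.read-write' = 'true',
    'table.local-nodes' = '172.28.1.24:8123,172.28.1.226:8123', -- One replica node per shard.
    'sink.partition-strategy' = 'hash',
    'sink.partition-key' = 'id'
    );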

    Null data

    If certain fields in the data can be null, declare them as Nullable in the ClickHouse table creation DDL statement; otherwise, errors may occur during data writing. For example:
    CREATE TABLE testdb.testtable on cluster default_cluster (`id` Int32,`name` Nullable(String),`age` Nullable(Int32),`weight` Nullable(Float64),`Sign` Int8) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/testdb/testtable', '{replica}', Sign) ORDER BY id ;

    Optimization of writing to partitioned tables

    When writing to a ClickHouse partitioned table, if the partition definition in ClickHouse uses functions supported by Stream Compute Service (intHash32, toYYYYMM, and toYYYYMMDD), Stream Compute Service buffers data by partition before writing by default. With this feature enabled, Stream Compute Service tries to include as few partitions as possible in each batch (when the throughput of a business partition is large enough, each batch contains only one partition), thereby improving ClickHouse's merge performance. If the data for a single partition has not reached the batch size, data from multiple partitions is merged into one batch and then written to ClickHouse. See sink.max-partitions-per-insert described above for the detailed configuration option.
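    As a sketch (names are illustrative), a daily-partitioned ClickHouse table such as

    CREATE TABLE db.events (`day` Date, `id` Int32) ENGINE = MergeTree PARTITION BY toYYYYMMDD(day) ORDER BY id;

    can be paired with a Flink sink table that tunes the partition buffer:

    CREATE TABLE clickhouse_partitioned_sink (
    `day` DATE,
    `id` INT
    ) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://172.28.28.160:8123',
    'database-name' = 'db',
    'table-name' = 'events',
    'sink.max-partitions-per-insert' = '20' -- Set to -1 to disable partition-aware buffering.
    );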

    Monitoring metric description

    Stream Compute Service adds many practical metrics to the ClickHouse connector. Click the ClickHouse sink operator in the execution graph on the Flink UI and search for one of the following metrics:
    numberOfInsertRecords: The number of output +I messages.
    numberOfDeleteRecords: The number of output -D messages.
    numberOfUpdateBeforeRecords: The number of output -U messages.
    numberOfUpdateAfterRecords: The number of output +U messages.

    Example: ClickHouse table creation statements

    CollapsingMergeTree table creation statement supporting UPDATE and DELETE

    -- Creating a database
    CREATE DATABASE test ON cluster default_cluster;
    
    -- Creating a local table
    CREATE TABLE test.datagen ON cluster default_cluster (`id` Int32,`name` Nullable(String),`age` Nullable(Int32),`weight` Nullable(Float64),`Sign` Int8) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/test/datagen', '{replica}', Sign) ORDER BY id SETTINGS replicated_deduplication_window = 0;
    
    -- Creating a distributed table based on a local table
    CREATE TABLE test.datagen_all ON CLUSTER default_cluster AS test.datagen ENGINE = Distributed(default_cluster, test, datagen, id);

    MergeTree table creation statement including only INSERT

    -- Creating a database
    CREATE DATABASE test ON cluster default_cluster;
    
    -- Creating a local table
    CREATE TABLE test.datagen ON cluster default_cluster (`id` Int32,`name` Nullable(String),`age` Nullable(Int32),`weight` Nullable(Float64) ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/test/datagen', '{replica}') ORDER BY id SETTINGS replicated_deduplication_window = 0;
    
    -- Creating a distributed table based on a local table
    CREATE TABLE test.datagen_all ON CLUSTER default_cluster AS test.datagen ENGINE = Distributed(default_cluster, test, datagen, id);
    