Kafka Data Import

Last updated: 2024-01-19 16:45:30
    This document describes how to consume data from Kafka into a Cloud Data Warehouse cluster in real time.

    Prerequisites

    The Kafka data source cluster and the target Cloud Data Warehouse cluster must be in the same VPC.

    Directions

    1. Log in to the Cloud Data Warehouse cluster and create a Kafka consumption table.
    CREATE TABLE queue (
      timestamp UInt64,
      level String,
      message String
    ) ENGINE = Kafka
    SETTINGS
      kafka_broker_list = 'localhost:9092',
      kafka_topic_list = 'topic',
      kafka_group_name = 'group',
      kafka_format = 'JSONEachRow',
      kafka_num_consumers = 1,
      kafka_max_block_size = 65536,
      kafka_skip_broken_messages = 0,
      kafka_auto_offset_reset = 'latest';
    The common parameters are described as follows:
    kafka_broker_list (required): A list of Kafka brokers separated by commas. We recommend using `IP:port` instead of a domain name to avoid possible DNS resolution issues.
    kafka_topic_list (required): Kafka topics separated by commas.
    kafka_group_name (required): Name of the Kafka consumer group.
    kafka_format (required): Kafka data format. For the formats supported by ClickHouse, see Formats for Input and Output Data in the ClickHouse documentation; a quick check of how a payload is parsed appears right after this list.
    kafka_row_delimiter (optional): Row delimiter used to split data rows. The default value is `\n`; you can set it to another value to match the delimiter actually used when the data was written.
    kafka_num_consumers (optional): Number of consumers for a single Kafka engine table. Increasing this value can raise consumption throughput, but it must not exceed the number of partitions in the corresponding topic.
    kafka_max_block_size (optional): Maximum block size (in messages) for writing Kafka data to the target table. The default value is 65536. When a block reaches this size, the buffered data is flushed to the target table.
    kafka_skip_broken_messages (optional): Number of messages with parsing errors that can be tolerated. If the number of broken messages exceeds this value, the background consumer thread stops. The default value is 0.
    kafka_commit_every_batch (optional): How often offsets are committed to Kafka. 0: commit only after a whole block is written. 1: commit after each batch is written.
    kafka_auto_offset_reset (optional): The offset from which to start reading when no committed offset exists. Its value can be `earliest` or `latest`.
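    With `kafka_format = 'JSONEachRow'`, each Kafka message is expected to be a single JSON object whose keys match the column names of the consumption table. As a sketch, you can verify that a sample payload parses into the expected columns before pointing the engine at a real topic; the field values are made up for illustration, and the `format` table function used here is only available in recent ClickHouse versions, so whether your cluster supports it is an assumption.
    SELECT * FROM format(JSONEachRow,
    $$
    {"timestamp": 1705651200, "level": "info", "message": "service started"}
    {"timestamp": 1705651230, "level": "error", "message": "connection refused"}
    $$);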
    2. Create a ClickHouse local table (target table).
    If your cluster has one replica:
    CREATE TABLE daily ON CLUSTER default_cluster
    (
      day Date,
      level String,
      total UInt64
    )
    ENGINE = SummingMergeTree()
    ORDER BY (day, level);
    If your cluster has two replicas:
    CREATE TABLE daily ON CLUSTER default_cluster
    (
      day Date,
      level String,
      total UInt64
    )
    ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/default/daily/{shard}', '{replica}')
    ORDER BY (day, level);
    Create a distributed table:
    CREATE TABLE daily_dis ON CLUSTER default_cluster
    AS default.daily
    ENGINE = Distributed('default_cluster', 'default', 'daily', rand());
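    SummingMergeTree replaces rows that share the same sorting key with a single row whose numeric columns are summed when data parts are merged in the background, which is why step 4 still aggregates with sum(total) at query time. A minimal sketch of this behavior on a throwaway table (the daily_demo name is only for illustration and is not part of the pipeline):
    CREATE TABLE daily_demo (day Date, level String, total UInt64)
    ENGINE = SummingMergeTree()
    ORDER BY (day, level);

    INSERT INTO daily_demo VALUES ('2024-01-19', 'info', 3), ('2024-01-19', 'info', 2);
    OPTIMIZE TABLE daily_demo FINAL;   -- force the merge so the demo is deterministic
    SELECT * FROM daily_demo;          -- one row: 2024-01-19, info, 5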
    3. Create a materialized view to sync data consumed by the Kafka consumption table to the ClickHouse target table.
    CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
    FROM queue GROUP BY day, level;
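    Once the materialized view exists, the Kafka engine table starts consuming in the background and the view writes aggregated rows into daily. A quick sanity check, assuming messages are already being produced to the topic:
    SELECT count() FROM daily;   -- the count should grow as new messages are consumed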
    4. Query the data.
    SELECT level, sum(total) FROM daily GROUP BY level;
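    If you created the distributed table daily_dis in step 2, queries on a multi-shard cluster would normally go through it so that every shard's local daily table is covered; the same aggregation as a sketch:
    SELECT level, sum(total) FROM daily_dis GROUP BY level;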

    Notes

    To stop receiving topic data or to change the conversion logic, detach the materialized view and then attach it again.
    DETACH TABLE consumer;
    ATTACH TABLE consumer;
    