Last updated: 2023-11-08 16:01:45

    Overview

    The Kudu connector allows data reading from and writing to Kudu.

    Versions

    Flink Version | Description
    1.11          | Supported
    1.13          | Supported
    1.14          | Unsupported
    1.16          | Supported

    Use cases

    The Kudu connector can be used as a source (for a general-purpose table or as the right table in a dimension-table join) and as a sink for tuple streams or upsert streams, allowing data to be read from and written to Kudu.

    Defining a table in DDL

    As a source

    CREATE TABLE `kudu_source_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- Specify the options to connect to Kudu.
    'connector' = 'kudu',
    'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051', -- The connection URL.
    'kudu.table' = 'TableName1', -- Replace it with the table in Kudu, such as "default.TestTable1".
    'kudu.hash-columns' = 'id', -- The hash key (optional).
    'kudu.primary-key-columns' = 'id', -- The primary key (optional).
    'kudu.operation-timeout' = '10000', -- The insert timeout period in ms (optional).
    'kudu.max-buffer-size' = '2000', -- The buffer size (optional).
    'kudu.flush-interval' = '1000' -- The interval of data flush to Kudu (optional).
    );

    As a sink (for tuple streams)

    CREATE TABLE `kudu_sink_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- Specify the options to connect to Kudu.
    'connector' = 'kudu',
    'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051', -- The connection URL.
    'kudu.table' = 'TableName1', -- Replace it with the table in Kudu, such as "default.TestTable1".
    'kudu.ignore-duplicate' = 'true' -- (Optional) If this option is set to `true`, data whose primary key is identical with that of existing data will be ignored.
    );

    As a sink (for upsert streams)

    CREATE TABLE `kudu_upsert_sink_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- Specify the options to connect to Kudu.
    'connector' = 'kudu',
    'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051', -- The connection URL.
    'kudu.table' = 'TableName1', -- Replace it with the table in Kudu, such as "default.TestTable1".
    'kudu.hash-columns' = 'id', -- The hash key (optional).
    'kudu.primary-key-columns' = 'id' -- The primary key (required when this connector is used as an upsert sink).
    );

    WITH parameters

    Option                    | Required | Default Value | Description
    connector                 | Yes      | None          | For connection to a Kudu database, it must be 'kudu'.
    kudu.masters              | Yes      | None          | The URLs of the Kudu master servers, with a default port of 7051. If the Kudu component provided by Tencent Cloud is used, you can find the master server IPs and ports as follows: log in to the EMR console, click the ID/Name of the target cluster in the cluster list to go to its details page, and select Cluster services > Kudu > Operation > View port.
    kudu.table                | Yes      | None          | The name of the Kudu table. For example, a Kudu table created through Impala is generally named impala::db_name.table_name, and one created with the Java API is named db_name.tablename.
    kudu.hash-columns         | No       | None          | The hash key columns.
    kudu.primary-key-columns  | No       | None          | The primary key columns.
    kudu.replicas             | No       | None          | The number of replicas.
    kudu.operation-timeout    | No       | 30000         | The insert timeout period in ms.
    kudu.max-buffer-size      | No       | 1000          | The maximum buffer size.
    kudu.flush-interval       | No       | 1000          | The interval in ms at which data is flushed to Kudu.
    kudu.ignore-not-found     | No       | false         | Whether to ignore data that is not found.
    kudu.ignore-duplicate     | No       | false         | Whether to ignore data whose primary key is identical to that of existing data.
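
    As an illustration of the optional parameters above, the following is a minimal sketch of a sink table that sets the replica count, buffering, and duplicate-handling options explicitly; the table name and the option values are placeholders chosen for illustration, not recommendations.

    CREATE TABLE `kudu_tuned_sink_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- Specify the options to connect to Kudu.
    'connector' = 'kudu',
    'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051', -- The connection URL.
    'kudu.table' = 'TableName1', -- Replace it with the table in Kudu.
    'kudu.primary-key-columns' = 'id', -- The primary key.
    'kudu.replicas' = '3', -- Illustrative value: the number of replicas.
    'kudu.operation-timeout' = '30000', -- Illustrative value: the insert timeout period in ms.
    'kudu.max-buffer-size' = '1000', -- Illustrative value: the maximum buffer size.
    'kudu.flush-interval' = '1000', -- Illustrative value: the flush interval in ms.
    'kudu.ignore-not-found' = 'true', -- Illustrative value: ignore data that is not found.
    'kudu.ignore-duplicate' = 'true' -- Illustrative value: ignore data whose primary key already exists.
    );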

    Data type mapping

    Flink Type    | Kudu Type
    STRING        | STRING
    BOOLEAN       | BOOL
    TINYINT       | INT8
    SMALLINT      | INT16
    INT           | INT32
    BIGINT        | INT64
    FLOAT         | FLOAT
    DOUBLE        | DOUBLE
    BYTES         | BINARY
    TIMESTAMP(3)  | UNIXTIME_MICROS
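
    As an illustration of these mappings, the following minimal sketch declares a source table whose columns exercise several of the types above; the table name, column names, and Kudu table name are placeholders.

    CREATE TABLE `kudu_typed_source_table` (
    `id` BIGINT,              -- Maps to Kudu INT64.
    `is_active` BOOLEAN,      -- Maps to Kudu BOOL.
    `score` DOUBLE,           -- Maps to Kudu DOUBLE.
    `payload` BYTES,          -- Maps to Kudu BINARY.
    `event_time` TIMESTAMP(3) -- Maps to Kudu UNIXTIME_MICROS.
    ) WITH (
    'connector' = 'kudu',
    'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051', -- The connection URL.
    'kudu.table' = 'TableName1', -- Replace it with the table in Kudu.
    'kudu.primary-key-columns' = 'id' -- The primary key.
    );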

    Example

    CREATE TABLE `kudu_source_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- Specify the options to connect to Kudu.
    'connector' = 'kudu',
    'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051', -- The connection URL.
    'kudu.table' = 'TableName1', -- Replace it with the table in Kudu, such as "default.TestTable1".
    'kudu.hash-columns' = 'id', -- The hash key (optional).
    'kudu.primary-key-columns' = 'id', -- The primary key (optional).
    'kudu.operation-timeout' = '10000', -- The insert timeout period in ms (optional).
    'kudu.max-buffer-size' = '2000', -- The buffer size (optional).
    'kudu.flush-interval' = '1000' -- The interval of data flush to Kudu (optional).
    );
    
    CREATE TABLE `kudu_upsert_sink_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    -- Specify the options to connect to Kudu.
    'connector' = 'kudu',
    'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051', -- The connection URL.
    'kudu.table' = 'TableName1', -- Replace it with the table in Kudu, such as "default.TestTable1".
    'kudu.hash-columns' = 'id', -- The hash key (optional).
    'kudu.primary-key-columns' = 'id' -- The primary key (required when this connector is used as an upsert sink).
    );
    
    INSERT INTO kudu_upsert_sink_table SELECT * FROM kudu_source_table;
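
    The use cases above also mention reading Kudu as the right table in a dimension-table join. The following is a minimal sketch of that pattern using the generic Flink lookup-join syntax; the stream table order_stream, its columns, and its processing-time attribute proc_time are assumptions and must be defined separately.

    SELECT o.order_id, o.id, d.name
    FROM order_stream AS o
    JOIN kudu_source_table FOR SYSTEM_TIME AS OF o.proc_time AS d
    ON o.id = d.id;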

    Notes

    1. To query a Kudu table in Impala, first check whether an external table has been created for it.
    2. By default, a table created in any way other than through the Impala shell has no corresponding external table in Impala, and you need to create an external table for it before you can query its records (a minimal sketch follows this list).
    3. When the Kudu connector is used as a Stream Compute Service sink, if the sink table does not exist in the Kudu database, an internal table will be created there.
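
    A minimal sketch of creating such an external table in Impala, assuming the existing Kudu table is named db_name.table_name; the Impala-side name external_table_name is a placeholder.

    -- Run in Impala. Replace external_table_name and db_name.table_name with your own names.
    CREATE EXTERNAL TABLE external_table_name
    STORED AS KUDU
    TBLPROPERTIES ('kudu.table_name' = 'db_name.table_name');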

    Kudu authentication with Kerberos

    1. Log in to the cluster master node to get the files krb5.conf and emr.keytab in the following paths.
    /etc/krb5.conf
    /var/krb5kdc/emr.keytab
    2. Package the files into a JAR file.
    jar cvf kudu-xxx.jar krb5.conf emr.keytab
    3. Check the JAR structure (run the Vim command vim kudu-xxx.jar). Make sure the JAR file includes the following information and has the correct structure.
    META-INF/
    META-INF/MANIFEST.MF
    emr.keytab
    krb5.conf
    4. Upload the JAR file to the Dependencies page of the Stream Compute Service console, and reference the package when configuring job parameters.
    5. Get the Kerberos principal.
    klist -kt /var/krb5kdc/emr.keytab
    
    # The output is as shown below, and you can just use the first principal: hadoop/172.28.22.43@EMR-E4331BF2
    KVNO Timestamp Principal
    ---- ------------------- ------------------------------------------------------
    2 07/06/2023 18:50:41 hadoop/172.28.22.43@EMR-E4331BF2
    2 07/06/2023 18:50:41 HTTP/172.28.22.43@EMR-E4331BF2
    2 07/06/2023 18:50:41 kudu/172.28.22.43@EMR-E4331BF2
    6. Configure the principal in advanced job parameters.
    containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
    containerized.master.env.HADOOP_USER_NAME: hadoop
    security.kerberos.login.principal: hadoop/172.28.22.43@EMR-E4331BF2
    security.kerberos.login.keytab: emr.keytab
    security.kerberos.login.conf: krb5.conf
    fs.hdfs.hadoop.security.authentication: kerberos
    Note
    Kudu authentication with Kerberos may be unavailable to some existing Stream Compute Service clusters. To use this feature, please contact us to upgrade the cluster management service.
    