Last updated: 2023-11-08 15:56:45

    Versions

    | Flink Version | Description |
    | ------------- | ----------- |
    | 1.11 | Unsupported |
    | 1.13 | Supported (use as source and sink) |
    | 1.14 | Unsupported |
    | 1.16 | Unsupported |

    Use cases

    This connector can be used as a source or a sink.

    Defining a table in DDL

    As a sink:
    CREATE TABLE hudi_sink
    (
    uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
    name VARCHAR(10),
    age INT,
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
    ) WITH (
    'connector' = 'hudi'
    , 'path' = 'hdfs://HDFS1000/data/hudi/mor'
    , 'table.type' = 'MERGE_ON_READ' -- Creates a MERGE_ON_READ table; the default is `COPY_ON_WRITE`.
    , 'write.tasks' = '3' -- Default value: 4.
    , 'compaction.tasks' = '4' -- Default value: 4.
    -- , 'hive_sync.enable' = 'true' -- Default value: false.
    -- , 'hive_sync.db' = 'default'
    -- , 'hive_sync.table' = 'datagen_mor_1'
    -- , 'hive_sync.mode' = 'jdbc'
    -- , 'hive_sync.username' = ''
    -- , 'hive_sync.password' = ''
    -- , 'hive_sync.jdbc_url' = 'jdbc:hive2://172.28.1.185:7001'
    -- , 'hive_sync.metastore.uris' = 'thrift://172.28.1.185:7004'
    );
    
    As a source:
    CREATE TABLE `source`
    (
    uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
    name VARCHAR(10),
    age INT,
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
    ) WITH (
    'connector' = 'hudi'
    , 'path' = 'hdfs://172.28.28.202:4007/path/hudidata'
    , 'table.type' = 'MERGE_ON_READ' -- The MOR table. Its incremental data cannot be read.
    , 'read.tasks' = '1' -- The parallelism of the read tasks, which defaults to 4.
    , 'hoodie.datasource.query.type' = 'snapshot' -- Default value: snapshot; other valid values: read_optimized and incremental.
    , 'read.streaming.enabled' = 'true' -- This option enables the streaming read.
    , 'read.start-commit' = 'earliest' -- Specifies the start commit instant time; the commit time format should be 'yyyyMMddHHmmss'.
    , 'read.streaming.check-interval' = '4'
    );

    WITH parameters

    Common parameters

    | Option | Required | Default Value | Description |
    | ------ | -------- | ------------- | ----------- |
    | connector | Yes | None | Here, it should be hudi. |
    | path | Yes | None | The data storage path, in the format of hdfs:// for data storage in HDFS and COSN://$bucket/$path for data storage in COS. |

    Parameters when used as a sink

    | Option | Required | Default Value | Description |
    | ------ | -------- | ------------- | ----------- |
    | table.type | No | COPY_ON_WRITE | The Hudi table type. Valid values: COPY_ON_WRITE and MERGE_ON_READ. |

    HoodieRecord parameters

    | Option | Required | Default Value | Description |
    | ------ | -------- | ------------- | ----------- |
    | hoodie.datasource.write.recordkey.field | No | uuid | The key field. If the Flink table has a primary key, use it. |
    | hoodie.datasource.write.partitionpath.field | No | "" | The partition path field. An empty value indicates the table is not partitioned. |
    | write.precombine.field | No | ts | The field used for pre-combining before the actual write. When two records have the same key, the one with the larger value of the pre-combine field is picked, as determined by Object.compareTo(..). |
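
    The record key, partition path, and pre-combine field can be set explicitly in the WITH clause. Below is a minimal sketch that keys a table on an id column, partitions it by dt, and pre-combines on ts; the table name, column names, and path are placeholders rather than values taken from this page.

    CREATE TABLE hudi_keyed_sink
    (
    id VARCHAR(20) PRIMARY KEY NOT ENFORCED,
    ts TIMESTAMP(3),
    dt VARCHAR(10)
    ) WITH (
    'connector' = 'hudi'
    , 'path' = 'hdfs://HDFS1000/data/hudi/keyed' -- Placeholder path.
    , 'hoodie.datasource.write.recordkey.field' = 'id' -- The key field; matches the table's primary key.
    , 'hoodie.datasource.write.partitionpath.field' = 'dt' -- Partition by the dt column; leave empty for a non-partitioned table.
    , 'write.precombine.field' = 'ts' -- On duplicate keys, the record with the larger ts is kept.
    );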

    Parallelism parameters

    | Option | Required | Default Value | Description |
    | ------ | -------- | ------------- | ----------- |
    | write.tasks | No | 4 | The parallelism of tasks that do actual write. |
    | write.index_bootstrap.tasks | No | None | The parallelism of tasks that do index bootstrap, which defaults to the parallelism of the execution environment. |
    | write.bucket_assign.tasks | No | None | The parallelism of tasks that do bucket assign, which defaults to the parallelism of the execution environment. |
    | compaction.tasks | No | 4 | The parallelism of tasks that do actual compaction. |
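
    As a sketch, the parallelism of each write stage can be tuned per table through these options; the path and the parallelism values below are placeholders, not tuning recommendations.

    CREATE TABLE hudi_parallel_sink
    (
    uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
    ts TIMESTAMP(3)
    ) WITH (
    'connector' = 'hudi'
    , 'path' = 'hdfs://HDFS1000/data/hudi/parallel' -- Placeholder path.
    , 'table.type' = 'MERGE_ON_READ'
    , 'write.tasks' = '2' -- Parallelism of the actual write; defaults to 4.
    , 'write.bucket_assign.tasks' = '2' -- Defaults to the parallelism of the execution environment.
    , 'write.index_bootstrap.tasks' = '2' -- Defaults to the parallelism of the execution environment.
    , 'compaction.tasks' = '2' -- Parallelism of the compaction; defaults to 4.
    );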

    Compaction parameters

    | Option | Required | Default Value | Description |
    | ------ | -------- | ------------- | ----------- |
    | compaction.schedule.enabled | No | true | Whether to enable compaction scheduling. |
    | compaction.async.enabled | No | true | Whether to use async compaction. |
    | compaction.trigger.strategy | No | num_commits | Valid values: num_commits, time_elapsed, num_and_time, and num_or_time. |
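
    Compaction applies to MERGE_ON_READ tables. Below is a minimal sketch that keeps the default scheduled, asynchronous compaction and triggers it by commit count, using only the options listed above; the path is a placeholder.

    CREATE TABLE hudi_mor_sink
    (
    uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
    ts TIMESTAMP(3)
    ) WITH (
    'connector' = 'hudi'
    , 'path' = 'hdfs://HDFS1000/data/hudi/mor_compaction' -- Placeholder path.
    , 'table.type' = 'MERGE_ON_READ'
    , 'compaction.schedule.enabled' = 'true' -- Schedule compaction plans (default: true).
    , 'compaction.async.enabled' = 'true' -- Run compaction asynchronously inside the job (default: true).
    , 'compaction.trigger.strategy' = 'num_commits' -- Trigger by the number of commits; other strategies: time_elapsed, num_and_time, num_or_time.
    );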

    Hive metadata sync parameters

    | Option | Required | Default Value | Description |
    | ------ | -------- | ------------- | ----------- |
    | hive_sync.enable | No | false | - |
    | hive_sync.db | No | - | - |
    | hive_sync.table | No | - | - |
    | hive_sync.mode | No | jdbc | Valid values: hms, jdbc, and hiveql. |
    | hive_sync.username | No | - | - |
    | hive_sync.password | No | - | - |
    | hive_sync.jdbc_url | No | - | - |
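
    As an example, syncing table metadata to Hive through the Hive Metastore (hms mode) might look like the sketch below; the database, table, and metastore address are placeholders, and the jdbc mode variant is shown in the sink DDL at the top of this page.

    CREATE TABLE hudi_hive_sync_sink
    (
    uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
    ts TIMESTAMP(3)
    ) WITH (
    'connector' = 'hudi'
    , 'path' = 'hdfs://HDFS1000/data/hudi/hive_sync' -- Placeholder path.
    , 'hive_sync.enable' = 'true' -- Defaults to false.
    , 'hive_sync.mode' = 'hms' -- Sync through the Hive Metastore instead of JDBC.
    , 'hive_sync.db' = 'default' -- Target Hive database (placeholder).
    , 'hive_sync.table' = 'hudi_hive_sync_sink' -- Target Hive table (placeholder).
    , 'hive_sync.metastore.uris' = 'thrift://172.28.1.185:7004' -- Placeholder metastore address.
    );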

    More parameters

    For more parameters, see Flink Options.

    Parameters when used as a source

    | Option | Required | Default Value | Description |
    | ------ | -------- | ------------- | ----------- |
    | read.tasks | No | 4 | The parallelism of tasks that do actual read. |
    | hoodie.datasource.query.type | No | snapshot | Valid values: snapshot, read_optimized, and incremental. |
    | read.streaming.enabled | No | false | - |
    | read.streaming.check-interval | No | 60 | The check interval for streaming read, in seconds. |
    | read.streaming.skip_compaction | No | false | - |
    | read.start-commit | No | None | The start commit instant for reading, in the format of 'yyyyMMddHHmmss'. It can be set to earliest to read from the earliest instant in streaming read. |
    | read.end-commit | No | None | The end commit instant for reading, which usually does not need to be set. |
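
    As a sketch, a bounded incremental read between two commit instants combines the options above; the path and the instant times are placeholders.

    CREATE TABLE hudi_incremental_source
    (
    uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
    ts TIMESTAMP(3)
    ) WITH (
    'connector' = 'hudi'
    , 'path' = 'hdfs://172.28.28.202:4007/path/hudidata' -- Placeholder path.
    , 'hoodie.datasource.query.type' = 'incremental' -- Read only the changes between the two instants below.
    , 'read.tasks' = '1' -- Read parallelism; defaults to 4.
    , 'read.start-commit' = '20231101000000' -- Placeholder instant in yyyyMMddHHmmss format.
    , 'read.end-commit' = '20231102000000' -- Placeholder instant; omit it for an unbounded read.
    );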

    More parameters

    For more parameters, see Flink Options.

    COS configurations

    No additional configurations are required. You just need to set path to the respective cosn path.
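
    For example, pointing a sink at COS only changes the path; everything else stays the same. A minimal sketch, where the bucket name and path are placeholders:

    CREATE TABLE hudi_cos_sink
    (
    uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
    ts TIMESTAMP(3)
    ) WITH (
    'connector' = 'hudi'
    , 'path' = 'cosn://my-bucket/data/hudi/cow' -- Placeholder COS bucket and path.
    );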

    HDFS configurations

    Getting the HDFS JAR package

    To write data to Hudi stored in HDFS from a Flink SQL task, a JAR package containing the HDFS configurations is required so that Flink can connect to the target HDFS cluster. Get and use the JAR package as follows:
    1. Log in to the respective Hive cluster using SSH.
    2. Get hdfs-site.xml from the following path in the EMR Hive cluster.
    /usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
    3. Package the obtained configuration files in a JAR package.
    jar -cvf hdfs-xxx.jar hdfs-site.xml
    4. Check the JAR structure (you can run a Vim command to view it). Make sure the JAR file includes the following information and has the correct structure.
    vi hdfs-xxx.jar
    META-INF/
    META-INF/MANIFEST.MF
    hdfs-site.xml

    Setting the HDFS user

    Note
    By default, Flink jobs access HDFS as the flink user. If the flink user does not have permission to write to HDFS, you can use advanced job parameters to set the accessing user to a user that has write permission or to the super-user hadoop.
    containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
    containerized.master.env.HADOOP_USER_NAME: hadoop

    Kerberos authentication

    1. Log in to the cluster master node to get the files krb5.conf, emr.keytab, core-site.xml, and hdfs-site.xml in the following paths.
    /etc/krb5.conf
    /var/krb5kdc/emr.keytab
    /usr/local/service/hadoop/etc/hadoop/core-site.xml
    /usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
    2. Package the obtained configuration files in a JAR package.
    jar cvf hdfs-xxx.jar krb5.conf emr.keytab core-site.xml hdfs-site.xml
    3. Check the JAR structure (run the Vim command vim hdfs-xxx.jar). Make sure the JAR file includes the following information and has the correct structure.
    META-INF/
    META-INF/MANIFEST.MF
    emr.keytab
    krb5.conf
    hdfs-site.xml
    core-site.xml
    4. Upload the JAR file to the Dependencies page of the Stream Compute Service console, and reference the package when configuring job parameters.
    5. Get the Kerberos principal and configure it in advanced job parameters.
    klist -kt /var/krb5kdc/emr.keytab
    
    # The output is as follows (use the first): hadoop/172.28.28.51@EMR-OQPO48B9
    KVNO Timestamp Principal
    ---- ------------------- ------------------------------------------------------
    2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B9
    2 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B9
    2 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B9
    2 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
    6. Configure the principal in advanced job parameters.
    containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
    containerized.master.env.HADOOP_USER_NAME: hadoop
    security.kerberos.login.principal: hadoop/172.28.28.51@EMR-OQPO48B9
    security.kerberos.login.keytab: emr.keytab
    security.kerberos.login.conf: krb5.conf

    FAQs

    Hive sync

    The Hive table sync failed with the following error:
    java.lang.ClassNotFoundException: org.apache.hudi.hadoop.HoodieParquetInputFormat
    Check whether the Hive environment contains the JAR package required by Hudi (hudi-hadoop-mr-bundle-x.y.z.jar). For details, see Hive.