# Iceberg

Last updated: 2023-11-08 16:02:26

## Versions

| Flink Version | Description |
| --- | --- |
| 1.11 | Unsupported |
| 1.13 | Supported (use as source and sink) |
| 1.14 | Supported (use as source and sink) |
| 1.16 | Unsupported |

## Use cases

This connector can be used as a source or a sink. When used as a source, it does not support reading an Iceberg table to which data has been written with upsert operations.

## Defining a table in DDL

As a sink:
CREATE TABLE `sink` (
    `id` bigint,
    `YCSB_KEY` string,
    `FIELD0` string,
    `FIELD1` string,
    `FIELD2` string,
    `database_name` string,
    `table_name` string,
    `op_ts` timestamp(3),
    `date` string
) PARTITIONED BY (`date`) WITH (
    'connector' = 'iceberg',
    'warehouse' = 'hdfs://HDFS14979/usr/hive/warehouse',
    'write.upsert.enabled' = 'false', -- Whether to enable upsert.
    'catalog-type' = 'hive',
    'catalog-name' = 'xxx',
    'catalog-database' = 'xxx',
    'catalog-table' = 'xxx',
    -- The thrift URI of the Hive metastore, which can be obtained from the hive-site.xml configuration file (key: hive.metastore.uris).
    'uri' = 'thrift://ip:port',
    'engine.hive.enabled' = 'true',
    'format-version' = '2'
);
As a source:
CREATE TABLE `icesource` (
    `id` bigint,
    `YCSB_KEY` string,
    `FIELD0` string,
    `FIELD1` string,
    `FIELD2` string,
    `database_name` string,
    `table_name` string,
    `op_ts` timestamp(3),
    PRIMARY KEY(id) NOT ENFORCED
) WITH (
    'connector' = 'iceberg',
    'catalog-name' = 'hive_catalog',
    'catalog-type' = 'hive',
    'catalog-database' = 'database_ta',
    'catalog-table' = 't_p1_hive3_avro_3',
    'warehouse' = 'hdfs://HDFS14979/usr/hive/warehouse',
    'engine.hive.enabled' = 'true',
    'format-version' = '2',
    'streaming' = 'true',
    'monitor-interval' = '10',
    -- The thrift URI of the Hive metastore, which can be obtained from the hive-site.xml configuration file (key: hive.metastore.uris).
    'uri' = 'thrift://ip:port'
);
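Once the tables are defined, data is read from and written to Iceberg with ordinary SELECT and INSERT statements. The following is only a minimal sketch: the upstream table `kafka_source` and the DATE_FORMAT-derived partition value are hypothetical placeholders showing how the sink above could be populated.

-- Write to the Iceberg sink from a hypothetical upstream table with a matching schema.
INSERT INTO `sink`
SELECT
    `id`,
    `YCSB_KEY`,
    `FIELD0`,
    `FIELD1`,
    `FIELD2`,
    `database_name`,
    `table_name`,
    `op_ts`,
    DATE_FORMAT(`op_ts`, 'yyyy-MM-dd') AS `date` -- derive the partition column from the timestamp
FROM `kafka_source`;

-- Read from the Iceberg table defined as a source (a continuous read, since 'streaming' = 'true').
SELECT * FROM `icesource`;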

## WITH parameters

### Common parameters

| Option | Required | Default Value | Description |
| --- | --- | --- | --- |
| connector | Yes | None | Here, it should be `iceberg`. |
| location | Yes | None | The data storage path, in the format of `hdfs://` for data stored in HDFS and `cosn://$bucket/$path` for data stored in COS. |
| catalog-name | Yes | None | A custom catalog name. |
| catalog-type | Yes | None | The catalog type. Valid values: `hadoop`, `hive`, and `custom`. |
| catalog-database | Yes | None | The name of the Iceberg database. |
| catalog-table | Yes | None | The name of the Iceberg table. |
| catalog-impl | No | None | The custom catalog implementation class. This option is required when `catalog-type` is set to `custom`. |
| uri | No | None | The thrift URI of the Hive metastore, which can be obtained from the hive-site.xml configuration file (key: hive.metastore.uris), for example, `thrift://172.28.1.149:7004`. |
| format-version | No | 1 | The Iceberg format version. For more information, see Iceberg Table Spec. |

For more options, see Configuration.
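The hive catalog type is shown in the DDL examples above. As an illustration of the hadoop catalog type, the following is a minimal sketch that simply follows the option set in the table above; the schema, catalog name, and storage path are placeholders, and no uri is set because no Hive metastore is involved.

CREATE TABLE `iceberg_hadoop_sink` (
    `id` bigint,
    `name` string
) WITH (
    'connector' = 'iceberg',
    'catalog-type' = 'hadoop', -- metadata is kept directly under the storage path
    'catalog-name' = 'hadoop_catalog',
    'catalog-database' = 'default_database',
    'catalog-table' = 'iceberg_hadoop_sink',
    'location' = 'hdfs://HDFS14979/usr/hive/warehouse', -- placeholder HDFS path
    'format-version' = '2'
);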

## COS configuration

No additional configuration is required. You just need to set the data storage path (the location option) to the respective cosn:// path, as in the example below.
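Compared with the HDFS examples above, only the storage path changes; the bucket name here is a placeholder.

-- Data storage path in COS instead of HDFS
'location' = 'cosn://examplebucket-1250000000/iceberg/warehouse',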

## HDFS configuration

### Getting the HDFS JAR package

To write data to Iceberg in a Flink SQL task, if the data is stored in HDFS, a JAR package containing the HDFS configuration is required so that Flink can connect to the target HDFS cluster. The steps to get and use the JAR package are as follows:
    1. Log in to the respective Hive cluster using the SSH method.
    2. Get hive-site.xml and hdfs-site.xml from the following paths in the EMR Hive cluster.
    /usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
    3. Package the obtained configuration files in a JAR package.
    jar -cvf hdfs-xxx.jar hdfs-site.xml
    4. Check the JAR structure (run a Vim command to view it). Make sure the JAR file includes the following information and has the correct structure.
    vi hdfs-xxx.jar
    META-INF/
    META-INF/MANIFEST.MF
    hdfs-site.xml

### Setting the HDFS user

Note
By default, Flink jobs access HDFS as the Flink user. If the Flink user does not have permission to write to HDFS, you can use advanced job parameters to set the accessing user to a user that has write permission, or to the super-user hadoop.
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop

### Kerberos authentication

    1. Log in to the cluster master node to get the files krb5.conf, emr.keytab, core-site.xml, and hdfs-site.xml in the following paths.
    /etc/krb5.conf
    /var/krb5kdc/emr.keytab
    /usr/local/service/hadoop/etc/hadoop/core-site.xml
    /usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
    2. Package the obtained configuration files in a JAR package.
    jar cvf hdfs-xxx.jar krb5.conf emr.keytab core-site.xml hdfs-site.xml
    3. Check the JAR structure (run the Vim command vim hdfs-xxx.jar). Make sure the JAR file includes the following information and has the correct structure.
    META-INF/
    META-INF/MANIFEST.MF
    emr.keytab
    krb5.conf
    hdfs-site.xml
    core-site.xml
    4. Upload the JAR file to the Dependencies page of the Stream Compute Service console, and reference the package when configuring job parameters.
    5. Get the Kerberos principal and configure it in advanced job parameters.
    klist -kt /var/krb5kdc/emr.keytab
    
    # The output is as follows (use the first): hadoop/172.28.28.51@EMR-OQPO48B9
    KVNO Timestamp Principal
    ---- ------------------- ------------------------------------------------------
    2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B9
    2 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B9
    2 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B9
    2 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
6. Configure the principal in advanced job parameters.
    containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
    containerized.master.env.HADOOP_USER_NAME: hadoop
    security.kerberos.login.principal: hadoop/172.28.28.51@EMR-OQPO48B9
    security.kerberos.login.keytab: emr.keytab
    security.kerberos.login.conf: krb5.conf
    Note
    The values of security.kerberos.login.keytab and security.kerberos.login.conf are the respective file names.
    