Last updated: 2023-11-08 14:50:37

    Overview

    The Hive connector can be used as a stream sink, but it supports only append-only streams, not upsert streams. Supported data formats include Text, SequenceFile, ORC, and Parquet.

    Versions

    Flink Version    Description
    1.11    Hive v1.1.0, v2.3.2, v2.3.5, and v3.1.1 supported. Option: 'connector.type' = 'hive'.
    1.13    Hive v1.0.0 - v1.2.2, v2.0.0 - v2.2.0, v2.3.0 - v2.3.6, and v3.0.0 - v3.1.2 supported. Option: 'connector' = 'hive'.
    1.14    Unsupported.
    1.16    Hive v2.0.0 - v2.2.0, v2.3.0 - v2.3.6, and v3.0.0 - v3.1.2 supported. Option: 'connector' = 'hive'.

    Defining a table in DDL

    As a sink

    CREATE TABLE hive_table (
      `id` INT,
      `name` STRING,
      `dt` STRING,
      `hr` STRING
    ) PARTITIONED BY (dt, hr)
    WITH (
      'connector' = 'hive',  -- For Flink v1.13, set 'connector' to 'hive'.
      'hive-version' = '3.1.1',
      'hive-database' = 'testdb',
      'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
      'sink.partition-commit.trigger' = 'partition-time',
      'sink.partition-commit.delay' = '1 h',
      'sink.partition-commit.policy.kind' = 'metastore,success-file'
    );
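    According to the WITH parameters below, Flink v1.11 uses the option key 'connector.type' instead of 'connector'. A minimal sketch of the same sink table under that assumption (all other options are kept unchanged and may need adjustment for your cluster):
    -- Sketch: the same sink table written for Flink v1.11, which uses 'connector.type'.
    CREATE TABLE hive_table (
      `id` INT,
      `name` STRING,
      `dt` STRING,
      `hr` STRING
    ) PARTITIONED BY (dt, hr)
    WITH (
      'connector.type' = 'hive',  -- Flink v1.11 option key; Flink v1.13 uses 'connector' instead.
      'hive-version' = '3.1.1',
      'hive-database' = 'testdb',
      'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
      'sink.partition-commit.trigger' = 'partition-time',
      'sink.partition-commit.delay' = '1 h',
      'sink.partition-commit.policy.kind' = 'metastore,success-file'
    );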

    Job configurations

    Create a Hive table in a Hive database.
    -- Create the table "hive_table" in the Hive database "testdb".
    USE testdb;
    CREATE TABLE `hive_table` (
      `id` int,
      `name` string
    ) PARTITIONED BY (`dt` string, `hr` string)
    STORED AS ORC;
    Grant write access to the HDFS path of the Hive table using one of the following methods.
    Method 1: Log in to the EMR Hive cluster as instructed in Basic Hive Operations, and run the chmod command on the HDFS directory of the hive_table table in the target database testdb.
    hdfs dfs -chmod 777 /usr/hive/warehouse/testdb.db/hive_table
    Method 2: Go to Jobs > Job parameters of the target job in the console, and add the following advanced parameters so that the job accesses the HDFS path as the hadoop user.
    containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
    containerized.master.env.HADOOP_USER_NAME: hadoop
    Note
    The Hive table used in the Flink SQL statements is testdb.hive_table. In the Flink CREATE TABLE statement, the table name must match the table name in the Hive database (Flink v1.13 can override it with the hive-table option, as shown in the sketch below), and the database name is specified with the hive-database option.
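    For example, with Flink v1.13 the Flink-side table can use a different name from the Hive table. A minimal sketch under that assumption (the Flink table name flink_hive_sink is made up for illustration; data is still written to testdb.hive_table):
    -- Sketch (Flink v1.13): the Flink table name is arbitrary; 'hive-database' and
    -- 'hive-table' determine where the data is actually written.
    CREATE TABLE flink_hive_sink (
      `id` INT,
      `name` STRING,
      `dt` STRING,
      `hr` STRING
    ) PARTITIONED BY (dt, hr)
    WITH (
      'connector' = 'hive',
      'hive-version' = '3.1.1',
      'hive-database' = 'testdb',
      'hive-table' = 'hive_table',  -- overrides the Flink table name on the Hive side
      'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
      'sink.partition-commit.trigger' = 'partition-time',
      'sink.partition-commit.delay' = '1 h',
      'sink.partition-commit.policy.kind' = 'metastore,success-file'
    );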

    WITH parameters

    connector.type (required; default: none): Available to Flink v1.11. To use the Hive connector, set it to hive.
    connector (required; default: none): Available to Flink v1.13. To use the Hive connector, set it to hive.
    hive-version (required; default: none): The version of the Hive cluster created in the EMR console.
    hive-database (required; default: none): The Hive database to write data to.
    hive-table (optional; default: none): Available to Flink v1.13. Its value is used as the table name in the Hive database, overriding the name in the Flink CREATE TABLE statement.
    sink.partition-commit.trigger (optional; default: process-time): The partition commit trigger. Valid values:
        process-time: A partition is committed when a certain time elapses after the partition creation time. Here, the creation time is the physical creation time of the partition.
        partition-time: A partition is committed when a certain time elapses after the time extracted from the partition values. This trigger requires watermark generation to detect partitions automatically; a partition is committed once the watermark passes the time extracted from the partition values plus the configured delay.
    sink.partition-commit.delay (optional; default: 0s): The time to wait before committing a partition. A partition is not committed until this time has elapsed after its creation time.
    sink.partition-commit.policy.kind (required; default: none): The partition commit policy. Valid values (combinations are allowed):
        success-file: When a partition is committed, a _SUCCESS file is generated in the partition's directory.
        metastore: The partition is added to the Hive Metastore.
        custom: A user-defined partition commit policy.
    partition.time-extractor.timestamp-pattern (optional; default: none): The partition timestamp extraction pattern. The timestamp must follow the format yyyy-MM-dd hh:mm:ss, with the placeholders replaced by the corresponding partition fields of the Hive table. By default, yyyy-MM-dd hh:mm:ss is extracted from the first partition field.
        If the timestamp is extracted from a single partition field dt, set this to $dt.
        If the timestamp is extracted from multiple partition fields, such as year, month, day, and hour, set this to $year-$month-$day $hour:00:00.
        If the timestamp is extracted from two partition fields dt and hour, set this to $dt $hour:00:00.
    sink.partition-commit.policy.class (optional; default: none): The partition commit policy class, which must be used together with sink.partition-commit.policy.kind = 'custom' and must implement the PartitionCommitPolicy interface.
    partition.time-extractor.kind (optional; default: default): The partition time extractor, which applies only when sink.partition-commit.trigger is set to partition-time. If you use your own time extractor, set this to custom.
    partition.time-extractor.class (optional; default: none): The partition time extractor class, which must implement the PartitionTimeExtractor interface.
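    For comparison with the partition-time examples in this document, the sketch below shows a sink that commits partitions on processing time instead, so no watermark or timestamp pattern is needed. It assumes Flink v1.13 and reuses testdb.hive_table; the Flink table name hive_table_proctime is made up for illustration.
    -- Sketch: commit partitions based on processing time rather than on partition values.
    CREATE TABLE hive_table_proctime (
      `id` INT,
      `name` STRING,
      `dt` STRING,
      `hr` STRING
    ) PARTITIONED BY (dt, hr)
    WITH (
      'connector' = 'hive',
      'hive-version' = '3.1.1',
      'hive-database' = 'testdb',
      'hive-table' = 'hive_table',
      'sink.partition-commit.trigger' = 'process-time',  -- the default trigger
      'sink.partition-commit.delay' = '1 h',              -- commit 1 hour after the partition is created
      'sink.partition-commit.policy.kind' = 'metastore,success-file'
    );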

    Example

    CREATE TABLE datagen_source_table (
      id INT,
      name STRING,
      log_ts TIMESTAMP(3),
      WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10'
    );
    
    CREATE TABLE hive_table (
      `id` INT,
      `name` STRING,
      `dt` STRING,
      `hr` STRING
    ) PARTITIONED BY (dt, hr)
    WITH (
      'connector' = 'hive',  -- For Flink v1.13, set 'connector' to 'hive'.
      'hive-version' = '3.1.1',
      'hive-database' = 'testdb',
      'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
      'sink.partition-commit.trigger' = 'partition-time',
      'sink.partition-commit.delay' = '1 h',
      'sink.partition-commit.policy.kind' = 'metastore,success-file'
    );
    
    -- Streaming SQL: insert into the Hive table.
    INSERT INTO hive_table
    SELECT id, name, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
    FROM datagen_source_table;
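    Once partitions are committed, the written data can be checked from the Hive side. A minimal verification sketch, run in the Hive CLI or Beeline (the dt and hr values are only illustrative):
    -- Sketch: query the committed data from the Hive side.
    USE testdb;
    SELECT id, name FROM hive_table WHERE dt = '2023-11-08' AND hr = '14' LIMIT 10;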

    Hive configurations

    Getting the Hive connection JAR package

    To write data to Hive in a Flink SQL task, a JAR package containing Hive and HDFS configurations is required to connect Flink to the target Hive cluster. The steps to get the JAR package and to use it are as follows:
    1. Log in to the respective Hive cluster using SSH.
    2. Get hive-site.xml and hdfs-site.xml from the following paths in the EMR Hive cluster.
    /usr/local/service/hive/conf/hive-site.xml
    /usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
    3. Modify hive-site.xml.
    Add the following property, and set ip to the value of hive.server2.thrift.bind.host.
    <property>
    <name>hive.metastore.uris</name>
    <value>thrift://ip:7004</value>
    </property>
    4. Package the obtained configuration files into a JAR package.
    jar -cvf hive-xxx.jar hive-site.xml hdfs-site.xml hivemetastore-site.xml hiveserver2-site.xml
    5. Check the JAR structure (run vi hive-xxx.jar). Make sure the JAR file includes the following entries and has the correct structure.
    META-INF/
    META-INF/MANIFEST.MF
    hive-site.xml
    hdfs-site.xml
    hivemetastore-site.xml
    hiveserver2-site.xml

    Using a JAR package in a task

    Select the Hive connection JAR package as the referenced package of the job. This is the hive-xxx.jar file obtained in Getting the Hive connection JAR package, and it must be uploaded on the Dependencies page before it can be referenced.

    Kerberos authentication

    1. Log in to the cluster master node to get the files krb5.conf, emr.keytab, core-site.xml, hdfs-site.xml, and hive-site.xml in the following paths.
    /etc/krb5.conf
    /var/krb5kdc/emr.keytab
    /usr/local/service/hadoop/etc/hadoop/core-site.xml
    /usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
    /usr/local/service/hive/conf/hive-site.xml
    2. Modify hive-site.xml. Add the following property, and set ip to the value of hive.server2.thrift.bind.host.
    <property>
    <name>hive.metastore.uris</name>
    <value>thrift://ip:7004</value>
    </property>
    3. Package the obtained configuration files into a JAR package.
    jar cvf hive-xxx.jar krb5.conf emr.keytab core-site.xml hdfs-site.xml hive-site.xml hivemetastore-site.xml hiveserver2-site.xml
    4. Check the JAR structure (run vim hive-xxx.jar). Make sure the JAR file includes the following entries and has the correct structure.
    META-INF/
    META-INF/MANIFEST.MF
    emr.keytab
    krb5.conf
    hdfs-site.xml
    core-site.xml
    hive-site.xml
    hivemetastore-site.xml
    hiveserver2-site.xml
    5. Upload the JAR file on the Dependencies page of the Stream Compute Service console, and reference it when configuring job parameters.
    6. Get the Kerberos principals and use them to configure the advanced job parameters.
    klist -kt /var/krb5kdc/emr.keytab
    
    # The output is as follows (use the first): hadoop/172.28.28.51@EMR-OQPO48B9
    KVNO Timestamp Principal
    ---- ------------------- ------------------------------------------------------
    2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B9
    2 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B9
    2 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B9
    2 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
    containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
    containerized.master.env.HADOOP_USER_NAME: hadoop
    security.kerberos.login.principal: hadoop/172.28.28.51@EMR-OQPO48B9
    security.kerberos.login.keytab: emr.keytab
    security.kerberos.login.conf: ${krb5.conf.fileName}
    Note
    Kerberos authentication is not supported for historical Stream Compute Service clusters. To support the feature, please contact us to upgrade the cluster management service.

    Notes

    If the Flink job runs properly and no errors are reported in the logs, but the data written to this Hive table cannot be found from the Hive client, repair the table's partition metadata with the following command (replace hive_table_xxx with the name of the table to be repaired).
    msck repair table hive_table_xxx;
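    msck repair table scans the table's directory on HDFS and registers any partitions missing from the Hive Metastore. Afterwards, the partitions can be listed to confirm the repair, for example:
    -- List the partitions now registered in the metastore.
    SHOW PARTITIONS hive_table_xxx;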
    