Last updated: 2023-11-08 15:31:46

    INSERT INTO

    INSERT INTO is used together with SELECT to write the selected data to the specified sink.

    Syntax

    INSERT INTO sink_name
    SELECT subclause

    Example

    The example below inserts the result of the SELECT query into the sink named KafkaSink1:
    INSERT INTO KafkaSink1
    SELECT s1.time_, s1.client_ip, s1.uri, s1.protocol_version, s2.status_code, s2.date_
    FROM KafkaSource1 AS s1, KafkaSource2 AS s2
    WHERE s1.time_ = s2.time_ AND s1.client_ip = s2.client_ip;

    About sinks

    Connector package

    If a sink is specified using WITH parameters, make sure you select the corresponding built-in connector or upload the connector package.
    Without a matching connector package, the job fails at runtime with the error org.apache.flink.table.api.ValidationException: Could not find any factory.
    When reading from or writing to Kafka, we recommend using flink-connector-kafka (without a version number) and setting connector.version to universal, rather than selecting an old-version package such as flink-connector-kafka-0.11. This supports the latest features.
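
As an illustration of the recommendation above, a Kafka sink declared with WITH parameters might look like the following sketch. The column types, topic name, and broker address are placeholders that do not come from this document, and the exact set of options depends on the Flink version the job runs on.

```sql
-- Hypothetical definition of the sink used in the example above.
-- Column types, topic, and broker address are assumptions for illustration.
CREATE TABLE KafkaSink1 (
    time_            VARCHAR,
    client_ip        VARCHAR,
    uri              VARCHAR,
    protocol_version VARCHAR,
    status_code      INT,
    date_            VARCHAR
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',      -- generic connector instead of e.g. 0.11
    'connector.topic' = 'example_topic',    -- placeholder topic name
    'connector.properties.bootstrap.servers' = 'broker-host:9092',  -- placeholder address
    'update-mode' = 'append',
    'format.type' = 'json'
);
```

With this definition, the job also needs the flink-connector-kafka package selected or uploaded. Otherwise the ValidationException described above is raised.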

    Calculated columns

    INSERT INTO ignores calculated columns in the sink. For the sink defined below, the SELECT clause following INSERT INTO MySink must supply a (VARCHAR) and b (BIGINT) and must not supply the calculated column c.
    CREATE TABLE MySink (
        a VARCHAR,
        b BIGINT,
        c AS PROCTIME()
    ) WITH ( ... ... );
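
A matching statement for the MySink definition above could be sketched as follows. MySource is a hypothetical source table, assumed to expose columns a (VARCHAR) and b (BIGINT).

```sql
-- MySource is a hypothetical table used only for illustration.
INSERT INTO MySink
SELECT a, b      -- c is omitted because it is a calculated column of the sink
FROM MySource;
```

Adding a third expression to the SELECT list would not match the two physical columns of the sink.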

    Difference between tuple and upsert streams

    Make sure you specify the correct WITH parameters for the sink. For example, some connectors can only be used as sources and not sinks, and some only support tuple streams and not upsert streams.
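
The distinction can be illustrated with two queries. This is a sketch that assumes "tuple stream" here means an append-only stream, and that AppendSink and UpsertSink are hypothetical sinks whose connectors support the respective modes.

```sql
-- A plain projection only ever adds rows, so an append-only (tuple) sink is enough.
INSERT INTO AppendSink
SELECT client_ip, uri
FROM KafkaSource1;

-- A non-windowed aggregation revises the count for a key as new rows arrive,
-- so the sink must accept updates (upsert), typically keyed on client_ip.
INSERT INTO UpsertSink
SELECT client_ip, COUNT(*) AS request_count
FROM KafkaSource1
GROUP BY client_ip;
```

Writing the second query to a sink that only supports tuple streams is expected to be rejected, because its result is not append-only.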