tencent cloud

Feedback

CREATE TABLE

Last updated: 2023-11-08 15:28:58
    The CREATE TABLE statement describes a data source or a sink and defines it as a data table.

    Syntax

    Syntax structure

    CREATE TABLE Table name
    (
    { <Column definition> | <Calculated column definition> }[ , ...n]
    [ <Watermark definition> ]
    [ <Definitions of table constraints such as the primary key> ][ , ...n]
    )
    [COMMENTTable comments]
    [PARTITIONED BY (grouped column name 1, grouped column name 2, ...)]
    WITH (key 1=value 1, key 2=value 2, ...)
    [ LIKE another table [( <LIKE clause> )] ]

    Symbols

    A table created by CREATE TABLE can be used as a data source or a data sink. However, there must be a corresponding connector; otherwise, an error will occur.
    <Column definition>:
    Column name Column type [ <Definitions of column constraints> ] [COMMENT Column ‍comments]
    
    <Definitions of column constraints>:
    [CONSTRAINT constraint name] PRIMARY KEY NOT ENFORCED
    
    <Definitions of table constraints>:
    [CONSTRAINT constraint name] PRIMARY KEY (column 1, column 2, ...) NOT ENFORCED
    
    <Calculated column definition>:
    Column name AS calculated column expression [COMMENT column comments]
    
    <Watermark definition>:
    WATERMARK FOR a ROWTIME column AS a watermark generation expression
    
    <LIKE clause>:
    {
    { INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
    | { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS }
    }[, ...]

    Clauses

    Calculated column

    ‍A calculated column is a virtual column. It is logically defined but does not actually exist in the data source. Calculated columns are usually the result of calculation based on other columns, constants, variables, or functions in the same table. For example, if your data source has "price" and "quantity", you can define a calculated column "cost" using cost AS price * quantity so that you can query cost data from the table.
    A more common use of calculated columns is timestamp standardization. Assume that a data source has a timestamp field mytime, which is in Unix format (such as 1599469771494). You can use ts AS TO_TIMESTAMP(FROM_UNIXTIME(mytime / 1000, 'yyyy-MM-dd HH:mm:ss')) to convert the timestamp to Timestamp(3) so that it can be recognized by Flink. There may be cases where, although the timestamp is in Timestamp(3) format, it is embedded in another JSON field. ‍In such cases, you can also use calculated columns to parse the field and get the timestamp.
    Note
    Calculated columns can only be used in the SELECT statement.
    With INSERT, calculated columns in the sink will be ignored.

    Watermark

    Watermark definition

    ‍Watermark determines the time mode (for details, see "Event Time/Processing Time" below) of a Flink job. Here is how you define a watermark:
    WATERMARK FOR a ROWTIME column AS a watermark generation expression
    For example, WATERMARK FOR my_time_field AS my_time_field - INTERVAL '3' SECOND defines a watermark policy with a tolerance of 3 seconds.
    ‍The ROWTIME column must be in Timestamp(3) format so that it can be recognized by Flink. Embedded fields are not supported. If the column is not in the required format or is an embedded field, create a virtual calculated column using the method described above.
    The watermark generation expression determines how a watermark is generated. ‍It describes a value in Timestamp(3) format.
    Below is an example:
    CREATE TABLE StudentRecord (
    Id BIGINT,
    StudentName STRING,
    RegistrationTime TIMESTAMP(3),
    WATERMARK FOR RegistrationTime AS RegistrationTime - INTERVAL '3' MINUTE
    ) WITH ( ... ... );

    Watermark generation policy

    ‍Watermark policy for monotonically increasing timestamp
    ‍If your timestamp is monotonically increasing, to keep the data latency as low as possible, you can use the following statement.
    WATERMARK FOR A ROWTIME column AS A ROWTIME column
    The following statement uses the largest timestamp in each input data record as the watermark. If the timestamp is out of order, data that arrives late will be discarded due to failure to meet the watermark threshold. Example:
    WATERMARK FOR my_time AS my_time
    Watermark policy with out-of-order tolerance
    This kind of policy has a certain degree of out-of-order tolerance. ‍You can define your own tolerance. Note that a large tolerance may result in a high data latency (backlog, waiting), and a small tolerance may cause data that exceeds the threshold to be dropped, leading to inaccurate results.
    WATERMARK FOR a ROWTIME column AS a ROWTIME column - INTERVAL 'Time window' Time unit
    ‍The f‍ollowing statement uses the largest timestamp in each input data record minus the 3-second tolerance window as the watermark. ‍In case of out-of-order data, if the difference between the data arrival time and the largest timestamp is within 3 seconds, the data will be counted in calculation.
    WATERMARK FOR my_time AS my_time - INTERVAL '3' SECOND

    Event Time/Processing Time

    For window-based operations (such as the specification of time ranges by GROUP BY, OVER, and JOIN), Stream Compute Service supports two time modes: Event Time and ‍Processing Time.
    Event Time uses timestamp information in the input data and has a certain degree of out-of-order tolerance (for example, it can tolerate late arrivals caused by reasons such as network fluctuations and the varying processing capabilities of different nodes). You can specify the tolerance (in milliseconds) using the second parameter of BOUNDED. This mode is more accurate, but to use it, you must include timestamp information in the input data. Currently, this mode is only supported for timestamp-type fields in the source. In the future, we will add support for virtual columns, which you can use to convert columns of other data types to timestamp data that the system can recognize.
    With the Processing Time mode, the input data does not need to include timestamp information. ‍The processing time is automatically added to the data and is assigned to the field PROCTIME (all capitals). This column is hidden a‍nd will not be read by SELECT *. It is read only if you specify it manually.
    Note
    You need to use the same time mode for all data sources of the same task. ‍If Event Time is used, you must define the timestamp and declare a watermark field for all table sources defined.

    PRIMARY KEY

    When defining a table or view, you can declare some fields as the primary key so that their values will not be duplicate or null (equivalent to NOT NULL + UNIQUE in SQL).
    You can define columns as the primary key or use the CONSTRAINT clause. You cannot ‍define different primary keys for the same table. Flink does not guarantee the uniqueness of the primary key of each data record, so only the PRIMARY KEY NOT ENFORCED ‍clause is supported. This reminds users that they need to ensure the primary keys are defined as required.

    PARTITIONED BY

    If a PARTITIONED BY clause is used for a column, it indicates that Flink can partition the column. This mainly affects the FileSystem sink, which creates a separate catalog for different data partition.
    We don't recommend you use the FileSystem sink because the data of the file system will be automatically cleared after all TaskManager tasks are executed.

    WITH parameters

    WITH parameters are usually used to specify connector parameters for the source and sink. The syntax is 'key1'='value1', 'key2' = 'value2'.
    For example, to use Kafka (Tencent Cloud ‍or self-built Kafka), you need to specify parameters such as the server address, the topic, and the consumption start time.
    For how to use ‍some of the common connectors, see Connectors.

    LIKE clause

    When creating a table (table B), you can use the LIKE clause to use the structure of another table (table A). This can significantly reduce the code size of CREATE TABLE. Assume that you want to write the same data to a Kafka sink, an Elasticsearch sink, and a MySQL sink. You can use the LIKE clause to define three tables where the column definition is reused.
    Define table A:
    CREATE TABLE A (
    Id BIGINT,
    StudentName STRING,
    RegistrationTime TIMESTAMP(3)
    ) WITH ( ... Some parameters ... );
    Use table A to define table B (with watermark):
    CREATE TABLE B (
    WATERMARK FOR RegistrationTime AS RegistrationTime - INTERVAL '3' MINUTE
    ) WITH ( ... Some other parameters ... ) LIKE `A`;
    By default, the LIKE clause and WITH ‍parameters are unrelated, so you can use different WITH parameters for the two tables. To reuse the WITH parameters of table A, use LIKE clause options.

    LIKE clause options

    The LIKE clause offers the following options which you can use to specify the content of table A to reuse:
    CONSTRAINTS: Primary key and other constraints
    GENERATED: Calculated column
    OPTIONS: WITH parameters
    PARTITIONS: PARTITIONED BY definition
    WATERMARKS: WATERMARK FOR definition
    ALL: All above
    Flink provides three merging strategies:
    INCLUDING: Table B will inherit all specified properties of table A. If table A and table B have conflicting definitions (for example, if they both have definitions for the same field), an error will occur.
    EXCLUDING: Table B will not include the specified properties of table A.
    OVERWRITING: Table B will inherit all specified properties of table A. If table A and table B have conflicting definitions, table B's definitions will overwrite table A's.
    If LIKE clause options are not specified, INCLUDING ALL OVERWRITING OPTIONS will be executed. This means table B will inherit all definitions and settings of table A, but will overwrite table A's WITH parameters.
    Contact Us

    Contact our sales team or business advisors to help your business.

    Technical Support

    Open a ticket if you're looking for further assistance. Our Ticket is 7x24 avaliable.

    7x24 Phone Support