Testing Connectors

Last updated: 2024-04-19 12:23:54

    Testing sources and sinks

    If you want to check whether a job can run successfully or whether its logic is correct, you can use connectors specially designed for testing. This removes the need to deploy external systems and reduces interference.

    Versions

    | Flink Version | Description |
    |---------------|-------------|
    | 1.11          | Supported   |
    | 1.13          | Supported   |
    | 1.14          | Supported   |
    | 1.16          | Supported   |

    Datagen source

    Datagen is a random data generator built into Flink and can be used directly as a data source. For details, see the Flink documentation.
    Below is an example of a Datagen source that generates two fields: id, a random number, and name, a random string.

    Defining a table in DDL

    CREATE TABLE datagen_source_table (
      id INT,
      name STRING
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '1'  -- The number of data records generated per second.
    );

    WITH parameters

    | Option | Required | Default Value | Data Type | Description |
    |--------|----------|---------------|-----------|-------------|
    | connector | Yes | - | String | The connector to use. Here, it should be `datagen`. |
    | rows-per-second | No | 10000 | Long | The number of rows generated per second. This determines the data send rate. |
    | fields.#.kind | No | random | String | The value generator for the `#` field. Valid values are `sequence` and `random`. |
    | fields.#.min | No | (Minimum value of type) | (Type of field) | The minimum value the random generator can generate. This is used for numeric data types. |
    | fields.#.max | No | (Maximum value of type) | (Type of field) | The maximum value the random generator can generate. This is used for numeric data types. |
    | fields.#.length | No | 100 | Integer | The length of strings generated by the random generator. This is used for data types including char, varchar, and string. |
    | fields.#.start | No | - | (Type of field) | The start value of the sequence generator. |
    | fields.#.end | No | - | (Type of field) | The end value of the sequence generator. |
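    The `fields.#.*` options above can be combined per column. Below is a sketch (table and column names are illustrative) that generates a bounded sequence, a bounded random number, and a fixed-length random string:

```sql
CREATE TABLE datagen_bounded_source (
  order_id BIGINT,
  amount   INT,
  label    STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5',
  -- order_id: a sequence from 1 to 1000; the source finishes
  -- once the sequence is exhausted, making the job bounded.
  'fields.order_id.kind' = 'sequence',
  'fields.order_id.start' = '1',
  'fields.order_id.end' = '1000',
  -- amount: random integers between 1 and 100.
  'fields.amount.kind' = 'random',
  'fields.amount.min' = '1',
  'fields.amount.max' = '100',
  -- label: random strings of length 10.
  'fields.label.length' = '10'
);
```

    Note that with `kind = 'sequence'`, each value is emitted exactly once, which is handy for checking ordering or deduplication logic.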

    Logger sink

    The Logger sink is a custom logger example provided by Stream Compute Service. It writes the final data to the TaskManager log file so that you can view the output in the Flink UI or in the logging section of the Stream Compute Service console.
    1. To use the Logger sink, download the JAR package. If you want to customize the output logic, modify the package and rebuild it.
    2. Upload the package in the Stream Compute Service console. For details, see Managing Dependencies.
    3. Reference the package in your SQL job.

    Defining a table in DDL

    CREATE TABLE logger_sink_table (
      id INT,
      name STRING
    ) WITH (
      'connector' = 'logger',
      'print-identifier' = 'DebugData'
    );

    WITH parameters

    | Option | Required | Data Type | Description |
    |--------|----------|-----------|-------------|
    | connector | Yes | String | The connector to use. Here, it should be `logger`. |
    | print-identifier | No | String | The prefix for log printing. |
    | all-changelog-mode | No | Boolean | If you enable this, -U data will not be filtered out. This allows you to simulate ClickHouse Collapsing data streams. |
    | records-per-second | No | Integer | The number of records output per second. You can use this to achieve throttling. |
    | mute-output | No | Boolean | Whether to drop all output data and only count the number of records (an enhanced version of a BlackHole sink). |
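    Putting the two connectors together, a minimal end-to-end test job (reusing the table definitions from the examples above) might look like this:

```sql
-- Generate one random record per second.
CREATE TABLE datagen_source_table (
  id INT,
  name STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

-- Write every record to the TaskManager log with a recognizable prefix.
CREATE TABLE logger_sink_table (
  id INT,
  name STRING
) WITH (
  'connector' = 'logger',
  'print-identifier' = 'DebugData'
);

INSERT INTO logger_sink_table
SELECT id, name FROM datagen_source_table;
```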

    Monitoring metrics

    Stream Compute Service offers a series of handy metrics for the Logger sink. Click the Logger sink operator in the execution graph in the Flink UI and search for one of the following metrics:
    numberOfInsertRecords: The number of output +I messages.
    numberOfDeleteRecords: The number of output -D messages.
    numberOfUpdateBeforeRecords: The number of output -U messages.
    numberOfUpdateAfterRecords: The number of output +U messages.
    Print sink

    Flink has a built-in Print sink that writes to standard output. However, because its printing format does not comply with the rules of Stream Compute Service's log collector, the data is not displayed well in the console. We recommend using the Logger sink instead.

    Defining a table in DDL

    CREATE TABLE `print_table` (
      `id` INT,
      `name` STRING
    ) WITH (
      'connector' = 'print'
    );

    WITH parameters

    | Option | Required | Default Value | Data Type | Description |
    |--------|----------|---------------|-----------|-------------|
    | connector | Yes | - | String | The connector to use. Here, it should be `print`. |
    | print-identifier | No | - | String | The identifier (prefix) for the output data. |
    | standard-error | No | false | Boolean | Whether to print to standard error instead of standard output. To print to standard error, set this to `true`. |
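    As a quick smoke test, the Print sink can be paired with the Datagen source in the same way as the Logger sink; the sketch below assumes the `datagen_source_table` defined earlier:

```sql
CREATE TABLE print_table (
  id INT,
  name STRING
) WITH (
  'connector' = 'print',
  'print-identifier' = 'Debug',  -- each printed row is prefixed with this identifier
  'standard-error' = 'false'     -- write to stdout; set to 'true' for stderr
);

INSERT INTO print_table
SELECT id, name FROM datagen_source_table;
```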
    