Tencent Cloud Message Queue

Last updated: 2023-11-08 14:58:27

    Overview

    Cloud Message Queue (CMQ) is a distributed message queue system developed based on Tencent's in-house messaging engine. It can be used as a source or a sink. You can ingest stream data to a CMQ queue, process the data using Flink operators, and output the result to another CMQ queue on the same or a different instance.

    Versions

    | Flink Version | Description |
    | --- | --- |
    | 1.11 | Supported |
    | 1.13 | Supported |
    | 1.14 | Unsupported |
    | 1.16 | Unsupported |

    Limits

    CMQ can be used as a source or a sink for tuple streams. It does not support upsert streams currently.
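
    To illustrate this limit, the sketch below contrasts two queries under standard Flink SQL changelog semantics. It reuses the `cmq_source_json_table` and `cmq_sink_json_table` tables defined later on this page; the `WHERE` condition and the aggregation are hypothetical. The second statement is left commented out because a non-windowed aggregation produces an updating result, which a CMQ sink is not expected to accept.

    ```sql
    -- Append-only: every input row produces at most one output row,
    -- so the result is a tuple stream that can be written to a CMQ sink.
    INSERT INTO cmq_sink_json_table
    SELECT `id`, `name`
    FROM cmq_source_json_table
    WHERE `id` > 0;

    -- Updating result: a non-windowed GROUP BY revises earlier output rows
    -- as new data arrives (an upsert stream), so it is expected to be rejected.
    -- INSERT INTO cmq_sink_json_table
    -- SELECT MAX(`id`) AS `id`, `name`
    -- FROM cmq_source_json_table
    -- GROUP BY `name`;
    ```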

    Defining a table in DDL

    As a source

    JSON format

    CREATE TABLE `cmq_source_json_table` (
    `id` INT,
    `name` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED -- If you want to remove duplicates, specify the primary key, which is used to identify data.
    ) WITH (
    'connector' = 'cmq', -- Here, it should be 'cmq'.
    'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com', -- The name server of the CMQ queue's region.
    'queue' = 'queue_name', -- The name of the CMQ queue.
    'secret-id' = 'xxxx', -- The account secret ID.
    'secret-key' = 'xxxx', -- The account secret key.
    'sign-method' = 'HmacSHA1', -- The signature algorithm.
    'format' = 'json', -- The data format (JSON).
    'json.fail-on-missing-field' = 'false', -- If this is 'false', no error occurs when a field is missing.
    'json.ignore-parse-errors' = 'true', -- If this is 'true', all parse errors will be ignored.
    'batch-size' = '16', -- The number of messages consumed at a time.
    'request-timeout' = '5000ms', -- The request timeout period.
    'polling-wait-timeout'= '10s', -- The time to wait in case of failure to obtain any data.
    'key-alive-timeout'= '5min' -- The valid time period for the deduplication of CMQ messages with primary keys.
    );

    CSV format

    CREATE TABLE `cmq_source_csv_table` (
    `id` int,
    `name` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED -- If you want to remove duplicates, specify the primary key, which is used to identify data.
    ) WITH (
    'connector' = 'cmq', -- Here, it should be 'cmq'.
    'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com', -- The name server of the CMQ queue's region.
    'queue' = 'queue_name', -- The name of the CMQ queue.
    'secret-id' = 'xxxx', -- The account secret ID.
    'secret-key' = 'xxxx', -- The account secret key.
    'sign-method' = 'HmacSHA1', -- The signature algorithm.
    'format' = 'csv', -- The data format (CSV).
    'batch-size' = '16', -- The number of messages consumed/sent at a time.
    'request-timeout' = '5000ms', -- The request timeout period.
    'polling-wait-timeout'= '10s', -- The time to wait in case of failure to obtain any data.
    'key-alive-timeout'= '5min' -- The valid time period for the deduplication of CMQ messages with primary keys.
    );

    As a sink

    JSON format

    CREATE TABLE `cmq_sink_json_table` (
    `id` int,
    `name` STRING
    ) WITH (
    'connector' = 'cmq', -- Here, it should be 'cmq'.
    'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com', -- The name server of the CMQ queue's region.
    'queue' = 'queue_name', -- The name of the CMQ queue.
    'secret-id' = 'xxxx', -- The account secret ID.
    'secret-key' = 'xxxx', -- The account secret key.
    'sign-method' = 'HmacSHA1', -- The signature algorithm.
    'format' = 'json', -- The data format (JSON).
    'json.fail-on-missing-field' = 'false', -- If this is 'false', no error occurs when a field is missing.
    'json.ignore-parse-errors' = 'true', -- If this is 'true', all parse errors will be ignored.
    'batch-size' = '16', -- The number of messages sent at a time.
    'request-timeout' = '5000ms', -- The request timeout period.
    'retry-times' = '3', -- The number of retries in case of failure to send a message.
    'max-block-timeout' = '0s' -- The maximum time to wait to batch send data.
    );

    CSV format

    CREATE TABLE `cmq_sink_csv_table` (
    `id` int,
    `name` STRING
    ) WITH (
    'connector' = 'cmq', -- Here, it should be 'cmq'.
    'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com', -- The name server of the CMQ queue's region.
    'queue' = 'queue_name', -- The name of the CMQ queue.
    'secret-id' = 'xxxx', -- The account secret ID.
    'secret-key' = 'xxxx', -- The account secret key.
    'sign-method' = 'HmacSHA1', -- The signature algorithm.
    'format' = 'csv', -- The data format (CSV).
    'batch-size' = '16', -- The number of messages sent at a time.
    'request-timeout' = '5000ms', -- The request timeout period.
    'retry-times' = '3', -- The number of retries in case of failure to send a message.
    'max-block-timeout' = '0s' -- The maximum time to wait to batch send data.
    );

    WITH parameters

    Common WITH parameters

    | Option | Required | Default Value | Description |
    | --- | --- | --- | --- |
    | connector | Yes | - | Here, it should be 'cmq'. |
    | hosts | Yes | - | The name server of the queue's region. For details, see TCP SDK. |
    | queue | Yes | - | The CMQ queue name. |
    | secret-id | Yes | - | The account secret ID. |
    | secret-key | Yes | - | The account secret key. |
    | sign-method | No | HmacSHA1 | The signature algorithm. |
    | format | Yes | - | The input and output format of CMQ messages. Valid values include 'csv' and 'json'. |
    | batch-size | No | 16 | The number of messages sent/received at a time. |
    | request-timeout | No | 5000ms | The request timeout period. |
    | polling-wait-timeout | No | 10s | The time to wait in case of failure to obtain any data. |
    | key-alive-timeout | No | 60s | The valid time period for the deduplication of CMQ messages with primary keys. This option ensures that the same message is consumed only once, but does not guarantee global uniqueness. |
    | retry-times | No | 3 | The number of retries in case of failure to send a message. |
    | max-block-timeout | No | 0s | The maximum time to wait to batch send data. If it is '0s', data will be sent immediately without waiting. |
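
    As a rough sketch of how these options combine, the sink below sets only the options marked as required, plus two batching options. The table name `cmq_sink_batched` and the values '8' and '2s' are illustrative assumptions, not recommendations. Going by the description above, a non-zero `max-block-timeout` lets the sink wait up to that long to fill a batch before sending.

    ```sql
    CREATE TABLE `cmq_sink_batched` (   -- hypothetical table name
    `id` INT,
    `name` STRING
    ) WITH (
    'connector' = 'cmq',
    'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com',
    'queue' = 'queue_name',
    'secret-id' = 'xxxx',
    'secret-key' = 'xxxx',
    'format' = 'json',
    'batch-size' = '8',          -- assumed value; the default is 16
    'max-block-timeout' = '2s'   -- assumed value; the default '0s' sends immediately
    );
    ```

    Options that are omitted here, such as `sign-method`, `request-timeout`, and `retry-times`, fall back to the defaults listed in the table.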

    WITH parameters for JSON

    | Option | Required | Default Value | Description |
    | --- | --- | --- | --- |
    | json.fail-on-missing-field | No | false | If this is true, the job will fail when a field is missing. If this is false (default), the missing field will be set to null and the job will continue to run. |
    | json.ignore-parse-errors | No | false | If this is true, when there is a parse error, the field will be set to null and the job will continue to run. If this is false, the job will fail in case of a parse error. |
    | json.timestamp-format.standard | No | SQL | The JSON timestamp format. The default value is SQL, in which case the format will be yyyy-MM-dd HH:mm:ss.s{precision}. You can also set it to ISO-8601, and the format will be yyyy-MM-ddTHH:mm:ss.s{precision}. |
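
    For illustration, a source that reads ISO-8601 timestamps could be declared roughly as follows. The table name `cmq_source_iso_ts` and the column `event_time` are hypothetical; the options are the ones listed in the table above.

    ```sql
    CREATE TABLE `cmq_source_iso_ts` (   -- hypothetical table name
    `id` INT,
    `event_time` TIMESTAMP(3)            -- hypothetical column
    ) WITH (
    'connector' = 'cmq',
    'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com',
    'queue' = 'queue_name',
    'secret-id' = 'xxxx',
    'secret-key' = 'xxxx',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',              -- unparsable fields become null
    'json.timestamp-format.standard' = 'ISO-8601'     -- expects e.g. 2023-11-08T14:58:27.000
    );
    ```

    With the default value SQL, the same timestamp would instead be written as 2023-11-08 14:58:27.000.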

    WITH parameters for CSV

    | Option | Required | Default Value | Description |
    | --- | --- | --- | --- |
    | csv.field-delimiter | No | , | The field delimiter, which is a comma by default. |
    | csv.line-delimiter | No | U&'\000A' | The line delimiter, which is \n by default (in SQL, you must use U&'\000A'). You can also set it to \r (in SQL, you need to use U&'\000D'). |
    | csv.disable-quote-character | No | false | Whether to disable quote characters. If this is true, 'csv.quote-character' cannot be used. |
    | csv.quote-character | No | " | The quote character. Text enclosed in quote characters is treated as a single value. The default value is ". |
    | csv.ignore-parse-errors | No | false | Whether to ignore parse errors. If this is true, fields will be set to null in case of parse failure. |
    | csv.allow-comments | No | false | Whether to ignore comment lines that start with # and output them as empty lines (if this is true, make sure you set csv.ignore-parse-errors to true as well). |
    | csv.array-element-delimiter | No | ; | The array element delimiter, which is ; by default. |
    | csv.escape-character | No | - | The escape character. By default, escape characters are disabled. |
    | csv.null-literal | No | - | The string that will be seen as null. |
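
    As a minimal sketch of the CSV options, the source below reads pipe-delimited messages and treats the literal string NULL as a null value. The table name `cmq_source_pipe_csv` and the chosen delimiter and null literal are assumptions for illustration only.

    ```sql
    CREATE TABLE `cmq_source_pipe_csv` (   -- hypothetical table name
    `id` INT,
    `name` STRING
    ) WITH (
    'connector' = 'cmq',
    'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com',
    'queue' = 'queue_name',
    'secret-id' = 'xxxx',
    'secret-key' = 'xxxx',
    'format' = 'csv',
    'csv.field-delimiter' = '|',          -- assumed delimiter; the default is a comma
    'csv.null-literal' = 'NULL',          -- assumed literal; a field containing NULL is read as null
    'csv.ignore-parse-errors' = 'true'    -- unparsable fields become null instead of failing the job
    );
    ```

    Under these settings, a message body such as `1|NULL` would be expected to yield `id` = 1 and a null `name`.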

    Example

    CREATE TABLE `cmq_source_json_table` (
    `id` int,
    `name` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED -- If you want to remove duplicates, specify the primary key, which is used to identify data.
    ) WITH (
    'connector' = 'cmq', -- Here, it should be 'cmq'.
    'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com', -- The name server of the CMQ queue's region.
    'queue' = 'queue_name', -- The name of the CMQ queue.
    'secret-id' = 'xxxx', -- The account secret ID.
    'secret-key' = 'xxxx', -- The account secret key.
    'sign-method' = 'HmacSHA1', -- The signature algorithm.
    'format' = 'json', -- The data format (JSON).
    'json.fail-on-missing-field' = 'false', -- If this is 'false', no error occurs when a field is missing.
    'json.ignore-parse-errors' = 'true', -- If this is 'true', all parse errors will be ignored.
    'batch-size' = '16', -- The number of messages consumed at a time.
    'request-timeout' = '5000ms', -- The request timeout period.
    'polling-wait-timeout'= '10s', -- The time to wait in case of failure to obtain any data.
    'key-alive-timeout'= '5min' -- The valid time period for the deduplication of CMQ messages with primary keys.
    );
    CREATE TABLE `cmq_sink_json_table` (
    `id` int,
    `name` STRING
    ) WITH (
    'connector' = 'cmq', -- Here, it should be 'cmq'.
    'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com', -- The name server of the CMQ queue's region.
    'queue' = 'queue_name', -- The name of the CMQ queue.
    'secret-id' = 'xxxx', -- The account secret ID.
    'secret-key' = 'xxxx', -- The account secret key.
    'sign-method' = 'HmacSHA1', -- The signature algorithm.
    'format' = 'json', -- The data format (JSON).
    'json.fail-on-missing-field' = 'false', -- If this is 'false', no error occurs when a field is missing.
    'json.ignore-parse-errors' = 'true', -- If this is 'true', all parse errors will be ignored.
    'batch-size' = '16', -- The number of messages sent at a time.
    'request-timeout' = '5000ms', -- The request timeout period.
    'retry-times' = '3', -- The number of retries in case of failure to send a message.
    'max-block-timeout' = '0s' -- The maximum time to wait to batch send data.
    );
    insert into cmq_sink_json_table select * from cmq_source_json_table;

    Notes

    Note the following when using CMQ as a source:
    1. You can specify a primary key to deduplicate messages over a time period that you set with the key-alive-timeout option. The longer this period is, the higher the memory usage.
    2. We strongly recommend that you set the visibility timeout period of CMQ messages to a value larger than the checkpoint interval of your Flink job, so that consumed messages are not consumed again.