Last updated: 2023-11-08 14:51:17

    Overview

    The Flink connector mongodb writes data from Flink to MongoDB in batches. It currently supports only append (tuple) streams.

    Versions

    Flink Version | Description
    1.11          | Unsupported
    1.13          | Supported
    1.14          | Unsupported
    1.16          | Supported

    Defining a table in DDL

    CREATE TABLE mongodb (
        user_id INT,
        item_id INT,
        category_id INT,
        behavior VARCHAR
    ) WITH (
        'connector' = 'mongodb',     -- Must be 'mongodb'.
        'database' = 'test',         -- The name of the database.
        'collection' = 'table1',     -- The name of the collection.
        'uri' = 'mongodb://$username:$password@$IP:$PORT,$IP:$PORT,$IP:$PORT/test?authSource=admin', -- The MongoDB connection string.
        'batchSize' = '1024'         -- The number of records written per batch.
    );

    Limits

    The Flink connector mongodb can be used only as a sink. It supports using a TencentDB for MongoDB table as a result table.

    WITH parameters

    Option                | Description                              | Required | Remarks
    connector             | The result table type.                   | Yes      | Must be mongodb.
    database              | The name of the database.                | Yes      | -
    collection            | The name of the collection.              | Yes      | -
    uri                   | The MongoDB connection string.           | Yes      | -
    batchSize             | The number of records written per batch. | No       | Default value: 1024.
    maxConnectionIdleTime | The connection idle timeout.             | No       | Default value: 60000 (ms).
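
    The optional parameters can be set in the same WITH clause as the required ones. The following is a minimal sketch rather than a recommended configuration: the table name mongodb_tuned and the values 2048 and 30000 are illustrative assumptions, and it assumes that maxConnectionIdleTime is given as a quoted number of milliseconds, as its default suggests.

```sql
-- Hypothetical result table that overrides both optional parameters.
CREATE TABLE mongodb_tuned (
    user_id INT,
    item_id INT,
    category_id INT,
    behavior VARCHAR
) WITH (
    'connector' = 'mongodb',              -- Required; must be 'mongodb'.
    'database' = 'test',                  -- Required.
    'collection' = 'table1',              -- Required.
    'uri' = 'mongodb://$username:$password@$IP:$PORT/test?authSource=admin', -- Required.
    'batchSize' = '2048',                 -- Optional; illustrative value (default: 1024).
    'maxConnectionIdleTime' = '30000'     -- Optional; assumed to be in ms (default: 60000).
);
```

    If either optional parameter is omitted, its default value from the table above applies.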

    Example

    CREATE TABLE random_source (
        user_id INT,
        item_id INT,
        category_id INT,
        behavior VARCHAR
    ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '100',              -- The number of records generated per second.
        'fields.user_id.kind' = 'sequence',     -- Generate a bounded sequence (the output stops automatically after the sequence ends).
        'fields.user_id.start' = '1',           -- The start value of the sequence.
        'fields.user_id.end' = '10000',         -- The end value of the sequence.
        'fields.item_id.kind' = 'random',       -- Generate random numbers within the range below.
        'fields.item_id.min' = '1',             -- The minimum random number.
        'fields.item_id.max' = '1000',          -- The maximum random number.
        'fields.category_id.kind' = 'random',   -- Generate random numbers within the range below.
        'fields.category_id.min' = '1',         -- The minimum random number.
        'fields.category_id.max' = '1000',      -- The maximum random number.
        'fields.behavior.length' = '5'          -- The random string length.
    );
    
    CREATE TABLE mongodb (
        user_id INT,
        item_id INT,
        category_id INT,
        behavior VARCHAR
    ) WITH (
        'connector' = 'mongodb',     -- Must be 'mongodb'.
        'database' = 'test',         -- The name of the database.
        'collection' = 'table1',     -- The name of the collection.
        'uri' = 'mongodb://$username:$password@$IP:$PORT,$IP:$PORT,$IP:$PORT/test?authSource=admin', -- The MongoDB connection string.
        'batchSize' = '1024'         -- The number of records written per batch.
    );

    INSERT INTO mongodb SELECT * FROM random_source;

    Notes

    Upsert

    The Flink connector mongodb as a sink does not support upsert streams.
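
    To illustrate the difference, the sketch below contrasts a query that produces an append stream with one that produces an updating stream. It reuses the random_source and mongodb tables from the example above. The behavior_counts table is hypothetical and is not defined in this document, and the exact error reported for the second statement is not specified here.

```sql
-- Append-only: each source row is emitted once and never revised,
-- so it can be written to the MongoDB result table.
INSERT INTO mongodb
SELECT user_id, item_id, category_id, behavior
FROM random_source
WHERE category_id <= 100;

-- Updating: a non-windowed GROUP BY revises earlier results as new rows arrive,
-- which yields an upsert stream. Writing it to this sink is not supported.
-- (behavior_counts is a hypothetical table, shown for illustration only.)
-- INSERT INTO behavior_counts
-- SELECT behavior, COUNT(*) AS cnt
-- FROM random_source
-- GROUP BY behavior;
```

    Projections and filters such as the first statement keep the stream append-only. If aggregated results are needed, one possible approach is a windowed aggregation that emits a single final result per window, since in Flink such results are generally append-only.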

    User permissions

    The MongoDB user specified in the connection string must have permission to write data to the target database.