Tencent Cloud

MongoDB CDC

Last updated: 2023-11-08 14:54:50

    Overview

    The MongoDB CDC source connector automatically tracks MongoDB replica sets or sharded clusters to capture changes in databases and collections.

    Versions

    Flink Version | Description
    1.11          | Unsupported
    1.13          | Supported
    1.14          | Supported
    1.16          | Supported

    Use cases

    The MongoDB CDC connector can be used only as a source. It supports MongoDB v4.0, v4.2, and v5.0, and the MongoDB cluster must be a replica set or a sharded cluster.

    Defining a table in DDL

    -- Register the MongoDB collection 'products' as a table in Flink SQL
    CREATE TABLE mongo_cdc_source_table (
        _id STRING, -- must be declared
        name STRING,
        weight DECIMAL(10,3),
        tags ARRAY<STRING>, -- array
        price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
        suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
        PRIMARY KEY(_id) NOT ENFORCED
    ) WITH (
        'connector' = 'mongodb-cdc',
        'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
        'username' = 'flinkuser',
        'password' = 'flinkpw',
        'database' = 'inventory',
        'collection' = 'products'
    );

    WITH parameters

    Option | Description | Required | Remarks
    connector | The connector to use. | Yes | The value must be 'mongodb-cdc'.
    hosts | The IP address and port pairs of the MongoDB servers, separated by commas. | Yes | -
    username | Name of the database user to use when connecting to MongoDB. | Yes | -
    password | Password of the database user to use when connecting to MongoDB. | Yes | -
    database | The name of the MongoDB database to watch for changes. | Yes | -
    collection | The name of the collection in the MongoDB database to watch for changes. | Yes | -
    connection.options | The ampersand-separated Connection String Options of MongoDB, such as replicaSet=test&connectTimeoutMS=300000. | No | -
    errors.tolerance | Whether to ignore error records. Valid values: none and all. If it is set to all, all error records will be ignored. | No | Default value: none.
    errors.log.enable | Whether to print errors in logs. | No | Default value: true.
    copy.existing | Whether to copy the existing data in the database. Changes made to the data during the copying are applied after the copying is completed. | No | Default value: true.
    copy.existing.pipeline | Filters to apply when copying the existing data. For example, if you set it to [{"$match": {"closed": "false"}}], only records whose value of closed is false will be copied. For how to use this option, see $match (aggregation). | No | -
    copy.existing.max.threads | The number of threads to use when copying data. | No | Default value: the number of processors.
    copy.existing.queue.size | The maximum size of the queue to use when copying data. | No | Default value: 16000.
    poll.max.batch.size | The maximum number of change stream documents to include in a single batch when polling for new data. With the default values, the change stream is checked every 1.5s and up to 1,000 documents are included each time. | No | Default value: 1000.
    poll.await.time.ms | The amount of time, in milliseconds, to wait before checking for new results on the change stream. With the default values, the change stream is checked every 1.5s and up to 1,000 documents are included each time. | No | Default value: 1500.
    heartbeat.interval.ms | The length of time in milliseconds between sending heartbeat messages. Set it to 0 to disable the feature. | No | Default value: 0.
    Note
    If the watched data changes infrequently, we recommend you set heartbeat.interval.ms to an appropriate value. Each heartbeat message carries a resumeToken, which keeps a Flink job from using an expired resumeToken when it resumes from a checkpoint or savepoint.
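
    The optional parameters above are set in the same WITH clause as the required ones. The following is a minimal sketch, not a recommended configuration: the table name mongo_cdc_tuned_source, the replica set name rs0, and the 30-second heartbeat interval are assumptions chosen for illustration. The option names and the $match filter are taken from the parameter descriptions above.

```sql
-- Sketch only: values marked "assumed" are illustrative, not recommendations.
CREATE TABLE mongo_cdc_tuned_source (   -- hypothetical table name
    _id STRING,
    name STRING,
    PRIMARY KEY (_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-cdc',
    'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database' = 'inventory',
    'collection' = 'products',
    -- Connection String Options, separated by ampersands (replica set name assumed)
    'connection.options' = 'replicaSet=rs0&connectTimeoutMS=300000',
    -- Copy existing data first, restricted to documents whose closed field is "false"
    'copy.existing' = 'true',
    'copy.existing.pipeline' = '[{"$match": {"closed": "false"}}]',
    -- Polling behavior, written out here with the documented default values
    'poll.max.batch.size' = '1000',
    'poll.await.time.ms' = '1500',
    -- Send a heartbeat every 30 seconds (assumed value; 0 disables heartbeats)
    'heartbeat.interval.ms' = '30000'
);
```

    All option values are passed as quoted strings, including numbers and the JSON pipeline.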

    Data type mapping

    MongoDB Type | Flink Type
    - | TINYINT
    - | SMALLINT
    Int | INT
    Long | BIGINT
    - | FLOAT
    Double | DOUBLE
    Decimal128 | DECIMAL(p, s)
    Boolean | BOOLEAN
    Date, Timestamp | DATE
    Date, Timestamp | TIME
    Date | TIMESTAMP(3), TIMESTAMP_LTZ(3)
    Timestamp | TIMESTAMP(0), TIMESTAMP_LTZ(0)
    String, ObjectId, UUID, Symbol, MD5, JavaScript, Regex | STRING
    BinData | BYTES
    Object | ROW
    Array | ARRAY
    DBPointer | ROW<$ref STRING, $id STRING>
    GeoJSON | Point: ROW<type STRING, coordinates ARRAY<DOUBLE>>; Line: ROW<type STRING, coordinates ARRAY<ARRAY<DOUBLE>>>; ...
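
    To illustrate how the mapping is applied, the sketch below declares one column for several of the MongoDB types listed above. The table name mongo_cdc_stores, the collection stores, and all column names other than _id are hypothetical. The field name type is quoted with backticks as a precaution, in case it is treated as a keyword.

```sql
-- Sketch only: collection and column names are assumed for illustration.
CREATE TABLE mongo_cdc_stores (
    _id STRING,                        -- ObjectId    -> STRING
    visits BIGINT,                     -- Long        -> BIGINT
    rating DOUBLE,                     -- Double      -> DOUBLE
    balance DECIMAL(10, 2),            -- Decimal128  -> DECIMAL(p, s)
    active BOOLEAN,                    -- Boolean     -> BOOLEAN
    created_at TIMESTAMP_LTZ(3),       -- Date        -> TIMESTAMP_LTZ(3)
    logo BYTES,                        -- BinData     -> BYTES
    -- Point -> ROW<type STRING, coordinates ARRAY<DOUBLE>>
    location ROW<`type` STRING, coordinates ARRAY<DOUBLE>>,
    PRIMARY KEY (_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-cdc',
    'hosts' = 'localhost:27017',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database' = 'inventory',
    'collection' = 'stores'            -- hypothetical collection
);
```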

    Example

    -- Source: the MongoDB collection 'products'
    CREATE TABLE mongo_cdc_source_table (
        _id STRING, -- must be declared
        name STRING,
        weight DECIMAL(10,3),
        tags ARRAY<STRING>, -- array
        price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
        suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
        PRIMARY KEY(_id) NOT ENFORCED
    ) WITH (
        'connector' = 'mongodb-cdc',
        'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
        'username' = 'flinkuser',
        'password' = 'flinkpw',
        'database' = 'inventory',
        'collection' = 'products'
    );

    -- Sink: prints each change record to the task logs
    CREATE TABLE `print_table` (
        `id` STRING,
        `name` STRING,
        `currency` STRING
    ) WITH (
        'connector' = 'print'
    );

    -- Read a field of the embedded document 'price' with dot notation
    INSERT INTO print_table SELECT _id, name, price.currency FROM mongo_cdc_source_table;
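
    The example above reads a field of an embedded document. An array of embedded documents, such as suppliers, can be flattened into one row per element with Flink SQL's CROSS JOIN UNNEST. The following is a sketch under assumptions: the sink table supplier_print_table and the column aliases supplier_name and supplier_address are hypothetical, and the statement has not been verified against this connector on every supported Flink version.

```sql
-- Sketch only: the sink table and the alias names are assumed for illustration.
CREATE TABLE supplier_print_table (
    product_id STRING,
    product_name STRING,
    supplier_name STRING
) WITH (
    'connector' = 'print'
);

-- Emit one row per element of the suppliers array of each product
INSERT INTO supplier_print_table
SELECT p._id, p.name, s.supplier_name
FROM mongo_cdc_source_table AS p
CROSS JOIN UNNEST(p.suppliers) AS s (supplier_name, supplier_address);
```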

    Notes

    User permissions

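    The MongoDB user that the connector connects as must have the changeStream and read permissions. For example:
    use admin;
    db.createUser(
      {
        user: "flinkuser",
        pwd: "flinkpw",
        roles: [
          { role: "read", db: "admin" }, // the built-in read role includes the changeStream action
          { role: "readAnyDatabase", db: "admin" } // allows reading the existing data
        ]
      }
    );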

    Parallelism

    The task parallelism must be 1.
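
    How the parallelism is set depends on the environment that runs the job. As a sketch, assuming an environment that accepts SET statements (such as the open-source Flink SQL client in Flink 1.13 or later), the default parallelism can be fixed before the INSERT statement is submitted. Note that this applies to the whole job, not only to the MongoDB CDC source.

```sql
-- Sketch only: assumes the environment accepts SET statements.
-- 'parallelism.default' is the Flink configuration key for the job's default parallelism.
SET 'parallelism.default' = '1';
```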