Consuming Subscribed Data with Flink Client (Avro)

Last updated: 2023-02-06 16:32:41

    Overview

    In data subscription (Kafka Edition, where the current Kafka server version is 2.6.0), subscribed data in the Avro format can be consumed by using a Flink client (DataStream API type only). This document provides a demo for data consumption with flink-dts-connector.
    Notes
    Currently, data consumption over the Avro protocol is supported only for TencentDB for MySQL and TDSQL-C for MySQL.

    Prerequisites

    1. You have created a data consumption task as instructed in Creating Data Subscription Task.
    2. You have created a consumer group as instructed in Adding Consumer Group.
    3. You have installed Flink, and it can execute tasks normally.

    Notes

    The demo only prints out the consumed data and does not contain any usage instructions. You need to write your own data processing logic based on the demo. You can also use Kafka clients in other languages to consume and parse data.
    Currently, data subscription to Kafka for consumption can be implemented over the Tencent Cloud private network but not the public network. In addition, the subscribed database instance and the data consumer must be in the same region.
    The Kafka built into DTS has an upper limit on the size of a single message. If a single row of data in the source database exceeds 10 MB, that row may be discarded.

    Downloading a consumption demo

    Demo language: Java (for TencentDB for MySQL and TDSQL-C for MySQL).
    Compilation environment: Maven or Gradle, and JDK 8. You can choose either package management tool; the following takes Maven as an example.
    Runtime environment: Tencent Cloud CVM (which can access the private network address of the Kafka server only if it is in the same region as the subscribed instance) with JRE 8 installed.
    Directions:
    1. Download the Java Flink demo and decompress it.
    2. Access the decompressed directory. A Maven project model and pom.xml file are provided in the directory for your use as needed. To generate the Avro protocol code from the definition file, run java -jar avro-tools-1.8.2.jar compile -string schema Record.avsc followed by the code generation path.
    3. Modify the Flink version in the pom.xml file. The version in the following code must be the same as the Flink version you use.
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>1.13.6</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro</artifactId>
        <version>1.13.6</version>
    </dependency>
    4. Go to the directory where the pom.xml file is located and package the project with Maven or IDEA. To package with Maven, run mvn clean package.
    5. For scenarios where the Flink client type is DataStream API, use Flink client commands to submit the task and start consumption (a minimal consumer sketch is provided after these directions): ./bin/flink run consumerDemo-avro-flink-1.0-SNAPSHOT.jar --brokers xxx --topic xxx --group xxx --user xxx --password xxx --trans2sql
    brokers is the private network access address for data subscription to Kafka, and topic is the subscription topic; both can be viewed on the Subscription details page as instructed in Viewing Subscription Details.
    group, user, and password are the name, account, and password of the consumer group, which can be viewed on the Consumption Management page as instructed in Managing Consumer Group.
    trans2sql indicates whether to enable conversion to SQL statements. In the Java code, if this parameter is carried, the conversion is enabled.
    6. Observe the consumption: view the running tasks and the task details.
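    For reference, below is a minimal sketch of a DataStream API consumer equivalent to step 5. It is not the demo's actual source code: the class name DtsAvroConsumerSketch, the SASL/PLAIN properties, and the assumption that each Kafka message value is a single Avro-encoded Record (as defined in Record.avsc) are illustrative only; replace the xxx placeholders with the values described in step 5 and check the demo for the exact deserialization and authentication logic.

    import java.io.IOException;
    import java.util.Properties;

    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.specific.SpecificDatumReader;
    import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    import com.tencent.subscribe.avro.Record;  // generated from Record.avsc

    public class DtsAvroConsumerSketch {  // hypothetical class name, not part of the demo

        // Assumption: each Kafka message value is one Avro-encoded Record written
        // with the schema in Record.avsc; check the demo for its actual decoder.
        public static class RecordDeserializer extends AbstractDeserializationSchema<Record> {
            private transient SpecificDatumReader<Record> reader;

            @Override
            public Record deserialize(byte[] message) throws IOException {
                if (reader == null) {
                    reader = new SpecificDatumReader<>(Record.class);
                }
                BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(message, null);
                return reader.read(null, decoder);
            }
        }

        public static void main(String[] args) throws Exception {
            // Replace the placeholders with the values passed as --brokers/--topic/--group/--user/--password.
            String brokers  = "xxx";
            String topic    = "xxx";
            String group    = "xxx";
            String user     = "xxx";
            String password = "xxx";

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", brokers);
            props.setProperty("group.id", group);
            // SASL/PLAIN settings are an assumption based on standard Kafka authentication;
            // verify the exact properties against the demo source.
            props.setProperty("security.protocol", "SASL_PLAINTEXT");
            props.setProperty("sasl.mechanism", "PLAIN");
            props.setProperty("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required "
                            + "username=\"" + user + "\" password=\"" + password + "\";");

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.addSource(new FlinkKafkaConsumer<>(topic, new RecordDeserializer(), props))
                    // Like the demo, this only prints the consumed records;
                    // add your own processing logic here.
                    .print();
            env.execute("consumerDemo-avro-flink-sketch");
        }
    }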
    Key demo logic description

    Files in the demo are as described below:
    consumerDemo-avro-flink\src\main\resources\avro-tools-1.8.2.jar: The tool used to generate Avro protocol code.
    consumerDemo-avro-flink\src\main\java\com\tencent\subscribe\avro: The directory where the Avro tool generates code.
    consumerDemo-avro-flink\src\main\resources\Record.avsc: The protocol definition file.
    14 structures (called schemas in Avro) are defined in Record.avsc. The main data structure is record, which represents a data record in the binlog. The record structure is as follows; the other data structures can be viewed in Record.avsc.
    {
        "namespace": "com.tencent.subscribe.avro",  // The last schema in `Record.avsc`, with `name` displayed as `Record`.
        "type": "record",
        "name": "Record",  // `name` is displayed as `Record`, indicating the format of the data consumed from Kafka.
        "fields": [
            {
                "name": "id",  // `id` indicates a globally incremental ID. More record values are explained as follows:
                "type": "long",
                "doc": "unique id of this record in the whole stream"
            },
            {
                "name": "version",  // `version` indicates the protocol version.
                "type": "int",
                "doc": "protocol version"
            },
            {
                "name": "messageType",  // Message type
                "aliases": [
                    "operation"
                ],
                "type": {
                    "namespace": "com.tencent.subscribe.avro",
                    "name": "MessageType",
                    "type": "enum",
                    "symbols": [
                        "INSERT",
                        "UPDATE",
                        "DELETE",
                        "DDL",
                        "BEGIN",
                        "COMMIT",
                        "HEARTBEAT",
                        "CHECKPOINT",
                        "ROLLBACK"
                    ]
                }
            },
            {
                ......
            }
        ]
    }
    Fields in a record are as explained below:
    id: The globally incremental ID.
    version: The protocol version, which is currently v1.
    messageType: The message type. Enumerated values: "INSERT", "UPDATE", "DELETE", "DDL", "BEGIN", "COMMIT", "HEARTBEAT", "CHECKPOINT".
    fileName: The name of the binlog file where the current record is located.
    position: The end offset of the current record in the binlog, in the format of End_log_pos@binlog file number. For example, if the current record is in file mysql-bin.000004 and the end offset is 2196, the value of this field will be 2196@4.
    safePosition: The start offset of the current transaction in the binlog, in the same format as described above.
    timestamp: The time when the data was written to the binlog, which is a UNIX timestamp in seconds.
    gtid: The current GTID, such as c7c98333-6006-11ed-bfc9-b8cef6e1a231:9.
    transactionId: The transaction ID, which is generated only for COMMIT events.
    serverId: The server ID of the source database, which can be viewed by running SHOW VARIABLES LIKE 'server_id'.
    threadId: The ID of the session that committed the current transaction, which can be viewed by running SHOW processlist;.
    sourceType: The source database type, which currently can only be MySQL.
    sourceVersion: The source database version, which can be viewed by running select version();.
    schemaName: The database name.
    tableName: The table name.
    objectName: Format: database name.table name.
    columns: The definitions of columns in the table.
    oldColumns: The data of the row before DML execution. If the message is an INSERT message, the array will be null. There are 12 types of elements in the array, i.e., Integer, Character, Decimal, Float, Timestamp, DateTime, TimestampWithTimeZone, BinaryGeometry, TextGeometry, BinaryObject, TextObject, and EmptyObject. For more information, see the definitions in the demo.
    newColumns: The data of the row after DML execution. If the message is a DELETE message, the array will be null. There are 12 types of elements in the array, i.e., Integer, Character, Decimal, Float, Timestamp, DateTime, TimestampWithTimeZone, BinaryGeometry, TextGeometry, BinaryObject, TextObject, and EmptyObject. For more information, see the definitions in the demo.
    sql: The DDL SQL statement.
    executionTime: The DDL execution duration in seconds.
    heartbeatTimestamp: The timestamp of the heartbeat message in seconds, which is present only for heartbeat messages.
    syncedGtid: The collection of GTIDs parsed by DTS, in the format of c7c98333-6006-11ed-bfc9-b8cef6e1a231:1-13.
    fakeGtid: Whether the current GTID is forged. If gtid_mode is not enabled, DTS will forge a GTID.
    pkNames: If the table in the source database has a primary key, this field will be carried in the DML message; otherwise, it will not be carried.
    readerTimestamp: The time when DTS processed the current data record, which is a UNIX timestamp in milliseconds.
    tags: The status_vars in QueryEvent. For more information, see binary_log::Query_event Class Reference.
    total: The total number of message segments if the message is segmented. This field is invalid in the current version (version = 1) and is reserved for extension.
    index: The index of message segments if the message is segmented. This field is invalid in the current version (version = 1) and is reserved for extension.
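    To illustrate how these fields are typically consumed together, the following hedged sketch branches on messageType to decide which fields of a Record to read. The class RecordDispatcher is hypothetical; the getters (getMessageType(), getSql(), getNewColumns(), and so on) follow the Avro code generated from Record.avsc, and their exact signatures should be verified against the generated classes in the demo.

    import com.tencent.subscribe.avro.MessageType;
    import com.tencent.subscribe.avro.Record;

    // Hypothetical helper showing which Record fields are relevant for each message type.
    public class RecordDispatcher {

        public static void handle(Record record) {
            MessageType type = record.getMessageType();
            switch (type) {
                case INSERT:
                case UPDATE:
                case DELETE:
                    // DML: oldColumns holds the row before the change (null for INSERT),
                    // newColumns holds the row after the change (null for DELETE).
                    System.out.printf("%s %s.%s old=%s new=%s%n",
                            type, record.getSchemaName(), record.getTableName(),
                            record.getOldColumns(), record.getNewColumns());
                    break;
                case DDL:
                    // DDL: the statement text and its execution duration in seconds.
                    System.out.printf("DDL %s (%ss)%n", record.getSql(), record.getExecutionTime());
                    break;
                case HEARTBEAT:
                    // Heartbeat: only the heartbeat timestamp is meaningful.
                    System.out.println("heartbeat at " + record.getHeartbeatTimestamp());
                    break;
                case BEGIN:
                case COMMIT:
                case CHECKPOINT:
                case ROLLBACK:
                default:
                    // Transaction control messages: position and gtid are useful for bookkeeping.
                    System.out.printf("%s at %s gtid=%s%n",
                            type, record.getPosition(), record.getGtid());
            }
        }
    }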
    The structure describing column attributes in a record is Field, which includes the following four attributes:
    name: The column name.
    dataTypeNumber: The type of the data recorded in the binlog. For values, see COM_QUERY Response.
    isKey: Whether the current column is the primary key.
    originalType: The type defined in the DDL statement.

    Database field mappings

    The following lists the mappings between database (such as MySQL) field types and data types defined in the Avro protocol.
    Type in MySQL -> Corresponding Type in Avro
    MYSQL_TYPE_NULL -> EmptyObject
    MYSQL_TYPE_INT8 -> Integer
    MYSQL_TYPE_INT16 -> Integer
    MYSQL_TYPE_INT24 -> Integer
    MYSQL_TYPE_INT32 -> Integer
    MYSQL_TYPE_INT64 -> Integer
    MYSQL_TYPE_BIT -> Integer
    MYSQL_TYPE_YEAR -> DateTime
    MYSQL_TYPE_FLOAT -> Float
    MYSQL_TYPE_DOUBLE -> Float
    MYSQL_TYPE_VARCHAR -> Character
    MYSQL_TYPE_STRING -> Character. If the original type is binary, this type will correspond to BinaryObject.
    MYSQL_TYPE_VAR_STRING -> Character. If the original type is varbinary, this type will correspond to BinaryObject.
    MYSQL_TYPE_TIMESTAMP -> Timestamp
    MYSQL_TYPE_DATE -> DateTime
    MYSQL_TYPE_TIME -> DateTime
    MYSQL_TYPE_DATETIME -> DateTime
    MYSQL_TYPE_TIMESTAMP_NEW -> Timestamp
    MYSQL_TYPE_DATE_NEW -> DateTime
    MYSQL_TYPE_TIME_NEW -> DateTime
    MYSQL_TYPE_DATETIME_NEW -> DateTime
    MYSQL_TYPE_ENUM -> TextObject
    MYSQL_TYPE_SET -> TextObject
    MYSQL_TYPE_DECIMAL -> Decimal
    MYSQL_TYPE_DECIMAL_NEW -> Decimal
    MYSQL_TYPE_JSON -> TextObject
    MYSQL_TYPE_BLOB -> BinaryObject
    MYSQL_TYPE_TINY_BLOB -> BinaryObject
    MYSQL_TYPE_MEDIUM_BLOB -> BinaryObject
    MYSQL_TYPE_LONG_BLOB -> BinaryObject
    MYSQL_TYPE_GEOMETRY -> BinaryObject
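    As a hedged illustration of the mapping above, the sketch below pairs each column definition in columns with the corresponding element of newColumns and prints it. The class RowPrinter is hypothetical; Field and EmptyObject follow the type names documented above (in the com.tencent.subscribe.avro package), the Field getters follow the four attributes listed earlier, and because the internal getters of the value types (Integer, Character, Decimal, and so on) are not documented here, the sketch relies only on instanceof and toString(). Treat it as an example, not the demo's actual parsing code.

    import java.util.List;

    import com.tencent.subscribe.avro.EmptyObject;
    import com.tencent.subscribe.avro.Field;
    import com.tencent.subscribe.avro.Record;

    // Hypothetical helper: prints each column name, its declared type, and the value
    // carried in newColumns, using only the type names documented above.
    public class RowPrinter {

        public static void printNewRow(Record record) {
            List<Field> columns = record.getColumns();   // column definitions
            List<?> values = record.getNewColumns();     // row after DML; null for DELETE
            if (columns == null || values == null) {
                return;
            }
            for (int i = 0; i < columns.size() && i < values.size(); i++) {
                Field column = columns.get(i);
                Object value = values.get(i);
                String rendered;
                if (value == null || value instanceof EmptyObject) {
                    // MYSQL_TYPE_NULL maps to EmptyObject.
                    rendered = "NULL";
                } else {
                    // Integer, Character, Decimal, Float, Timestamp, DateTime,
                    // TimestampWithTimeZone, BinaryGeometry, TextGeometry,
                    // BinaryObject, TextObject: fall back to the Avro JSON form.
                    rendered = value.toString();
                }
                System.out.printf("%s (%s, key=%s): %s%n",
                        column.getName(), column.getOriginalType(), column.getIsKey(), rendered);
            }
        }
    }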