tencent cloud

文档反馈

使用 Flink 客户端消费订阅数据(Avro)

最后更新时间:2023-02-06 16:32:58

    操作场景

    数据订阅 Kafka 版(当前 Kafka Server 版本为V2.6.0)中,针对 Avro 格式的订阅数据,可以使用 Flink 客户端(仅支持客户端类型为 DataStream API)进行消费,本场景为您提供使用 flink-dts-connector 进行数据消费的 Demo 示例。
    说明
    当前仅 MySQL、TDSQL-C MySQL 支持 Avro 协议的数据消费。

    前提条件

    2. 创建消费组
    3. 已安装 Flink 并能够正常执行 Flink 任务。

    注意事项

    Demo 并不包含消费数据的用法演示,仅对数据做了打印处理,您需要在此基础上自行编写数据处理逻辑,您也可以使用其他语言的 Kafka 客户端消费并解析数据。
    目前不支持通过外网连接数据订阅的 Kafka 进行消费,只支持腾讯云内网的访问,并且订阅的数据库实例所属地域与数据消费的地域相同。
    DTS 中内置的 Kafka 处理单条消息有一定上限,当源库中的单行数据超过10MB时,这行数据有可能会被丢弃。

    消费 Demo 下载

    Demo 语言
    云数据库 MySQL、TDSQL-C MySQL
    Java
    编译环境:Maven 或者 Gradle 包管理工具,JDK8。用户可自行选择打包工具,如下以 Maven 为例进行介绍。 运行环境:腾讯云服务器(需要与订阅实例相同地域,才能够访问到 Kafka 服务器的内网地址),安装 JRE8。 操作步骤:
    1. 下载 Java Flink Demo,然后解压该文件。
    2. 进入解压后的目录,为方便使用,目录下分别放置了 Maven 模型文件、pom.xml 文件,用户根据需要选用。 java -jar avro-tools-1.8.2.jar compile -string schema Record.avsc :代码生成路径。
    3. 在 pom.xml 文件中修改 Flink 的版本,如下代码中的 version 需要与客户使用的 Flink 版本保持一致。
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>1.13.6</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>1.13.6</version>
    <scope>provided</scope>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>1.13.6</version>
    </dependency>
    4. 进入 pom 文件所在的目录,使用 Maven 进行打包,也直接使用 IEDA 打包。 使用 Maven 进行打包:mvn clean package。
    5. 针对 Flink 客户端类型为 DataStream API 的场景,使用 Flink 客户端命令提交任务,启动消费。 ./bin/flink run consumerDemo-avro-flink-1.0-SNAPSHOT.jar --brokers xxx --topic xxx --group xxx --user xxx --password xxx —trans2sql
    broker 为数据订阅 Kafka 的内网访问地址,topic 为数据订阅任务的订阅 topic,这两个可在 订阅详情 页查看。
    groupuserpassword 分别为消费组的名称、账号和密码,可在 消费管理 页查看。
    trans2sql 表示是否转换为 SQL 语句,java 代码中,携带该参数表示转换为 SQL 语句,不携带则不转换。
    6. 观察消费情况。 查看正在运行的任务。
    
    查看任务详情。
    

    Demo 关键逻辑讲解

    Demo 中的文件说明如下。
    consumerDemo-avro-flink\\src\\main\\resources\\avro-tools-1.8.2.jar:是用来生成 Avro 协议相关代码的工具。
    consumerDemo-avro-flink\\src\\main\\java\\com\\tencent\\subscribe\\avro:Avro 工具生成代码的目录。
    consumerDemo-avro-flink\\src\\main\\resources\\Record.avsc:协议定义文件。
    Record.avsc 中我们定义了14个结构(Avro 中叫做 schema),其中主要的数据结构为 Record,用于表示 binlog 中的一条数据,Record 的结构如下,其他数据结构可以在 Record.avsc 中查看:
    {
    "namespace": "com.tencent.subscribe.avro", //Record.avsc 中的最后1个 schema,"name" 显示为 "Record"
    "type": "record",
    "name": "Record", //"name" 显示为 "Record",表示从 kafka 中消费的数据格式
    "fields": [
    {
    "name": "id", //id 表示全局递增 ID,更多 record 取值解释如下表
    "type": "long",
    "doc": "unique id of this record in the whole stream"
    },
    {
    "name": "version", //version 表示协议版本
    "type": "int",
    "doc": "protocol version"
    },
    {
    "name": "messageType", //消息类型
    "aliases": [
    "operation"
    ],
    "type": {
    "namespace": "com.tencent.subscribe.avro",
    "name": "MessageType",
    "type": "enum",
    "symbols": [
    "INSERT",
    "UPDATE",
    "DELETE",
    "DDL",
    "BEGIN",
    "COMMIT",
    "HEARTBEAT",
    "CHECKPOINT",
    "ROLLBACK"
    ]
    }
    },
    {
    ……
    },
    }
    Record 中的字段类型解释如下:
    
    Record 中的字段名称
    说明
    id
    全局递增 ID。
    version
    协议版本,当前版本为1。
    messageType
    消息类型,枚举值:"INSERT","UPDATE","DELETE","DDL","BEGIN","COMMIT","HEARTBEAT","CHECKPOINT"。
    fileName
    当前 record 所在的 binlog 文件名。
    position
    当前 record 的在 binlog 中结束的偏移量,格式为 End_log_pos@binlog 文件编号。比如,当前 record 位于文件 mysql-bin.000004 中,结束偏移量为2196,则其值为"2196@4"。
    safePosition
    当前事务在binlog中开始的偏移量,格式同上。
    timestamp
    写入binlog的时间,unix时间戳,秒级。
    gtid
    当前的 gtid,如:c7c98333-6006-11ed-bfc9-b8cef6e1a231:9。
    transactionId
    事务 ID,只有 commit 事件才会生成事务ID。
    serverId
    源库 serverId,查看源库 server_id 参考 SHOW VARIABLES LIKE 'server_id'。
    threadId
    提交当前事务的会话 ID,参考 SHOW processlist;。
    sourceType
    源库的数据库类型,当前版本只有 MySQL。
    sourceVersion
    源库版本,查看源库版本参考 select version();
    schemaName
    库名。
    tableName
    表名。
    objectName
    格式为:库名.表名。
    columns
    表中各列的定义。
    oldColumns
    DML 执行前该行的数据,如果是insert消息,该数组为null。数组中元素共有12种类型:Integer, Character, Decimal, Float, Timestamp, DateTime, TimestampWithTimeZone, BinaryGeometry, TextGeometry, BinaryObject, TextObject, EmptyObject,详见demo中定义。
    newColumns
    DML 执行后该行的数据,如果是delete消息,该数组为null。数组中元素共有12种类型:Integer, Character, Decimal, Float, Timestamp, DateTime, TimestampWithTimeZone, BinaryGeometry, TextGeometry, BinaryObject, TextObject, EmptyObject,详见demo中定义。
    sql
    DDL 的 SQL 语句。
    executionTime
    DDL 执行时长,单位为秒。
    heartbeatTimestamp
    心跳消息的时间戳,秒级。只有 heartbeat 消息才有该字段。
    syncedGtid
    DTS 已解析 GTID 集合,格式形如:c7c98333-6006-11ed-bfc9-b8cef6e1a231:1-13。
    fakeGtid
    是否为构造的假 GTID,如未开启 gtid_mode,则 DTS 会构造一个 GTID。
    pkNames
    如果源库的表设有主键,则 DML 消息中会携带该参数,否则不会携带。
    readerTimestamp
    DTS 处理这条数据的时间,unix 时间戳,单位为毫秒数。
    tags
    QueryEvent 中的 status_vars,详细参考 QueryEvent
    total
    如果消息分片,记录分片总数。当前版本 (version=1) 无意义,预留扩展。
    index
    如果消息分片,记录当前分片的索引。当前版本 (version=1) 无意义,预留扩展。
    Record 中描述列属性的字段为 "Field",包含如下四个属性:
    name:列名。
    dataTypeNumber:是 binlog 中记录的数据类型。取值详见 MySQL
    isKey:是否主键。
    originalType:DDL 中定义的类型。

    数据库字段映射关系

    如下为数据库(如 MySQL)字段类型和 Avro 协议中定义的数据类型之间的映射关系。
    MySQL 类型
    对应 Avro 中的类型
    MYSQL_TYPE_NULL
    EmptyObject
    MYSQL_TYPE_INT8
    Integer
    MYSQL_TYPE_INT16
    Integer
    MYSQL_TYPE_INT24
    Integer
    MYSQL_TYPE_INT32
    Integer
    MYSQL_TYPE_INT64
    Integer
    MYSQL_TYPE_BIT
    Integer
    MYSQL_TYPE_YEAR
    DateTime
    MYSQL_TYPE_FLOAT
    Float
    MYSQL_TYPE_DOUBLE
    Float
    MYSQL_TYPE_VARCHAR
    Character
    MYSQL_TYPE_STRING
    Character,如果原类型为 binary,则对应 BinaryObject
    MYSQL_TYPE_VAR_STRING
    Character,如果原类型为 varbinary,则对应 BinaryObject
    MYSQL_TYPE_TIMESTAMP
    Timestamp
    MYSQL_TYPE_DATE
    DateTime
    MYSQL_TYPE_TIME
    DateTime
    MYSQL_TYPE_DATETIME
    DateTime
    MYSQL_TYPE_TIMESTAMP_NEW
    Timestamp
    MYSQL_TYPE_DATE_NEW
    DateTime
    MYSQL_TYPE_TIME_NEW
    DateTime
    MYSQL_TYPE_DATETIME_NEW
    DateTime
    MYSQL_TYPE_ENUM
    TextObject
    MYSQL_TYPE_SET
    TextObject
    MYSQL_TYPE_DECIMAL
    Decimal
    MYSQL_TYPE_DECIMAL_NEW
    Decimal
    MYSQL_TYPE_JSON
    TextObject
    MYSQL_TYPE_BLOB
    BinaryObject
    MYSQL_TYPE_TINY_BLOB
    BinaryObject
    MYSQL_TYPE_MEDIUM_BLOB
    BinaryObject
    MYSQL_TYPE_LONG_BLOB
    BinaryObject
    MYSQL_TYPE_GEOMETRY
    BinaryObject
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持