tencent cloud

文档反馈

Protobuf Demo 说明(Flink)

最后更新时间:2023-08-25 16:09:04

    Demo 关键逻辑讲解

    消息生产逻辑

    下文首先对消息生产逻辑进行简要说明,有助于用户理解消费逻辑。 我们采用 Protobuf 进行序列化,Demo 中均附带有 Protobuf 定义文件。文件中定义了几个关键结构:Envelope 是最终发送的 Kafka 消息结构;Entry 是单个订阅事件结构;Entries 是 Entry 的集合。主要数据结构关系如下所示:
    
    
    
    生产过程如下:
    1. 拉取 Binlog 消息,将每个 Binlog Event 编码为一个 Entry 结构体。
    message Entry { //Entry 是单个订阅事件结构,一个事件相当于 MySQL 的一个 binlog event
    Header header = 1; //事件头
    Event event = 2; //事件体
    }
    
    
    message Header {
    int32 version = 1; //Entry 协议版本
    SourceType sourceType = 2; //源库的类型信息,包括 MySQL,Oracle 等类型
    MessageType messageType = 3; //消息的类型,也就是 Event 的类型,包括 BEGIN、COMMIT、DML 等
    uint32 timestamp = 4; //Event 在原始 binlog 中的时间戳
    int64 serverId = 5; //源的 serverId
    string fileName = 6; //源 binlog 的文件名称
    uint64 position = 7; //事件在源 binlog 文件中的偏移量
    string gtid = 8; //当前事务的 gtid
    string schemaName = 9; //变更影响的 schema
    string tableName = 10; //变更影响的 table
    uint64 seqId = 11; //全局递增序列号
    uint64 eventIndex = 12; //如果大的 event 分片,每个分片从0开始编号,当前版本无意义,留待后续扩展用
    bool isLast = 13; //当前 event 是否 event 分片的最后一块,是则为 true,当前版本无意义,留待后续扩展用
    repeated KVPair properties = 15;
    }
    
    
    message Event {
    BeginEvent beginEvent = 1; //binlog 中的 begin 事件
    DMLEvent dmlEvent = 2; //binlog 中的 dml 事件
    CommitEvent commitEvent = 3; //binlog 中的 commit 事件
    DDLEvent ddlEvent = 4; //binlog 中的 ddl 事件
    RollbackEvent rollbackEvent = 5; //rollback 事件,当前版本无意义
    HeartbeatEvent heartbeatEvent = 6; //源库定时发送的心跳事件
    CheckpointEvent checkpointEvent = 7; //订阅后台添加的 checkpoint 事件,每10秒自动生成一个,用于 Kafka 生产和消费位点管理
    repeated KVPair properties = 15;
    }
    2. 为减少消息量,将多个 Entry 合并,合并后的结构为 Entries,Entries.items 字段即为 Entry 顺序列表。合并的数量以合并后不超过 Kafka 单个消息大小限制为标准。对单个 Event 就已超过大小限制的,则不再合并,Entries 中只有唯一 Entry 。
    message Entries {
    repeated Entry items = 1; //entry list
    }
    3. 对 Entries 进行 Protobuf 编码得到二进制序列。
    4. 将 Entries 的二进制序列放入 Envelope 的 data 字段。当存在单个 Binlog Event 过大时,二进制序列可能超过 Kafka 单个消息大小限制,此时我们会将其分割为多段,每段装入一个 Envelope。
    message Envelope {
    int32 version = 1; //protocol version, 决定了 data 内容如何解码
    uint32 total = 2;
    uint32 index = 3;
    bytes data = 4; //当前 version 为1, 表示 data 中数据为 Entries 被 PB 序列化之后的结果
    repeated KVPair properties = 15;
    }
    5. 对上一步生成的一个或多个 Envelope 依次进行 Protobuf 编码,然后投递到 Kafka 分区。同一个 Entries 分割后的多个 Envelope 顺序投递到同一个分区。

    消息消费逻辑

    下文对消费逻辑进行简要说明。
    1. Flink 消费端需要创建一个 FlinkKafkaConsumer,需要指定消费主题,以及自定义一个基于 Protobuf 协议的消息反序列化器。
    // 创建一个 Flink-kafka 消费者
    FlinkKafkaConsumer<RecordMsgObject> consumer =
    new FlinkKafkaConsumer<>(topic, new DeserializeProtobufToRecordMsgObject(), props);
    2. DeserializeProtobufToRecordMsgObject 将原始消息反序列化为 RecordMsgObject 对象。
    
    // 自定义反序列化器,将原始消息反序列化为 RecordMsgObject 对象
    @Override
    public RecordMsgObject deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
    RecordMsgObject obj = new RecordMsgObject();
    obj.topic = record.topic();
    obj.partition = record.partition();
    obj.offset = record.offset();
    obj.partitionSeq = getPartitionSeq(record);
    obj.key = new String(record.key());
    obj.headers = record.headers();
    // 这里收到的就是 Envelope 的二进制的值
    obj.value = record.value();
    return obj;
    }
    3. 将收到的消息按照消息的 partition 分组处理,分组处理逻辑由 SubscribeMsgProcess 实现。
    //将收到的消息按照 parttition 分区处理
    stream.keyBy(RecordMsgObject::getPartition)
    .process(new SubscribeMsgProcess(trans2sql)).setParallelism(1);
    4. SubscribeMsgProcess 中将收到的二进制序列用 Protobuf 解码为 Envelope 。
    // 反序列出 envlope,本 demo 默认一个 envlope 就可以存储完整个 bnlog-event 的数据
    SubscribeProtobufData.Envelope envelope = SubscribeProtobufData.Envelope.parseFrom(record.value);
    if (1 != envelope.getVersion()) {
    throw new IllegalStateException(String.format("unsupported version: %d", envelope.getVersion()));
    }
    注意:
    本 Demo 演示时默认单个 Binlog Event 不会超过 Kafka 单个消息大小限制,如果超过单个消息大小限制,Demo 必须在消费时引入 Flink 的高级特性“状态”去拼接 Envelope 来得到完整消息体,此种场景需要用户根据自己的业务场景自行处理。
    5. 将收到的 Envelope 的 data 字段的二进制序列用 Protobuf 解码为 Entries 。
    // 反序列得到 Entries
    ByteString envelopeData = envelope.getData();
    SubscribeProtobufData.Entries entries;
    if (1 == envelope.getTotal()) {
    entries = SubscribeProtobufData.Entries.parseFrom(envelopeData.toByteArray());
    } else {
    entries = SubscribeProtobufData.Entries.parseFrom(shardMsgMap.get(shardId).toByteArray());
    shardMsgMap.remove(shardId);
    }
    6. 对 Entries.items 依次处理,打印原始 Entry 结构或者转化为 SQL 语句。
    // 遍历每个 Entry,根据 Entry 类型去打印 sql
    for (SubscribeProtobufData.Entry entry : entries.getItemsList()) {
    onEntry(record.partition, record.offset, ps, entry, trans2sql);
    }

    Table API & Flink SQL

    本 Demo 只展示了客户端类型为 DataStream API 的模式,Flink 的客户端模式为 Table API & Flink SQL的场景需要用户自行处理。使用 Table API & Flink SQL 的客户端模式有两种方式:
    1. 使用 DataStream 转化成 Table 的客户端形式,具体可以参考:DataStream API Integration
    2. 基于 Table API & Flink SQL 自定义一个 connector,具体可以参考:User-defined Sources & Sinks

    数据库字段映射和存储

    本节介绍数据库字段类型和序列化协议中定义的数据类型之间的映射关系。 源数据库字段值在 Protobuf 协议中用如下所示的 Data 结构来存储。
    message Data {
    DataType dataType = 1;
    string charset = 2; //DataType_STRING 的编码类型, 值存储在 bv 里面
    string sv = 3; //DataType_INT8/16/32/64/UINT8/16/32/64/Float32/64/DataType_DECIMAL 的字符串值
    bytes bv = 4; //DataType_STRING/DataType_BYTES 的值
    }
    其中 DataType 字段代表存储的字段类型,可取枚举值如下图所示。
    enum DataType {
    NIL = 0; //值为 NULL
    INT8 = 1;
    INT16 = 2;
    INT32 = 3;
    INT64 = 4;
    UINT8 = 5;
    UINT16 = 6;
    UINT32 = 7;
    UINT64 = 8;
    FLOAT32 = 9;
    FLOAT64 = 10;
    BYTES = 11;
    DECIMAL = 12;
    STRING = 13;
    NA = 14; //值不存在 (N/A)
    }
    其中 bv 字段存储 STRING 和 BYTES 类型的二进制表示,sv 字段存储 INT8/16/32/64/UINT8/16/32/64/DECIMAL 类型的字符串表示,charset 字段存储 STRING 的编码类型。
    TDSQL MySQL 原始类型与 DataType 映射关系如下(对 UNSIGNED 修饰的 MYSQL_TYPE_INT8/16/24/32/64 分别映射为 UINT8/16/32/32/64):
    说明:
    DATETIMEDATETIME 类型不支持时区。
    TIMESTAMP 类型支持时区,该类型字段表示:存储时,系统会从当前时区转换为 UTC(Universal Time Coordinated)进行存储;查询时,系统会从 UTC 转换为当前时区进行查询。
    综上,如下表中 "MYSQL_TYPE_TIMESTAMP" 和 "MYSQL_TYPE_TIMESTAMP_NEW" 字段会携带时区信息,用户在消费数据时可自行转换。(例如,DTS 输出的时间格式是带时区的字符串"2021-05-17 07:22:42 +00:00",其中,"+00:00"表示 UTC 时间,用户在解析和转换的时候需要考虑时区信息。)
    TDSQL MySQL 字段类型
    对应的 Protobuf DataType 枚举值
    MYSQL_TYPE_NULL
    NIL
    MYSQL_TYPE_INT8
    INT8
    MYSQL_TYPE_INT16
    INT16
    MYSQL_TYPE_INT24
    INT32
    MYSQL_TYPE_INT32
    INT32
    MYSQL_TYPE_INT64
    INT64
    MYSQL_TYPE_BIT
    INT64
    MYSQL_TYPE_YEAR
    INT64
    MYSQL_TYPE_FLOAT
    FLOAT32
    MYSQL_TYPE_DOUBLE
    FLOAT64
    MYSQL_TYPE_VARCHAR
    STRING
    MYSQL_TYPE_STRING
    STRING
    MYSQL_TYPE_VAR_STRING
    STRING
    MYSQL_TYPE_TIMESTAMP
    STRING
    MYSQL_TYPE_DATE
    STRING
    MYSQL_TYPE_TIME
    STRING
    MYSQL_TYPE_DATETIME
    STRING
    MYSQL_TYPE_TIMESTAMP_NEW
    STRING
    MYSQL_TYPE_DATE_NEW
    STRING
    MYSQL_TYPE_TIME_NEW
    STRNG
    MYSQL_TYPE_DATETIME_NEW
    STRING
    MYSQL_TYPE_ENUM
    STRING
    MYSQL_TYPE_SET
    STRING
    MYSQL_TYPE_DECIMAL
    DECIMAL
    MYSQL_TYPE_DECIMAL_NEW
    DECIMAL
    MYSQL_TYPE_JSON
    BYTES
    MYSQL_TYPE_BLOB
    BYTES
    MYSQL_TYPE_TINY_BLOB
    BYTES
    MYSQL_TYPE_MEDIUM_BLOB
    BYTES
    MYSQL_TYPE_LONG_BLOB
    BYTES
    MYSQL_TYPE_GEOMETRY
    BYTES
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持