tencent cloud

文档反馈

数据消费操作指导

最后更新时间:2023-08-25 15:36:55

    操作场景

    数据同步到 Kafka 后,您可以通过0.11版本及以上的 Kafka 客户端 进行消费订阅数据,本文为您提供了 Java、Go、Python 语言的客户端消费 Demo 示例,方便您快速测试消费数据的流程,了解数据格式解析的方法。

    注意事项

    Demo 并不包含消费数据的用法演示,仅对数据做了打印处理,您需要在此基础上自行编写数据处理逻辑,您也可以使用其他语言的 Kafka 客户端消费并解析数据。
    目标 Ckafka 中消息大小设置的上限需要大于源库表中单行数据的最大值,以便数据可以正常同步到目标端。
    在同步指定库/表对象(非源实例全部),并且采用 Kafka 单分区的场景中,DTS 解析增量数据后,仅将同步对象的数据写入 Kafka Topic 中,其他非同步对象的数据会转成空事务写入 Kafka Topic,所以在消费数据时会出现空事务。空事务的 Begin/Commit 消息中保留了事务的 GTID 信息,可以保证 GTID 的连续性和完整性。同时,在 MySQL/TDSQL-C MySQL 的消费 Demo 中,多个空事务也做了压缩处理以减少消息数量。
    为了保证数据可重入,DTS 同步到 Kafka 链路引入 Checkpoint 机制。消息写入 Kafka Topic 时,一般每10秒会插入一个 Checkpoint,用来标识数据同步的位点,在任务中断后再重启识别断点位置,实现断点续传。另外,消费端遇到 Checkpoint 消息会做一次 Kafka 消费位点提交,以便及时更新消费位点。
    数据格式选择 JSON 时,如果您使用过或者熟悉开源订阅工具 Canal,可以选择将这里消费出来的 JSON 格式数据转换成 Canal 工具兼容的数据格式,再进行后续处理,我们的 Demo 中已经提供了相关支持,在启动 Demo 的参数中添加参数 trans2canal 即可实现。目前该功能仅限 Java 语言支持。

    消费 Demo 下载

    在配置同步任务中,您可以选择不同的数据格式,Avro 和 JSON。Avro 采用二进制格式,消费效率更高,JSON 采用轻量级的文本格式,更加简单易用。选择的数据格式不同,参考的 Demo 示例也不同。
    如下提供的 Demo 示例,均已包含对应的 Avro/JSON 协议文件,无需另外下载。
    Demo 中的逻辑讲解及关键参数说明,请参考 Demo 说明
    Demo 语言
    Avro 格式
    (MySQL/MariaDB/Percona/TDSQL-C MySQL/TDSQL MySQL)
    JSON 格式(MySQL/MariaDB/Percona/TDSQL-C MySQL//TDSQL MySQL)
    Go
    地址
    地址
    Java
    地址
    地址
    Python
    地址
    地址

    Java Demo 使用说明

    编译环境:Maven 或者 Gradle 包管理工具,JDK8。用户可自行选择打包工具,如下以 Maven 为例进行介绍。 操作步骤:
    1. 下载 Java Demo,然后解压该文件。
    2. 进入解压后的目录,为方便使用,目录下分别放置了 Maven 模型文件、pom.xml 文件,用户根据需要选用。 使用 Maven 进行打包:mvn clean package。
    3. 运行 Demo。 使用 Maven 打包后,进入目标文件夹 target,运行如下代码:java -jar consumerDemo-avro-1.0-SNAPSHOT.jar --brokers xxx --topic xxx --group xxx --trans2sql
    broker 为 CKafka 的访问地址,topic 为同步任务中设置的 topic 名称,如果是多 topic 需要分别消费。这两个可通过数据同步 > 操作 > 查看获取。
    group为消费组名称,用户可提前在 Ckafka 中创建消费组,也可以在此处自定义输入。
    trans2sql 表示是否转换为 SQL 语句,java 代码中,携带该参数表示转换为 SQL 语句,不携带则不转换。
    trans2canal 表示是否转换为 Canal 格式打印出来,携带该参数表示转换为 Canal 格式,不携带则不转换。
    说明
    携带 trans2sql 时,将使用 javax.xml.bind.DatatypeConverter.printHexBinary() 将 byte 值转成16进制,请使用 JDK1.8 版本及以上避免不兼容。如果不需要转 SQL,可以注释此处代码。
    4. 观察消费情况。
    
    

    Golang Demo 使用说明

    编译环境:Golang 1.12 及以上版本,配置好 Go Module 环境。 操作步骤:
    1. 下载 Golang Demo,然后解压该文件。
    2. 进入解压后的目录,运行 go build -o subscribe ./main/main.go,生成可执行文件 subscribe。
    3. 运行 ./subscribe --brokers=xxx --topic=xxx --group=xxx --trans2sql=true
    broker 为 CKafka 的访问地址,topic 为同步任务中设置的 topic 名称,如果是多 topic 需要分别消费。这两个可通过 数据同步>操作>查看 获取。
    group为消费组名称,用户可提前在 Ckafka 中创建消费组,也可以在此处自定义输入。
    trans2sql 表示是否转换为 SQL 语句。
    4. 观察消费情况。
    
    

    Python3 Demo 使用说明

    编译运行环境:安装 Python3,pip3(用于依赖包安装)。 使用 pip3 安装依赖包:
    pip install flag
    pip install kafka-python
    pip install avro
    操作步骤:
    1. 下载 Python3 Demo ,然后解压该文件。
    2. 运行 python main.py --brokers=xxx --topic=xxx --group=xxx --trans2sql=1
    broker 为 CKafka 的访问地址,topic 为同步任务中设置的 topic 名称,如果是多 topic 需要分别消费。这两个可通过数据同步 > 操作 > 查看获取。
    group为消费组名称,用户可提前在 Ckafka 中创建消费组,也可以在此处自定义输入。
    trans2sql 表示是否转换为 SQL 语句。
    3. 观察消费情况。
    
    
    
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持