Database/Table Data Subscription to Kafka

Last updated: 2020-04-27 17:04:09

    This document provides a simple example that walks you through how to pull a table from data subscription to Kafka as well as a simple Kafka Demo.

    Configuring Environment

    Installing Kafka

    1. Please install Kafka as instructed in Kafka Quick Start.
    2. After Kafka is launched, create a testtop topic.
      [root@VM_71_10_centos kafka_2.11-1.1.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testtop
      Created topic "testtop".

    Getting Key

    Log in to the CAM Console to get a key.

    Selecting Data Subscription

    1. Log in to the DTS Console and select Data Subscription on the left sidebar to enter the data subscription page.
    2. In the subscription list, click a subscription name to enter the subscription details page and view the corresponding channel ID, service IP, and service port.
    3. Enter them together with the obtained key into the corresponding KafkaDemo.java.
      // Enter the key obtained from the CAM Console here      
      final String TOPIC = "testtop"; Subscribed topic
           Properties props = new Properties();
       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); Enter the `ip:port` corresponding to Kafka
           props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
           props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
           final Producer<String, String> producer = new KafkaProducer<String, String>(props);
                  context.setSecretId("AKIDfdsfdsfsdt1331431sdfds"); Enter the obtained `secretID`
           context.setSecretKey("test111usdfsdfsddsfRkeT"); Enter the obtained `secretKey`
       // Enter the IP and port obtained from the subscription details page here
           context.setServiceIp("10.66.112.181"); Enter the obtained IP
           context.setServicePort(7507); Enter the obtained port
           final DefaultSubscribeClient client = new DefaultSubscribeClient(context);
       // Enter the names of both the database and table to be synced and modify the name of the file where they will be stored
           final String targetDatabase = "test"; Enter the name of the database to subscribe to
                    client.addClusterListener(listener);
       // Enter the `dts-channel` configuration information obtained from the subscription details page here
       client.askForGUID("dts-channel-e4FQxtYV3It4test"); Enter the obtained channel ID
           client.start();

    Compiling and Testing

    1. Compile the client program KafkaDemo.java.
      javac -classpath binlogsdk-2.6.0-release.jar:slf4j-api-1.7.25.jar:slf4j-log4j12-1.7.2.jar:kafka-clients-1.1.0.jar -encoding UTF-8 KafkaDemo.java 
    2. Launch the program. If no errors are reported, the program works properly.
      java -XX:-UseGCOverheadLimit -Xms2g -Xmx2g  -classpath .:binlogsdk-2.6.0-release.jar:kafka-clients-1.1.0.jar:slf4j-api-1.7.25.jar:slf4j-log4j12-1.7.2.jar  KafkaDemo
    3. Insert a data entry into the alantest table, and you will find that the data has been stored in the testtop subscribed to by Kafka.
    MySQL [test]> insert into alantest values(123456,'alan');
    Query OK, 1 row affected (0.02 sec)
    
    [root@VM_71_10_centos kafka_2.11-1.1.0]#  bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testtop --from-beginning 
    checkpoint:144251@3@1275254@1153089
    record_id:00000100000000001198410000000000000001
    record_encoding:utf8
    fields_enc:latin1,utf8
    gtid:4f21864b-3bed-11e8-a44c-5cb901896188:5552
    source_category:full_recorded
    source_type:mysql
    table_name:alantest
    record_type:INSERT
    db:test
    timestamp:1524649133
    primary:id
    
    Field name: id
    Field type: 3
    Field length: 6
    Field value: 123456
    Field name: name
    Field type: 253
    Field length: 4
    Field value: alan

    Was this page helpful?

    Was this page helpful?

    • Not at all
    • Not very helpful
    • Somewhat helpful
    • Very helpful
    • Extremely helpful
    Send Feedback
    Help