Database/Table Data Subscription to Kafka

Last updated: 2020-04-27 17:04:09

    This document walks through a simple example of pulling table data from a data subscription and delivering it to Kafka, using a simple Kafka demo program.

    Configuring Environment

    Installing Kafka

    1. Install Kafka as instructed in Kafka Quick Start.
    2. After Kafka is launched, create a topic named testtop.
      [root@VM_71_10_centos kafka_2.11-1.1.0]# bin/ --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testtop
      Created topic "testtop".

    Getting Key

    Log in to the CAM Console to get a key (the `secretID` and `secretKey` used in the demo code).

    Selecting Data Subscription

    1. Log in to the DTS Console and select Data Subscription on the left sidebar to enter the data subscription page.
    2. In the subscription list, click a subscription name to enter the subscription details page and view the corresponding channel ID, service IP, and service port.
    3. Enter them, together with the obtained key, into the corresponding fields of the KafkaDemo program:
      final String TOPIC = "testtop"; // Subscribed topic
      Properties props = new Properties();
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Enter the `ip:port` corresponding to Kafka
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      final Producer<String, String> producer = new KafkaProducer<String, String>(props);
      // Enter the key obtained from the CAM Console here
      context.setSecretId("AKIDfdsfdsfsdt1331431sdfds"); // Enter the obtained `secretID`
      context.setSecretKey("test111usdfsdfsddsfRkeT"); // Enter the obtained `secretKey`
      // Enter the IP and port obtained from the subscription details page here
      context.setServiceIp(""); // Enter the obtained IP
      context.setServicePort(7507); // Enter the obtained port
      final DefaultSubscribeClient client = new DefaultSubscribeClient(context);
      // Enter the names of both the database and table to be synced and modify the name of the file where they will be stored
      final String targetDatabase = "test"; // Enter the name of the database to subscribe to
      // Enter the `dts-channel` configuration information obtained from the subscription details page here
      client.askForGUID("dts-channel-e4FQxtYV3It4test"); // Enter the obtained channel ID
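    Hard-coding the key and connection details works for a quick test, but it is easy to leave real credentials in source files. As a minimal sketch, the values could be read from a properties source and checked before the client is started. The class name `DemoSettings` and the `dts.*` key names below are hypothetical and are not part of the SDK. The Kafka settings use the literal key strings behind the `ProducerConfig` constants, so the sketch runs with the standard library alone.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper: loads the demo's settings instead of hard-coding them. */
final class DemoSettings {
    // Hypothetical key names chosen for this sketch; the SDK does not define them.
    static final String[] REQUIRED_KEYS = {
        "dts.secretId", "dts.secretKey", "dts.serviceIp", "dts.servicePort", "dts.channelId"
    };

    /** Parses properties text and fails fast if a required value is missing or invalid. */
    static Properties load(String text) {
        Properties settings = new Properties();
        try {
            settings.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        for (String key : REQUIRED_KEYS) {
            String value = settings.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                throw new IllegalArgumentException("Missing required setting: " + key);
            }
        }
        // Check the port early so a typo is reported before any connection attempt.
        int port = Integer.parseInt(settings.getProperty("dts.servicePort").trim());
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + port);
        }
        return settings;
    }

    /** Builds Kafka producer settings using the literal configuration key names. */
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values only; substitute the values from the consoles.
        String text = String.join("\n",
            "dts.secretId=your-secret-id",
            "dts.secretKey=your-secret-key",
            "dts.serviceIp=10.0.0.1",
            "dts.servicePort=7507",
            "dts.channelId=your-channel-id");
        Properties settings = load(text);
        System.out.println("Service port: " + settings.getProperty("dts.servicePort"));
        System.out.println("Kafka: " + producerProps("localhost:9092").getProperty("bootstrap.servers"));
    }
}
```

    The loaded values could then be passed to `context.setSecretId`, `context.setSecretKey`, `context.setServiceIp`, `context.setServicePort`, and `client.askForGUID` in place of the string literals shown above.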

    Compiling and Testing

    1. Compile the client program
      javac -classpath binlogsdk-2.6.0-release.jar:slf4j-api-1.7.25.jar:slf4j-log4j12-1.7.2.jar:kafka-clients-1.1.0.jar -encoding UTF-8 
    2. Launch the program. If no errors are reported, the program works properly.
      java -XX:-UseGCOverheadLimit -Xms2g -Xmx2g  -classpath .:binlogsdk-2.6.0-release.jar:kafka-clients-1.1.0.jar:slf4j-api-1.7.25.jar:slf4j-log4j12-1.7.2.jar  KafkaDemo
    3. Insert a row into the alantest table, then consume the testtop topic in Kafka to confirm that the change has been delivered.
      MySQL [test]> insert into alantest values(123456,'alan');
      Query OK, 1 row affected (0.02 sec)
      [root@VM_71_10_centos kafka_2.11-1.1.0]#  bin/ --bootstrap-server localhost:9092 --topic testtop --from-beginning
      Field name: id
      Field type: 3
      Field length: 6
      Field value: 123456
      Field name: name
      Field type: 253
      Field length: 4
      Field value: alan
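    The consumed output lists each column of the inserted row as four lines. The sketch below reproduces that layout and maps the numeric field types to names. It assumes that the type numbers are the MySQL protocol column type codes (3 is `LONG`, 253 is `VAR_STRING`), which is consistent with the `id` and `name` columns above, and that the field length is the byte length of the value. The class `FieldPrinter` is hypothetical and is not part of the demo or the SDK.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that formats one column the way the consumed output shows it. */
final class FieldPrinter {
    // A few MySQL protocol column type codes; extend as needed.
    private static final Map<Integer, String> TYPE_NAMES = new HashMap<>();
    static {
        TYPE_NAMES.put(1, "TINY");
        TYPE_NAMES.put(3, "LONG");
        TYPE_NAMES.put(8, "LONGLONG");
        TYPE_NAMES.put(12, "DATETIME");
        TYPE_NAMES.put(253, "VAR_STRING");
        TYPE_NAMES.put(254, "STRING");
    }

    /** Returns a readable name for a type code, or a marker for codes not in the table. */
    static String typeName(int code) {
        return TYPE_NAMES.getOrDefault(code, "UNKNOWN(" + code + ")");
    }

    /** Formats one field as four lines; length is assumed to be the UTF-8 byte length. */
    static String format(String name, int type, String value) {
        int length = value.getBytes(StandardCharsets.UTF_8).length;
        return "Field name: " + name + "\n"
             + "Field type: " + type + "\n"
             + "Field length: " + length + "\n"
             + "Field value: " + value + "\n";
    }

    public static void main(String[] args) {
        // The two columns of the row inserted in the step above.
        System.out.print(format("id", 3, "123456"));
        System.out.print(format("name", 253, "alan"));
        System.out.println("Type 3 is " + typeName(3) + ", type 253 is " + typeName(253));
    }
}
```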
