Data Subscription: Pulling a Database Table to Kafka

Last updated: 2020-04-01 16:59:12


This document walks through a simple example of pulling a table from data subscription into Kafka, using a simple Kafka demo program (KafkaDemo).

Configuring Environment

Installing Kafka

  1. Please install Kafka as instructed in Kafka Quick Start.
  2. After Kafka is launched, create a topic named testtop.
    [root@VM_71_10_centos kafka_2.11-1.1.0]# bin/ --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testtop
    Created topic "testtop".

Getting Key

Log in to the CAM Console to get a key.

Selecting Data Subscription

  1. Log in to the DTS Console and select Data Subscription on the left sidebar to enter the data subscription page.
  2. In the subscription list, click a subscription name to enter the subscription details page and view the corresponding channel ID, service IP, and service port.
  3. Enter them, together with the obtained key, at the corresponding places in the client code:
    final String TOPIC = "testtop"; // Subscribed topic
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Enter the `ip:port` corresponding to Kafka
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    final Producer<String, String> producer = new KafkaProducer<String, String>(props);
    // Enter the key obtained from the CAM Console here
    context.setSecretId("AKIDfdsfdsfsdt1331431sdfds"); // Enter the obtained `secretID`
    context.setSecretKey("test111usdfsdfsddsfRkeT"); // Enter the obtained `secretKey`
    // Enter the IP and port obtained from the subscription details page here
    context.setServiceIp(""); // Enter the obtained IP
    context.setServicePort(7507); // Enter the obtained port
    final DefaultSubscribeClient client = new DefaultSubscribeClient(context);
    // Enter the names of both the database and table to be synced and modify the name of the file where they will be stored
    final String targetDatabase = "test"; // Enter the name of the database to subscribe to
    // Enter the `dts-channel` configuration information obtained from the subscription details page here
    client.askForGUID("dts-channel-e4FQxtYV3It4test"); // Enter the obtained channel ID
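
The values filled in above (key pair, service IP and port, channel ID, Kafka address, and topic) do not have to live in the source file. The following is a minimal sketch, not part of the original demo, that reads them from a `java.util.Properties` object and fails fast when one is missing. The class name `SubscriptionSettings` and the property keys (`dts.secretId` and so on) are hypothetical names chosen for illustration; the DTS SDK does not define them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical holder for the values the demo hard-codes. */
final class SubscriptionSettings {
    // Hypothetical property keys chosen for this sketch; not defined by the DTS SDK.
    static final String[] REQUIRED_KEYS = {
        "dts.secretId", "dts.secretKey", "dts.serviceIp", "dts.servicePort",
        "dts.channelId", "kafka.bootstrapServers", "kafka.topic"
    };

    final String secretId;
    final String secretKey;
    final String serviceIp;
    final int servicePort;
    final String channelId;
    final String bootstrapServers;
    final String topic;

    private SubscriptionSettings(Properties p) {
        secretId = p.getProperty("dts.secretId").trim();
        secretKey = p.getProperty("dts.secretKey").trim();
        serviceIp = p.getProperty("dts.serviceIp").trim();
        // Throws NumberFormatException if the port is not a number.
        servicePort = Integer.parseInt(p.getProperty("dts.servicePort").trim());
        channelId = p.getProperty("dts.channelId").trim();
        bootstrapServers = p.getProperty("kafka.bootstrapServers").trim();
        topic = p.getProperty("kafka.topic").trim();
    }

    /** Returns the required keys that are absent or blank, in declaration order. */
    static List<String> missingKeys(Properties p) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = p.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** Builds the settings, or throws if any required key is missing. */
    static SubscriptionSettings from(Properties p) {
        List<String> missing = missingKeys(p);
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Missing settings: " + missing);
        }
        return new SubscriptionSettings(p);
    }
}
```

With such a holder, the demo's calls would take the loaded values instead of literals, for example `context.setSecretId(settings.secretId)` and `client.askForGUID(settings.channelId)`.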

Compiling and Testing

  1. Compile the client program.
    javac -classpath binlogsdk-2.6.0-release.jar:slf4j-api-1.7.25.jar:slf4j-log4j12-1.7.2.jar:kafka-clients-1.1.0.jar -encoding UTF-8 
  2. Launch the program. If no errors are reported, the program works properly.
    java -XX:-UseGCOverheadLimit -Xms2g -Xmx2g  -classpath .:binlogsdk-2.6.0-release.jar:kafka-clients-1.1.0.jar:slf4j-api-1.7.25.jar:slf4j-log4j12-1.7.2.jar  KafkaDemo
  3. Insert a row into the alantest table. The inserted data then appears in the testtop topic in Kafka.
    MySQL [test]> insert into alantest values(123456,'alan');
    Query OK, 1 row affected (0.02 sec)

    [root@VM_71_10_centos kafka_2.11-1.1.0]# bin/ --bootstrap-server localhost:9092 --topic testtop --from-beginning

    Field name: id
    Field type: 3
    Field length: 6
    Field value: 123456
    Field name: name
    Field type: 253
    Field length: 4
    Field value: alan
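
The consumer output above is a flat run of `Field name`, `Field type`, `Field length`, and `Field value` lines. As a minimal sketch, assuming every column is emitted as exactly these four `label: value` lines in this order (inferred from the sample output, not from a documented format), a downstream consumer could group them back into per-column records. The `FieldDumpParser` class and its `Field` type are hypothetical. The type codes 3 and 253 coincide with MySQL's `MYSQL_TYPE_LONG` and `MYSQL_TYPE_VAR_STRING` column type codes, which fits the `id` and `name` columns here, but reading them that way is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical parser for the text layout shown in the consumer output. */
final class FieldDumpParser {

    /** One column of a row, as printed by the demo. */
    static final class Field {
        final String name;
        final int type;
        final int length;
        final String value;

        Field(String name, int type, int length, String value) {
            this.name = name;
            this.type = type;
            this.length = length;
            this.value = value;
        }
    }

    /** Groups "Field ...: value" lines into one Field per "Field value" line. */
    static List<Field> parse(String payload) {
        List<Field> fields = new ArrayList<>();
        String name = null;
        int type = 0;
        int length = 0;
        for (String line : payload.split("\\R")) {
            int sep = line.indexOf(": ");
            if (sep < 0) {
                continue; // skip blank lines and anything without a "label: value" shape
            }
            String label = line.substring(0, sep).trim();
            String text = line.substring(sep + 2);
            switch (label) {
                case "Field name":
                    name = text;
                    break;
                case "Field type":
                    type = Integer.parseInt(text.trim());
                    break;
                case "Field length":
                    length = Integer.parseInt(text.trim());
                    break;
                case "Field value":
                    if (name == null) {
                        throw new IllegalArgumentException("Field value without a preceding Field name");
                    }
                    fields.add(new Field(name, type, length, text));
                    name = null; // the next column must start with its own name line
                    break;
                default:
                    break; // unrelated line, ignored
            }
        }
        return fields;
    }

    /** Assumed mapping to MySQL column type names; only the two codes seen above. */
    static String typeName(int code) {
        switch (code) {
            case 3:
                return "MYSQL_TYPE_LONG";
            case 253:
                return "MYSQL_TYPE_VAR_STRING";
            default:
                return "UNKNOWN(" + code + ")";
        }
    }
}
```

Calling `FieldDumpParser.parse(...)` on the eight lines shown above would yield two `Field` objects, one for `id` and one for `name`.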