Integrating Spark Streaming with CKafka

Last updated: 2019-07-26 17:46:22


Tencent Cloud Elastic MapReduce (EMR) allows you to build the following streaming applications with CKafka:

  • Log information stream processing
  • User behavior record stream processing
  • Alarm information collection and processing
  • Messaging

1. Prerequisites

  • This task requires access to CKafka, so you need to create a CKafka instance first. For more information, see CKafka.
  • Confirm that you have activated Tencent Cloud and created an EMR cluster. When creating the EMR cluster, select the Spark component on the software configuration page.

2. Using the Kafka Toolkit in the EMR Cluster

First, check the private IP and port number of CKafka. Log in to the CKafka Console, select the CKafka instance you want to use, and note its private IP (referred to as $kafkaIP below) in the basic information section; the port number defaults to 9092. Then, create a topic named spark_streaming_test on the topic management page.

Log in to any node (preferably a master node) in the EMR cluster. For more information about how to log in to EMR, see Logging in to a Linux Instance. Here, you can use WebShell: click the Login button on the right of the desired CVM instance to open the login page. The default username is root, and the password is the one you set when creating the EMR cluster. Once your credentials are validated, you can access the command-line interface.
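If you prefer an SSH client over WebShell, a typical login from your local machine looks like the following, where $publicIP stands for the public IP address of the node:

ssh root@$publicIP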

Run the following commands on the EMR command-line interface to switch to the Hadoop user and go to the /usr/local/service/spark directory:

[root@172 ~]# su hadoop
[hadoop@172 root]$ cd /usr/local/service/spark

Download the installation package from Kafka's official website. We recommend a Kafka client version that is compatible with Tencent Cloud CKafka, such as the 0.10.2.0 release used below. Then, decompress the package and move the extracted folder to the /opt directory:

[hadoop@172 data]$ tar -xzvf kafka_2.10-0.10.2.0.tgz
[hadoop@172 data]$ mv kafka_2.10-0.10.2.0 /opt/
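Note: the commands above assume that the installation package is already on the node. If the node has Internet access, you can also fetch the package directly on it; for example, assuming the download URL follows the Apache archive layout (check the official download page for the current location):

wget https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.10-0.10.2.0.tgz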

Once the package is decompressed, you can use the Kafka toolkit. Run the telnet command to check whether the EMR cluster can connect to the CKafka instance:

[hadoop@172 kafka_2.10-0.10.2.0]$ telnet $kafkaIP 9092
Trying $kafkaIP...
Connected to $kafkaIP.

Here, $kafkaIP is the private IP address of the CKafka instance you created.
The following example shows how to test the Kafka toolkit. Log in to the EMR cluster in two WebShell terminals, switch to the Hadoop user in each, and go to the Kafka installation path:

[root@172 ~]# su hadoop
[hadoop@172 root]$ cd /opt/kafka_2.10-0.10.2.0/

Connect to CKafka on the first terminal and send the following message:

[hadoop@172 kafka_2.10-0.10.2.0]$ bin/kafka-console-producer.sh --broker-list $kafkaIP:9092 --topic spark_streaming_test
hello world
this is a message

Connect to CKafka on the other terminal. Acting as a consumer, you can now read the records from the topic:

[hadoop@172 kafka_2.10-0.10.2.0]$ bin/kafka-console-consumer.sh --bootstrap-server $kafkaIP:9092 --from-beginning --new-consumer --topic spark_streaming_test
hello world
this is a message

3. Connecting Spark Streaming to CKafka

On the consumer side, Spark Streaming continuously pulls data from CKafka and counts word frequencies, i.e., it performs a WordCount task on the streaming data. On the producer side, a program constantly generates data and continuously delivers it to CKafka.
Download and install Maven first, and then configure its environment variables. If you are using an IDE, configure the Maven-related settings in your IDE.
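You can verify that Maven is installed and available on the command line by checking its version, for example:

mvn -v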

Creating a Spark Streaming Consumer Project

Go to your Maven workspace directory, such as D://mavenWorkplace, and create the project by running the following command:

mvn archetype:generate -DgroupId=$yourgroupID -DartifactId=$yourartifactID -DarchetypeArtifactId=maven-archetype-quickstart

Here, $yourgroupID is your package name, $yourartifactID is your project name, and maven-archetype-quickstart indicates that a standard Maven Java project will be created. Some files need to be downloaded during this process, so please keep your Internet connection active.
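For example, with hypothetical coordinates (replace them with your own group ID and project name):

mvn archetype:generate -DgroupId=com.example.demo -DartifactId=sparkstreaming-consumer -DarchetypeArtifactId=maven-archetype-quickstart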
After successfully creating the project, you will see a folder named $yourartifactID in the D://mavenWorkplace directory. The files included in the folder have the following structure:

simple
   ---pom.xml         Core configuration, under the project root directory
   ---src
     ---main
       ---java        Java source code directory
       ---resources   Java configuration file directory
     ---test
       ---java        Test source code directory
       ---resources   Test configuration directory

Among the files above, pay extra attention to the pom.xml file and the java folder under the main directory. The pom.xml file is primarily used to declare dependencies and packaging configurations; the java folder stores your source code.

First, add the Maven dependencies to the pom.xml file:

<dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.0.2</version>
        </dependency>
</dependencies>

Then, add the packaging and compiling plugins to the pom.xml file:

<build>
<plugins>
  <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
      <source>1.8</source>
      <target>1.8</target>
      <encoding>utf-8</encoding>
    </configuration>
  </plugin>
  <plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
      <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
      </descriptorRefs>
    </configuration>
    <executions>
      <execution>
        <id>make-assembly</id>
        <phase>package</phase>
        <goals>
          <goal>single</goal>
        </goals>
      </execution>
    </executions>
  </plugin>
</plugins>
</build>

Note: Replace $yourgroupID and $yourartifactID with your real information.

Then, add the sample code: create a Java class named KafkaTest.java in the src/main/java directory and add the following code to it:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * Created by tencent on 2018/7/3.
 */
public class KafkaTest {
    public static void main(String[] args) throws InterruptedException {
        String brokers = "$kafkaIP:9092";
        String topics = "spark_streaming_test1";  // Subscribed topics; multiple topics should be separated by ','
        int durationSeconds = 60;  // Interval
        SparkConf conf = new SparkConf().setAppName("spark streaming word count");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(durationSeconds));
        Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        // Kafka-related parameter
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers) ;
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("group.id", "group1");
        kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Create a connection
        JavaInputDStream<ConsumerRecord<Object,Object>> lines = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams)
        );
        // wordcount logic
        JavaPairDStream<String, Integer> counts = lines
                .flatMap(x -> Arrays.asList(x.value().toString().split(" ")).iterator())
                .mapToPair(x -> new Tuple2<String, Integer>(x, 1))
                .reduceByKey((x, y) -> x + y);
        // Save the result
        counts.dstream().saveAsTextFiles("$hdfsPath","result");
        ssc.start();
        ssc.awaitTermination();
        ssc.close();
    }
}

Pay attention to the following settings in the code:

  • The brokers variable should be set to the private IP address and port of the CKafka instance obtained in step 2;
  • The topics variable should be set to the name of the topic you created, e.g., spark_streaming_test1 here;
  • durationSeconds is the interval at which the program consumes data from CKafka, e.g., 60 seconds here;
  • $hdfsPath is the HDFS path prefix to which the result will be written; each batch is saved to a directory named $hdfsPath-<timestamp>.result (for example, /user/sparkstreamingtest-1530793500000.result as shown later in this document).

Use the local command prompt to enter the project directory and run the following command to compile and package the project:

mvn package

"Build success" indicates that package is successfully created. You can see the generated .jar package in the target folder under the project directory.
Upload the package file to the EMR cluster using the scp or sftp tool. Be sure to include the dependencies in the .jar package to be uploaded:

scp $localfile root@$publicIP:$remotefolder

Here, $localfile is the path and name of your local file; root is the CVM instance username; $publicIP is the public IP address of the node, which you can look up in the node information in the EMR or CVM console; and $remotefolder is the path where you want to store the file on the CVM instance. After the upload is completed, you can check whether the file is in the target folder from the EMR command line.
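For example, assuming the consumer package built above is named sparkstreaming-consumer-1.0-SNAPSHOT-jar-with-dependencies.jar (a hypothetical name) and you want to place it in /home/hadoop on the node, the command might look like this:

scp target/sparkstreaming-consumer-1.0-SNAPSHOT-jar-with-dependencies.jar root@$publicIP:/home/hadoop/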

Creating a Spark Streaming Producer Project

Go to your Maven workspace directory, such as D://mavenWorkplace, and create the project by running the following command:

mvn archetype:generate -DgroupId=$yourgroupID -DartifactId=$yourartifactID -DarchetypeArtifactId=maven-archetype-quickstart

First, add the Maven dependencies to the pom.xml file:

<dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>0.10.1.0</version>
        </dependency>
    </dependencies>

Then, add the packaging and compiling plugins to the pom.xml file:

<build>
<plugins>
  <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
      <source>1.8</source>
      <target>1.8</target>
      <encoding>utf-8</encoding>
    </configuration>
  </plugin>
  <plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
      <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
      </descriptorRefs>
    </configuration>
    <executions>
      <execution>
        <id>make-assembly</id>
        <phase>package</phase>
        <goals>
          <goal>single</goal>
        </goals>
      </execution>
    </executions>
  </plugin>
</plugins>
</build>

Note: Replace $yourgroupID and $yourartifactID with your real information.

Then, add the sample code: create a Java class named SendData.java in the src/main/java directory and add the following code to it:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;


/**
 * Created by tencent on 2018/7/4.
 */
public class SendData {
    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "$kafkaIP:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // The producer sends a message
        String topic = "spark_streaming_test1";
        org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<String, String>(props);
        while (true) {
            // Pick a random starting value, then send 10 messages (value_0 to value_9, wrapping around)
            int num = (int)((Math.random())*10);
            for (int i = 0; i < 10; i++) {
                int tmp = (num + i) % 10;
                String value = "value_" + tmp;
                ProducerRecord<String, String> msg = new ProducerRecord<String, String>(topic, value);
                producer.send(msg);
            }

            try {Thread.sleep(1000*10);}
            catch (InterruptedException e) {}
        }
    }
}

Replace $kafkaIP with the private IP address of your CKafka instance.

This program sends 10 messages, value_0 through value_9, to CKafka every 10 seconds, starting from a randomly chosen value each round. For example, if the random starting value is 7, the messages are sent in the order value_7, value_8, value_9, value_0, ..., value_6. For more information about the parameters in the program, see the consumer program.
Use the local command prompt to enter the project directory and run the following command to compile and package the project:

mvn package

"Build success" indicates that package is successfully created. You can see the generated .jar package in the target folder under the project directory.
Upload the package file to the EMR cluster using the scp or sftp tool. Be sure to include the dependencies in the .jar package to be uploaded:

scp $localfile root@$publicIP:$remotefolder

Using the Program to Consume CKafka Data

Log in to the WebShell of the EMR cluster in two terminals.
In the first terminal, log in to a master node of the EMR cluster, switch to the Hadoop user, and go to the /usr/local/service/spark directory, as shown in section 2. Run the following command to start the demo:

[hadoop@172 spark]$ bin/spark-submit --class KafkaTest --master yarn-cluster $consumerpackage

The parameters are as follows:

  • --class indicates the entry class to be executed, i.e., KafkaTest in this example
  • --master is the master URL of the cluster; yarn-cluster here submits the job to YARN in cluster mode
  • $consumerpackage is the package name of the packaged consumer program (see the filled-in example below)
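For example, assuming the consumer package uploaded earlier is /home/hadoop/sparkstreaming-consumer-1.0-SNAPSHOT-jar-with-dependencies.jar (a hypothetical name and path), the command might look like this:

[hadoop@172 spark]$ bin/spark-submit --class KafkaTest --master yarn-cluster /home/hadoop/sparkstreaming-consumer-1.0-SNAPSHOT-jar-with-dependencies.jar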

After the program is started, it will keep running in the Yarn cluster. Run the following command to view its running status:

[hadoop@172 ~]$ yarn application --list

In the second terminal, log in to the WebShell of the EMR cluster and run the producer program, so that Spark Streaming can pull the data for consumption:

[hadoop@172 spark]$ bin/spark-submit --class SendData $producerpackage

Here, $producerpackage is the package name of the packaged producer program. After a while, the result of the WordCount task will be written to the specified HDFS folder. You can then view in HDFS the result of Spark Streaming's consumption of the CKafka data:

[hadoop@172 root]$ hdfs dfs -ls /user
Found 9 items
drwxr-xr-x - hadoop supergroup  0 2018-07-03 16:37 /user/hadoop
drwxr-xr-x - hadoop supergroup  0 2018-06-19 10:10 /user/hive
-rw-r--r-- 3 hadoop supergroup 0 2018-06-29 10:19 /user/pythontest.txt
drwxr-xr-x - hadoop supergroup 0 2018-07-05 20:25 /user/sparkstreamingtest-1530793500000.result

[hadoop@172 root]$ hdfs dfs -cat /user/sparkstreamingtest-1530793500000.result/*
(value_6,16)
(value_7,22)
(value_8,18)
(value_0,18)
(value_9,17)
(value_1,18)
(value_2,17)
(value_3,17)
(value_4,16)
(value_5,17)

Finally, exit the KafkaTest program in the Yarn cluster:

[hadoop@172 ~]$ yarn application --kill $Application-Id

Here, $Application-Id is the application ID found by running the yarn application --list command.
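For example, with a hypothetical application ID:

[hadoop@172 ~]$ yarn application --kill application_1530000000000_0001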
For more information about Kafka, see the official documentation.