Spark Streaming Connection to CKafka

Last updated: 2020-10-27 16:57:37

    Spark Streaming Overview

    Spark Streaming is an extension of Spark Core for high-throughput, fault-tolerant processing of continuous data streams. Supported external input sources include Kafka, Flume, HDFS/S3, Kinesis, Twitter, and TCP sockets.

    Spark Streaming abstracts continuous data as a discretized stream (DStream), which consists of a series of continuous resilient distributed datasets (RDDs). Each RDD contains the data generated within one batch interval, so applying a function to a DStream actually applies it to the underlying RDDs.
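    For example, the following minimal, self-contained sketch (which uses a TCP socket source on a hostname and port chosen purely for illustration, not part of the CKafka walkthrough below) shows that a transformation written against a DStream is applied to each of its underlying RDDs:

      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      object DStreamSketch extends App {
       // Each 5-second batch of the stream becomes one RDD; the transformations
       // below are applied to every one of those RDDs.
       val conf = new SparkConf().setMaster("local[2]").setAppName("DStreamSketch")
       val ssc = new StreamingContext(conf, Seconds(5))
       val lines = ssc.socketTextStream("localhost", 9999) // DStream[String]
       val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
       counts.print() // Print the word counts computed on each batch's RDD
       ssc.start()
       ssc.awaitTermination()
      }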

    When Kafka is used as the data input for Spark Streaming, the following stable and experimental integration packages are available:

    | Kafka Version              | spark-streaming-kafka-0-8 | spark-streaming-kafka-0-10 |
    | -------------------------- | ------------------------- | -------------------------- |
    | Broker version             | 0.8.2.1 or higher         | 0.10.0 or higher           |
    | API stability              | Stable                    | Experimental               |
    | Language support           | Scala, Java, Python       | Scala, Java                |
    | Receiver DStream           | Yes                       | No                         |
    | Direct DStream             | Yes                       | Yes                        |
    | SSL/TLS support            | No                        | Yes                        |
    | Offset commit API          | No                        | Yes                        |
    | Dynamic topic subscription | No                        | Yes                        |

    Currently, the following versions of CKafka are supported: 0.9.0.x, 0.10.0.x, 0.10.1.x, and 0.10.2.x. The Kafka dependency of version 0.10.2.1 is used in this practice scenario.

    In addition, Spark Streaming in EMR also supports direct connection to CKafka. For more information, please see Spark Streaming Connection to CKafka.

    Spark Streaming Connection to CKafka

    Applying for a CKafka instance

    Log in to the CKafka Console and create a CKafka instance as instructed in Creating Instances.

    Make sure the selected network type matches the network you are currently using.

    Creating a topic

    Create a topic under the instance as instructed in Creating Topics.
    The private IP and port shown in the instance information serve as the bootstrap servers for producing and consuming messages.
    In this example, a topic named spark_test is created and used to demonstrate how to produce and consume messages.

    CVM environment

    OS: CentOS 6.8

    | Package  | Version                        |
    | -------- | ------------------------------ |
    | sbt      | 0.13.16                        |
    | Hadoop   | 2.7.3                          |
    | Spark    | 2.1.0                          |
    | Protobuf | 2.5.0                          |
    | SSH      | Installed on CentOS by default |
    | Java     | 1.8                            |

    Production to CKafka

    Currently, the following versions of CKafka are supported: 0.9.0.x, 0.10.0.x, 0.10.1.x, and 0.10.2.x. The Kafka dependency of version 0.10.2.1 is used here.

    1. Add dependencies to build.sbt:

      name := "Producer Example"
      version := "1.0"
      scalaVersion := "2.11.8"
      libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.2.1"
    2. Configure producer_example.scala:

      import java.util.Properties
      import org.apache.kafka.clients.producer._
      object ProducerExample extends App {
       val props = new Properties()
       props.put("bootstrap.servers", "172.16.16.12:9092") // Private IP and port in the instance information
      
       props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
       props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      
       val producer = new KafkaProducer[String, String](props)
       val TOPIC = "spark_test"  // Specify the topic to produce to; the `spark_test` topic created above is used here
       for (i <- 1 to 50) {
               val record = new ProducerRecord(TOPIC, "key", s"hello $i") // Produce a message whose `key` is "key" and `value` is "hello $i"
               producer.send(record)
       }
       val record = new ProducerRecord(TOPIC, "key", "the end " + new java.util.Date)
       producer.send(record)
       producer.close() // Close the producer when done; this also flushes any buffered messages
      }

    For more information on how to use ProducerRecord, please see ProducerRecord.
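    If you want to confirm that each message actually reached CKafka, send also accepts a callback. The following is a minimal sketch reusing the producer and record from the example above (Callback and RecordMetadata come from the already imported org.apache.kafka.clients.producer package):

      producer.send(record, new Callback {
       override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        if (exception != null) exception.printStackTrace() // Delivery failed
        else println(s"Sent to ${metadata.topic}-${metadata.partition}@${metadata.offset}") // Delivery confirmed
       }
      })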

    Consumption from CKafka

    DirectStream

    1. Add dependencies to build.sbt:

      name := "Consumer Example"
      version := "1.0"
      scalaVersion := "2.11.8"
      libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
      libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.1.0"
      libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0"
    2. Configure DirectStream_example.scala:

      import org.apache.kafka.clients.consumer.ConsumerRecord
      import org.apache.kafka.common.serialization.StringDeserializer
      import org.apache.kafka.common.TopicPartition
      import org.apache.spark.streaming.kafka010._
      import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
      import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
      import org.apache.spark.streaming.kafka010.KafkaUtils
      import org.apache.spark.streaming.kafka010.OffsetRange
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      import org.apache.spark.SparkConf
      import org.apache.spark.SparkContext
      import org.apache.spark.TaskContext
      object Kafka {
       def main(args: Array[String]) {
           val kafkaParams = Map[String, Object](
               "bootstrap.servers" -> "172.16.16.12:9092",
               "key.deserializer" -> classOf[StringDeserializer],
               "value.deserializer" -> classOf[StringDeserializer],
               "group.id" -> "spark_stream_test1",
               "auto.offset.reset" -> "earliest",
               "enable.auto.commit" -> (false: java.lang.Boolean)
           )
      
           val sparkConf = new SparkConf()
           sparkConf.setMaster("local")
           sparkConf.setAppName("Kafka")
           val ssc = new StreamingContext(sparkConf, Seconds(5))
           val topics = Array("spark_test")
      
           // Start each of the three partitions of `spark_test` from offset 0
           val offsets: Map[TopicPartition, Long] = (0 until 3).map { i =>
               new TopicPartition("spark_test", i) -> 0L
           }.toMap
      
           val stream = KafkaUtils.createDirectStream[String, String](
               ssc,
               PreferConsistent,
               Subscribe[String, String](topics, kafkaParams, offsets)
           )
           println("directStream")
           stream.foreachRDD { rdd =>
               // Print the value of each consumed message
               rdd.foreach { record =>
                   println(record.value)
               }
               // Get and print the offset range of every partition in this batch
               val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
               rdd.foreachPartition { _ =>
                   val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
                   println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
               }
           }
      
           // Start the computation
           ssc.start()
           ssc.awaitTermination()
       }
      }
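    Because enable.auto.commit is set to false above, the consumed offsets are not committed back to Kafka automatically. If you want the consumer group's progress to be stored in CKafka, the direct stream also exposes a commit API. The following is a minimal sketch based on the stream defined above (CanCommitOffsets is covered by the org.apache.spark.streaming.kafka010._ import):

      stream.foreachRDD { rdd =>
       val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
       // ... process the batch here ...
       // Commit this batch's offsets asynchronously once processing is done
       stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }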

    RDD

    1. Configure build.sbt in the same way as in the DirectStream example above.
    2. Configure RDD_example.scala:
      import org.apache.kafka.clients.consumer.ConsumerRecord
      import org.apache.kafka.common.serialization.StringDeserializer
      import org.apache.spark.streaming.kafka010._
      import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
      import org.apache.spark.streaming.kafka010.KafkaUtils
      import org.apache.spark.streaming.kafka010.OffsetRange
      import org.apache.spark.SparkConf
      import org.apache.spark.SparkContext
      import collection.JavaConversions._
      object Kafka {
       def main(args: Array[String]) {
           val kafkaParams = Map[String, Object](
               "bootstrap.servers" -> "172.16.16.12:9092",
               "key.deserializer" -> classOf[StringDeserializer],
               "value.deserializer" -> classOf[StringDeserializer],
               "group.id" -> "spark_stream",
               "auto.offset.reset" -> "earliest",
               "enable.auto.commit" -> (false: java.lang.Boolean)
           )
           val sc = new SparkContext("local", "Kafka", new SparkConf())
           // createRDD requires a java.util.Map; the conversion is provided by JavaConversions
           val java_kafkaParams: java.util.Map[String, Object] = kafkaParams
           // Pull the messages in the specified offset ranges from each partition in order.
           // Each OffsetRange reads offsets 0 (inclusive) through 5 (exclusive) of one partition of `spark_test`.
           // If the requested offsets are not yet available, the fetch blocks until enough messages have been produced or the consumer's wait time elapses.
           val offsetRanges = Array[OffsetRange](
               OffsetRange("spark_test", 0, 0, 5),
               OffsetRange("spark_test", 1, 0, 5),
               OffsetRange("spark_test", 2, 0, 5)
           )
           val range = KafkaUtils.createRDD[String, String](
               sc,
               java_kafkaParams,
               offsetRanges,
               PreferConsistent
           )
           // `range` is an RDD of ConsumerRecord; print the value of every record
           range.foreach(record => println(record.value))
           sc.stop()
       }
      }
      For more information on how to use kafkaParams, please see kafkaParams.
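    If you do not want to hard-code the until offsets (5 above), you can first look up the current end offset of each partition with a plain kafka-clients consumer. The sketch below is only an illustration and assumes kafka-clients 0.10.1.0 or later is on the classpath (the endOffsets API was added in that version; add the dependency to build.sbt explicitly if needed). It reuses java_kafkaParams from the example above and can replace the hard-coded offsetRanges array inside main:

      import org.apache.kafka.clients.consumer.KafkaConsumer
      import org.apache.kafka.common.TopicPartition
      // collection.JavaConversions._ is already imported in the example above
      
      // Query the log end offset of each of the three partitions of `spark_test`
      val consumer = new KafkaConsumer[String, String](java_kafkaParams)
      val partitions = (0 until 3).map(new TopicPartition("spark_test", _))
      val endOffsets = consumer.endOffsets(partitions) // java.util.Map[TopicPartition, java.lang.Long]
      consumer.close()
      // Build one OffsetRange per partition, from offset 0 up to the current end offset
      val offsetRanges = partitions.map { tp =>
       OffsetRange(tp.topic, tp.partition, 0L, endOffsets.get(tp))
      }.toArray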

    Configuring the environment

    Installing sbt

    1. Download the sbt package from sbt's official website.

    2. After decompression, create an sbt_run.sh script with the following content in the sbt directory and add executable permissions:

      #!/bin/bash
      SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M"
      java $SBT_OPTS -jar `dirname $0`/bin/sbt-launch.jar "$@"
      chmod u+x ./sbt_run.sh
    3. Run the following command:

      ./sbt_run.sh sbt-version

      If the sbt version is displayed, the installation was successful.

    Installing Protobuf

    1. Download an appropriate version of Protobuf.
    2. Decompress and enter the directory.
      ./configure
      make && make install
      gcc and gcc-c++ need to be installed in advance, and root permission may be required for `make install`.
    3. Log in again and enter the following on the command line:
      protoc --version
    4. If the Protobuf version is displayed, the installation was successful.

    Installing Hadoop

    1. Download the required version at Hadoop's official website.
    2. Add a Hadoop user.
      useradd -m hadoop -s /bin/bash
    3. Grant admin permissions.
      visudo
    4. Add the following in a new line under root ALL=(ALL) ALL:
      hadoop ALL=(ALL) ALL
      Save and exit.
    5. Switch to the hadoop user for the subsequent operations.
      su hadoop
    6. Configure SSH password-free login.
      cd ~/.ssh/                     # If there is no such directory, please run `ssh localhost` first
      ssh-keygen -t rsa              # There will be prompts. Simply press Enter
      cat id_rsa.pub >> authorized_keys  # Add authorization
      chmod 600 ./authorized_keys    # Modify file permission
    7. Install Java.
      sudo yum install java-1.8.0-openjdk java-1.8.0-openjdk-devel
    8. Configure ${JAVA_HOME}.
      vim /etc/profile
      Add the following at the end:
      export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.121-0.b13.el6_8.x86_64/jre
      export PATH=$PATH:$JAVA_HOME/bin
      Run source /etc/profile to apply the change, and modify the path above based on your actual installation.
    9. Decompress Hadoop (to /usr/local/hadoop in this example, which the later steps assume) and enter the directory.
      ./bin/hadoop version
      If the Hadoop version is displayed, the installation was successful.
    10. Configure the pseudo-distributed mode (you can adapt this to build other cluster forms as needed).
      vim /etc/profile
      Add the following at the end:
      export HADOOP_HOME=/usr/local/hadoop
      export PATH=$HADOOP_HOME/bin:$PATH
      Modify the corresponding path based on the installation information.
    11. Modify ./etc/hadoop/core-site.xml under the Hadoop directory.
      <configuration>
      <property>
          <name>hadoop.tmp.dir</name>
          <value>file:/usr/local/hadoop/tmp</value>
          <description>Abase for other temporary directories.</description>
      </property>
      <property>
          <name>fs.defaultFS</name>
          <value>hdfs://localhost:9000</value>
      </property>
      </configuration>
    12. Modify ./etc/hadoop/hdfs-site.xml under the Hadoop directory.
      <configuration>
      <property>
          <name>dfs.replication</name>
          <value>1</value>
      </property>
      <property>
          <name>dfs.namenode.name.dir</name>
          <value>file:/usr/local/hadoop/tmp/dfs/name</value>
      </property>
      <property>
          <name>dfs.datanode.data.dir</name>
          <value>file:/usr/local/hadoop/tmp/dfs/data</value>
      </property>
      </configuration>
    13. Change JAVA_HOME in ./etc/hadoop/hadoop-env.sh to the actual Java path.
      export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.121-0.b13.el6_8.x86_64/jre
    14. Format the NameNode.
      ./bin/hdfs namenode -format
      If Exitting with status 0 is displayed, the formatting succeeded.
    15. Start Hadoop.
      ./sbin/start-dfs.sh
      Upon successful startup, the NameNode, DataNode, and SecondaryNameNode processes will be running (you can check with the jps command).

    Installing Spark

    Download the required version at Spark's official website.
    As Hadoop has already been installed, select Pre-built with user-provided Apache Hadoop here.
    This example also uses the hadoop user for operations.

    1. Decompress and enter the directory.
    2. Modify the configuration file.
      cp ./conf/spark-env.sh.template ./conf/spark-env.sh
      vim ./conf/spark-env.sh
      Add the following as the first line:
      export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
      Modify the path based on the Hadoop installation information.
    3. Run the example.
      bin/run-example SparkPi
      If the program outputs an approximate value of π, the installation was successful.
