As an extension of Spark Core, Spark Streaming is used for high-throughput and fault-tolerant processing of continuous data. Currently supported external input sources include Kafka, Flume, HDFS/S3, Kinesis, Twitter, and TCP socket.
Spark Streaming abstracts continuous data into a Discretized Stream (DStream), which consists of a series of continuous resilient distributed datasets (RDDs). Each RDD contains the data received in a certain time interval, so applying functions to a DStream is in effect applying them to these underlying RDDs.
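To make this concrete, below is a minimal sketch (independent of the CKafka setup that follows) of a DStream job: every 5 seconds the words received on a TCP socket are counted, and each transformation runs on the RDD of that batch. The object name DStreamSketch and the localhost:9999 source are placeholders for illustration only.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object DStreamSketch extends App {
  // Two local threads: one for the socket receiver, one for processing
  val conf = new SparkConf().setMaster("local[2]").setAppName("DStreamSketch")
  val ssc = new StreamingContext(conf, Seconds(5)) // 5-second batches, i.e. one RDD every 5 seconds
  val lines = ssc.socketTextStream("localhost", 9999) // placeholder source, e.g. fed by `nc -lk 9999`
  // These DStream operations are applied to the RDD of every batch
  val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  counts.print() // print the first results of each batch
  ssc.start()
  ssc.awaitTermination()
}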
When Kafka is used as the data input for Spark Streaming, the following stable and experimental Kafka integrations are supported:
Feature | spark-streaming-kafka-0.8 | spark-streaming-kafka-0.10 |
---|---|---|
Broker version | 0.8.2.1 or higher | 0.10.0 or higher |
API stability | Stable | Experimental |
Language support | Scala, Java, Python | Scala, Java |
Receiver DStream | Yes | No |
Direct DStream | Yes | Yes |
SSL / TLS support | No | Yes |
Offset commit API | No | Yes |
Dynamic topic subscription | No | Yes |
Currently, the following versions of CKafka are supported: 0.9.0.x, 0.10.0.x, 0.10.1.x, and 0.10.2.x. The Kafka dependency of version 0.10.2.1 is used in this practice scenario.
In addition, Spark Streaming in EMR also supports direct connection to CKafka. For more information, please see Spark Streaming Connection to CKafka.
Log in to the CKafka Console and create a CKafka instance as instructed in Creating Instances.
Confirm whether the selected network type matches the currently used network.
Create a topic under the instance as instructed in Creating Topics.
Private IP and port: the bootstrap-server to be used for production and consumption.
A topic named spark_test is created here and used as an example to describe how to produce and consume messages.
CentOS 6.8 OS
Package | Version |
---|---|
sbt | 0.13.16 |
Hadoop | 2.7.3 |
Spark | 2.1.0 |
Protobuf | 2.5.0 |
SSH | Installed on CentOS by default |
Java | 1.8 |
Currently, the following versions of CKafka are supported: 0.9.0.x, 0.10.0.x, 0.10.1.x, and 0.10.2.x. The Kafka dependency of version 0.10.2.1 is used here.
Add dependencies to build.sbt:
name := "Producer Example"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.2.1"
Configure producer_example.scala:
import java.util.Properties
import org.apache.kafka.clients.producer._
object ProducerExample extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "172.16.16.12:9092") // Private IP and port in the instance information
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  val TOPIC = "spark_test" // Topic to produce to (the topic created above)

  for (i <- 1 to 50) {
    // Produce a message whose `key` is "key" and `value` is "hello $i"
    val record = new ProducerRecord[String, String](TOPIC, "key", s"hello $i")
    producer.send(record)
  }

  val record = new ProducerRecord[String, String](TOPIC, "key", "the end " + new java.util.Date)
  producer.send(record)

  producer.close() // Disconnect at the end
}
For more information on how to use ProducerRecord, please see ProducerRecord.
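For illustration only, the lines below could be added to ProducerExample above before producer.close() (they rely on the producer, TOPIC, and the org.apache.kafka.clients.producer._ import already defined there): ProducerRecord also has a constructor that specifies the target partition explicitly, and send() accepts a callback for checking the result. The partition number and message text here are arbitrary.
// Sketch: produce to partition 0 explicitly and inspect the result in a callback
val recordWithPartition = new ProducerRecord[String, String](TOPIC, 0, "key", "hello with explicit partition")
producer.send(recordWithPartition, new Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    if (exception != null) exception.printStackTrace() // the send failed
    else println(s"Sent to ${metadata.topic}-${metadata.partition} at offset ${metadata.offset}")
  }
})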
Add dependencies to build.sbt:
name := "Consumer Example"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.1.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0"
Configure DirectStream_example.scala:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.TaskContext // needed for TaskContext.get below
import collection.JavaConversions._
import Array._
object Kafka {
  def main(args: Array[String]) {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "172.16.16.12:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark_stream_test1",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> "false"
    )

    val sparkConf = new SparkConf()
    sparkConf.setMaster("local")
    sparkConf.setAppName("Kafka")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val topics = Array("spark_test")

    // Starting offsets for the three partitions of spark_test (optional; they can be passed
    // as the third argument of Subscribe to override auto.offset.reset)
    val offsets: Map[TopicPartition, Long] =
      (0 until 3).map(i => new TopicPartition("spark_test", i) -> 0L).toMap

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    println("directStream")

    stream.foreachRDD { rdd =>
      // Output the obtained messages
      rdd.foreach { record =>
        println(record.value)
      }
      // Get the offset range of each partition in this batch
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { iter =>
        val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }
    }

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
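Because enable.auto.commit is set to false above, this example does not commit anything back to Kafka. As a sketch only, the foreachRDD block above could be extended as follows to commit the offsets of each batch asynchronously after processing, using the CanCommitOffsets interface from the same spark-streaming-kafka-0-10 package:
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreach(record => println(record.value)) // process the batch first
  // Commit the processed offsets for group spark_stream_test1 back to Kafka (asynchronously)
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}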
Add dependencies to build.sbt in the same way as detailed above.
Configure RDD_example.scala:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import collection.JavaConversions._
import Array._
object Kafka {
  def main(args: Array[String]) {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "172.16.16.12:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark_stream",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val sc = new SparkContext("local", "Kafka", new SparkConf())
    // Implicit conversion from the Scala map to java.util.Map (via JavaConversions)
    val java_kafkaParams: java.util.Map[String, Object] = kafkaParams

    // Pull the messages in the specified offset ranges from each partition in order.
    // If the messages are not yet available, the request blocks until the waiting time
    // elapses or enough new messages have been produced.
    val offsetRanges = Array[OffsetRange](
      OffsetRange("spark_test", 0, 0, 5),
      OffsetRange("spark_test", 1, 0, 5),
      OffsetRange("spark_test", 2, 0, 5)
    )
    val range = KafkaUtils.createRDD[String, String](
      sc,
      java_kafkaParams,
      offsetRanges,
      PreferConsistent
    )

    // Output the value of each pulled record
    range.foreach(record => println(record.value))

    sc.stop()
  }
}
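As a usage sketch (illustrative only), the RDD returned by createRDD can be transformed like any other RDD. For example, the following lines, placed before sc.stop() in the example above, map each record to its value and collect the results to the driver:
// Map to plain String values before collecting (ConsumerRecord itself is not serializable)
val values = range.map(record => record.value).collect()
println(s"pulled ${values.length} messages")
values.foreach(println)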
For more information on how to use kafkaParams, please see kafkaParams.
Download the sbt package from sbt's official website.
After decompression, create an sbt_run.sh script with the following content in the sbt directory and add executable permission to it:
#!/bin/bash
SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M"
java $SBT_OPTS -jar `dirname $0`/bin/sbt-launch.jar "$@"
chmod u+x ./sbt_run.sh
Run the following command:
./sbt_run.sh sbt-version
The display of sbt version indicates a successful installation.
./configure
make && make install
You should install gcc and gcc-c++ in advance, and root permission may be required during installation. Check the Protobuf version after installation:
protoc --version
useradd -m hadoop -s /bin/bash
visudo
Find the line root ALL=(ALL) ALL and add the following line below it:
hadoop ALL=(ALL) ALL
su hadoop
cd ~/.ssh/ # If there is no such directory, please run `ssh localhost` first
ssh-keygen -t rsa # There will be prompts. Simply press Enter
cat id_rsa.pub >> authorized_keys # Add authorization
chmod 600 ./authorized_keys # Modify file permission
sudo yum install java-1.8.0-openjdk java-1.8.0-openjdk-devel
Configure ${JAVA_HOME}:
vim /etc/profile
Add the following at the end:
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.121-0.b13.el6_8.x86_64/jre
export PATH=$PATH:$JAVA_HOME/bin
Modify the corresponding path based on the installation information.
./bin/hadoop version
The display of version information indicates a successful installation.
vim /etc/profile
Add the following at the end:
export HADOOP_HOME=/usr/local/hadoop
export PATH=$HADOOP_HOME/bin:$PATH
Modify the corresponding path based on the installation information.
Modify /etc/hadoop/core-site.xml as follows:
<configuration>
<property>
<name>hadoop.tmp.dir</name>
<value>file:/usr/local/hadoop/tmp</value>
<description>Abase for other temporary directories.</description>
</property>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>
Modify /etc/hadoop/hdfs-site.xml as follows:
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:/usr/local/hadoop/tmp/dfs/name</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:/usr/local/hadoop/tmp/dfs/data</value>
</property>
</configuration>
Set JAVA_HOME in /etc/hadoop/hadoop-env.sh to the Java path:
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.121-0.b13.el6_8.x86_64/jre
./bin/hdfs namenode -format
The display of Exitting with status 0 indicates a success.
./sbin/start-dfs.sh
The NameNode, DataNode, and SecondaryNameNode processes will exist upon successful startup.
Download the required version from Spark's official website.
As Hadoop has already been installed, select Pre-built with user-provided Apache Hadoop here.
This example also uses the hadoop user for operations.
cp ./conf/spark-env.sh.template ./conf/spark-env.sh
vim ./conf/spark-env.sh
Add the following in the first line:
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
Modify the path based on the Hadoop installation information.
bin/run-example SparkPi
The display of an approximate value of π output by the program indicates a successful installation.