Accessing CKafka via Spark Streaming

Last updated: 2026-05-11 15:16:49
As an extension of Spark Core, Spark Streaming is used for high-throughput and fault-tolerant processing of continuous data. Currently supported external input sources include Kafka, Flume, HDFS/S3, Kinesis, Twitter, and TCP sockets.

Spark Streaming abstracts continuous data into a Discretized Stream (DStream), which consists of a series of consecutive resilient distributed datasets (RDDs). Each RDD contains the data generated within one batch interval, so processing a DStream with functions is in effect processing these RDDs.
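As a minimal illustration of this model (a sketch only, not specific to CKafka; it assumes a local Spark installation and reads from a TCP socket on port 9999), every transformation applied to the DStream below is executed against the RDD produced in each batch interval:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("DStreamSketch")
    // Each 5-second batch interval yields one RDD in the DStream.
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)
    // flatMap/map/reduceByKey are applied to the RDD of every batch.
    val counts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    counts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}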

When Kafka is used as the data input for Spark Streaming, the following integration packages (one deprecated, one stable) are supported:
Kafka Version                  spark-streaming-kafka-0.8     spark-streaming-kafka-0.10
Broker Version                 0.8.2.1 or higher             0.10.0 or higher
API Maturity                   Deprecated                    Stable
Language Support               Scala, Java, and Python       Scala and Java
Receiver DStream               Yes                           No
Direct DStream                 Yes                           Yes
SSL / TLS Support              No                            Yes
Offset Commit API              No                            Yes
Dynamic Topic Subscription     No                            Yes
Currently, TDMQ for CKafka (CKafka) is compatible with Kafka v0.9 and later. The Kafka client dependency of version 0.10.2.1 is used in this practice scenario.
In addition, Spark Streaming in EMR also supports direct connection to CKafka. For details, see the EMR documentation on accessing CKafka via Spark Streaming.

Operation Steps

Step 1: Obtaining the CKafka Instance Access Address

1. Log in to the CKafka console.
2. In the left sidebar, select Instance List and click the ID of the target instance to go to the basic instance information page.
3. In the Access Method module on the basic instance information page, obtain the instance access address, which is the bootstrap-server required for production and consumption.



Step 2: Creating a Topic

1. On the basic instance information page, select the Topic Management tab at the top.
2. On the topic management page, click New to create a topic named test. This topic is used as an example below to describe how to produce and consume messages.



Step 3: Preparing the CVM Environment

CentOS 6.8 system

Package      Version
sbt          0.13.16
Hadoop       2.7.3
Spark        2.1.0
protobuf     2.5.0
ssh          Installed on CentOS by default
Java         1.8
For specific installation steps, see Configuring the Environment.

Step 4: Connecting to CKafka

Producing Messages to CKafka

The Kafka dependency of version 0.10.2.1 is used in this case.
1. Add dependencies to build.sbt:
name := "Producer Example"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.2.1"
2. Configure producer_example.scala:
import java.util.Properties
import org.apache.kafka.clients.producer._

object ProducerExample extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "172.16.16.12:9092") // Private IP address and port from the instance information.
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  val TOPIC = "test" // Specify the topic to produce to.

  for (i <- 1 to 50) {
    // Produce a message whose key is "key" and whose value is "hello $i".
    val record = new ProducerRecord(TOPIC, "key", s"hello $i")
    producer.send(record)
  }

  val record = new ProducerRecord(TOPIC, "key", "the end " + new java.util.Date)
  producer.send(record)
  producer.close() // Disconnect at the end.
}
For more information on how to use ProducerRecord, see ProducerRecord.
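The send() call above is asynchronous and returns before the broker has acknowledged the message. If you want to confirm delivery to CKafka, the Kafka producer API also accepts a callback per record; the following sketch (reusing the producer and record values from the example above) prints the partition and offset assigned to each acknowledged message, or the exception on failure:

producer.send(record, new Callback {
  // Invoked once the broker acknowledges or rejects the record.
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    if (exception != null) exception.printStackTrace()
    else println(s"sent to ${metadata.topic}-${metadata.partition}@${metadata.offset}")
  }
})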

Consuming Messages from CKafka

DirectStream

1. Add dependencies to build.sbt:
name := "Consumer Example"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.1.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0"
2. Configure DirectStream_example.scala:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, TaskContext}

object Kafka {
  def main(args: Array[String]) {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "172.16.16.12:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark_stream_test1",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val sparkConf = new SparkConf()
    sparkConf.setMaster("local")
    sparkConf.setAppName("Kafka")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val topics = Array("spark_test") // Topic to consume; replace it with the topic you created if necessary.

    // Optional starting offsets: start every partition of spark_test from offset 0.
    val offsets: Map[TopicPartition, Long] =
      (0 until 3).map(i => new TopicPartition("spark_test", i) -> 0L).toMap

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams, offsets)
    )
    println("directStream")

    stream.foreachRDD { rdd =>
      // Output the obtained messages.
      rdd.foreach { record =>
        println(s"${record.value}")
      }
      // Obtain the offset ranges processed in this batch.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { iter =>
        val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }
    }

    // Start the computation.
    ssc.start()
    ssc.awaitTermination()
  }
}
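Because enable.auto.commit is set to false above, the offsets printed in each batch are not stored anywhere, and a restarted job falls back to auto.offset.reset. If you want the consumer group's progress committed back to Kafka, the 0-10 integration exposes a commit API on the stream itself; a minimal sketch (to be placed inside the foreachRDD block above, after offsetRanges has been obtained) is:

// Asynchronously commit this batch's offsets back to Kafka under group.id spark_stream_test1.
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)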

RDD

1. Configure build.sbt with the same dependencies as in the DirectStream example above.
2. Configure RDD_example.scala:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import collection.JavaConversions._

object Kafka {
  def main(args: Array[String]) {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "172.16.16.12:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark_stream",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val sc = new SparkContext("local", "Kafka", new SparkConf())
    val java_kafkaParams: java.util.Map[String, Object] = kafkaParams
    // Pull messages from each partition in the specified offset ranges, in order.
    // The request blocks if no messages can be pulled, until the waiting time elapses
    // or the number of newly produced messages reaches the number of messages to pull.
    val offsetRanges = Array[OffsetRange](
      OffsetRange("spark_test", 0, 0, 5),
      OffsetRange("spark_test", 1, 0, 5),
      OffsetRange("spark_test", 2, 0, 5)
    )
    val range = KafkaUtils.createRDD[String, String](
      sc,
      java_kafkaParams,
      offsetRanges,
      PreferConsistent
    )
    range.foreach(record => println(record.value))
    sc.stop()
  }
}
For more information on how to use kafkaParams, see kafkaParams.
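KafkaUtils.createRDD returns an ordinary RDD of ConsumerRecord objects, so the usual RDD operations apply to it. As a small illustration (a sketch reusing the range value from the example above), the pulled messages can be counted and mapped to key/value pairs before further processing:

// range is an RDD[ConsumerRecord[String, String]]; it can be transformed like any other RDD.
val pairs = range.map(record => (record.key, record.value))
println(s"pulled ${pairs.count()} messages")
pairs.collect().foreach { case (k, v) => println(s"$k -> $v") }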

Configuring the Environment

Installing sbt

1. Download the sbt package from the official website of sbt.
2. After decompressing the package, create an sbt_run.sh script with the following content in the sbt directory and add executable permissions as follows:
#!/bin/bash
SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M"
java $SBT_OPTS -jar `dirname $0`/bin/sbt-launch.jar "$@"
chmod u+x ./sbt_run.sh
3. Run the following command:
./sbt_run.sh sbt-version
The display of the sbt version indicates a successful installation.

Installing protobuf

1. Download an appropriate version of protobuf.
2. Decompress the package and enter the directory.
./configure
make && make install
Install gcc and gcc-c++ in advance; root permissions may be required during installation.
3. Log in again and enter the following in the command line:
protoc --version
The display of the protobuf version indicates a successful installation.

Installing Hadoop

1. Download the required version from the official website of Hadoop.
2. Add a Hadoop user.
useradd -m hadoop -s /bin/bash

3. Add administrator permissions.
visudo

4. Add a new line below root ALL=(ALL) ALL with the content hadoop ALL=(ALL) ALL, then save and exit.
5. Use Hadoop for operations.
su hadoop
6. Configure SSH password-free login.
cd ~/.ssh/ # If the directory does not exist, run ssh localhost once first.
ssh-keygen -t rsa # There will be prompts. Simply press Enter.
cat id_rsa.pub >> authorized_keys # Add authorization.
chmod 600 ./authorized_keys # Modify file permissions.

7. Install Java.
sudo yum install java-1.8.0-openjdk java-1.8.0-openjdk-devel
8. Configure ${JAVA_HOME}.
vim /etc/profile
Add the following at the end:
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.121-0.b13.el6_8.x86_64/jre
export PATH=$PATH:$JAVA_HOME/bin

Modify the corresponding path based on the installation information.
9. Decompress Hadoop and enter the directory.
./bin/hadoop version
The display of version information indicates a successful installation.
10. Configure the stand-alone and pseudo-distributed modes (so that you can build different forms of clusters as needed).
vim /etc/profile
Add the following at the end:
export HADOOP_HOME=/usr/local/hadoop
export PATH=$HADOOP_HOME/bin:$PATH

Modify the corresponding path based on the installation information.
11. Modify etc/hadoop/core-site.xml under the Hadoop installation directory.
<configuration>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>file:/usr/local/hadoop/tmp</value>
    <description>A base for other temporary directories.</description>
  </property>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>
12. Modify etc/hadoop/hdfs-site.xml under the Hadoop installation directory.
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>file:/usr/local/hadoop/tmp/dfs/name</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>file:/usr/local/hadoop/tmp/dfs/data</value>
  </property>
</configuration>
13. Modify JAVA_HOME in etc/hadoop/hadoop-env.sh under the Hadoop installation directory to the path of Java.
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.121-0.b13.el6_8.x86_64/jre

14. Format the NameNode.
./bin/hdfs namenode -format
A successful operation is indicated by the display of Exiting with status 0.
15. Start Hadoop.
./sbin/start-dfs.sh
NameNode, DataNode, and SecondaryNameNode processes will exist upon successful startup.

Installing Spark

Visit the Spark website to download the required version. Since Hadoop is already installed, select the Pre-built with user-provided Apache Hadoop option.
Note
This example also uses the Hadoop user for operations.
1. Decompress the package and enter the directory.
2. Modify the configuration file.
cp ./conf/spark-env.sh.template ./conf/spark-env.sh
vim ./conf/spark-env.sh
Add the following in the first line:
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)

Modify the path according to your Hadoop installation.
3. Run the example.
bin/run-example SparkPi
The display of an approximate value of π output by the program indicates a successful installation.
