This task guides you through how to use the Kafka API after purchasing the CKafka service. After building a CKafka environment on an CVM instance, you need to download and decompress the Kafka toolkit and perform simple testing on the Kafka API.
You need to log in to the CVM purchase page to purchase a CVM instance first. The instance is configured as follows for this test:
After the purchase, install the JDK to the CVM instance by following the steps below:
1.1 Download the JDK
The JDK can be obtained with the
wget command. If you need a different version, go to the official website to download one.
You are recommended to use a JDK version above 1.7. The version used in this example is JDK 1.7.0_79.
1.2 Move it to a folder and decompress it
mkdir /usr/local/jdk mv jdk-7u79-linux-x64.tar.gz /usr/local/jdk/ cd /usr/local/jdk/ tar -xzvf jdk-7u79-linux-x64.tar.gz
1.3 Configure the environment variable
Add the following environment variable configuration to the end of the file:
export JAVA_HOME=/usr/local/jdk/jdk1.7.0_79 (extracted JDK folder) export JRE_HOME=/usr/local/jdk/jdk1.7.0_79/jre export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar:$JRE_HOME/lib
wq to save and exit. Then, run
source /etc/profile to make the file take effect immediately.
Verify whether the environment has been installed successfully by running the following command (or the
javac command) and checking whether the version numbers are the same:
cd $JAVA_HOME/bin ./java -version
$JAVA_HOMEis the home directory of the installed JDK.
If the following codes appear, the JDK has been installed successfully.
$ip $portvariable mentioned below refers to the access IP and port of CKafka.
- A CKafka instance offers up to 6 access points (i.e., $ip $port) which can satisfy the access requests from clients in different network environments. When performing a test, you only need to select the access point corresponding to the specific network environment. For example, if your CVM instance runs in VPC, you can just select the CKafka access point ($ip $port) for VPC. For more information on access points, see the instance details page.
Download the Kafka installation package here and decompress it.
Currently, CKafka is 100% compatible with Kafka 0.9 and 0.10. You are recommended to download the installation package on an appropriate version (preferably v0.10.2).
tar -C /opt -xzvf kafka_2.10-0.10.2.0.tgz // Decompressed to the corresponding directory
Kafka is ready for use immediately after decompression with no environment configuration required.
You can test whether the CVM instance is connected to the CKafka instance by running the
telnet $ip $port
Send a message:
./kafka-console-producer.sh --broker-list $ip:$port --topic topicName This is a message This is another message
Here, the IP in the broker-list is the VIP of the CKafka instance, and topicName is the topic name in the CKafka instance.
Receive a message (CKafka hides the ZooKeeper cluster by default):
./kafka-console-consumer.sh --bootstrap-server $ip:$port --from-beginning --new-consumer --topic topicName This is a message This is another message
In the above command, as no consumer group is specified to consume messages, the system will randomly generate a group to consume messages, and the upper limit on group quantity may be reached quickly. Therefore, you are recommended to specify a group to receive messages. First, you need to configure the specified group name in
consumer.properties, as shown below:
Then, run the following command to specify the consumer group:
./kafka-console-consumer.sh --bootstrap-server $ip:$port --from-beginning --new-consumer --topic topicName --consumer.config ../config/consumer.properties
When configuring the
ConsumerConfigparameter, you are recommended to set
earliestso as to prevent messages from being skipped when a new consumer group does not exist.
Cause: When you create a consumer which falls into a new group and the
latest, latest data (i.e., data produced after creating the consumer) will be consumed, while data previously produced will not be consumed.
View the corresponding CKafka monitor: