This document guides you through using the Kafka API after purchasing the CKafka service. After setting up the environment on a CVM instance, you need to download and decompress the Kafka toolkit and perform a simple test on the Kafka API.
First, log in to the CVM purchase page and purchase a CVM instance. The instance used in this test is configured as follows:
After the purchase, install the JDK on the CVM instance by following the steps below:
1.1 Download the JDK
The JDK can be downloaded with the wget command. If you need a different version, download it from the official website. JDK 1.7 or later is recommended; the version used in this example is JDK 1.7.0_79.
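For example (the download URL below is a placeholder rather than an official link; replace it with the actual address of the package you want to install):
wget <JDK_DOWNLOAD_URL>/jdk-7u79-linux-x64.tar.gz   # <JDK_DOWNLOAD_URL> is a placeholder for the real download address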
1.2 Move it to a folder and decompress it
mkdir /usr/local/jdk
mv jdk-7u79-linux-x64.tar.gz /usr/local/jdk/
cd /usr/local/jdk/
tar -xzvf jdk-7u79-linux-x64.tar.gz
1.3 Configure the environment variables
vim /etc/profile
Add the following environment variable configuration to the end of the file:
export JAVA_HOME=/usr/local/jdk/jdk1.7.0_79   # the decompressed JDK directory
export JRE_HOME=/usr/local/jdk/jdk1.7.0_79/jre
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar:$JRE_HOME/lib
Run :wq to save the file and exit. Then, run source /etc/profile to make the configuration take effect immediately.
1.4 Verify
Verify that the JDK has been installed successfully by running the following command (or the javac command) and checking that the displayed version number matches the installed JDK version:
cd $JAVA_HOME/bin
./java -version
Here, $JAVA_HOME is the home directory of the installed JDK.
If output similar to the following appears, the JDK has been installed successfully.
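The output should look roughly like this (the exact build numbers depend on the JDK build you installed):
java version "1.7.0_79"
Java(TM) SE Runtime Environment (build 1.7.0_79-b15)
Java HotSpot(TM) 64-Bit Server VM (build 24.79-b02, mixed mode)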
- The $ip and $port variables mentioned below refer to the access IP and port of the CKafka instance.
- A CKafka instance offers up to 6 access points (i.e., $ip $port pairs), which can satisfy access requests from clients in different network environments. When performing a test, you only need to select the access point corresponding to your network environment. For example, if your CVM instance runs in a VPC, simply select the CKafka access point ($ip $port) for that VPC. For more information on access points, see the instance details page.
Download the Kafka installation package here and decompress it.
Currently, CKafka is 100% compatible with Kafka 0.9 and 0.10. You are recommended to download an installation package of a compatible version (preferably v0.10.2).
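For example, the package used below can be fetched from the Apache archive (the URL is given for reference; use the download link on this page if one is provided):
wget https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.10-0.10.2.0.tgz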
tar -C /opt -xzvf kafka_2.10-0.10.2.0.tgz   # decompress to the /opt directory
Kafka is ready for use immediately after decompression with no environment configuration required.
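The kafka-console-* scripts used in the following steps are located in the bin directory of the decompressed package, so switch to that directory first (the path assumes the package was decompressed to /opt as above):
cd /opt/kafka_2.10-0.10.2.0/bin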
You can test whether the CVM instance is connected to the CKafka instance by running the telnet command.
telnet $ip $port
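If the connection succeeds, telnet prints output similar to the following; otherwise, check the network environment and the access point you selected:
Trying $ip...
Connected to $ip.
Escape character is '^]'.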
Send a message:
./kafka-console-producer.sh --broker-list $ip:$port --topic topicName
This is a message
This is another message
Here, the IP address in broker-list is the VIP of the CKafka instance, and topicName is the name of a topic in the CKafka instance.
Receive a message (CKafka hides the ZooKeeper cluster by default):
./kafka-console-consumer.sh --bootstrap-server $ip:$port --from-beginning --new-consumer --topic topicName
This is a message
This is another message
In the above command, no consumer group is specified, so the system randomly generates a group to consume the messages, and the upper limit on the number of groups may be reached quickly. Therefore, you are recommended to specify a consumer group when receiving messages. First, configure the group name in consumer.properties, as shown below:
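For example, a minimal group setting in consumer.properties might look like this (the group name test-consumer-group is only an illustration; replace it with your own group name):
group.id=test-consumer-group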
Then, run the following command to specify the consumer group:
./kafka-console-consumer.sh --bootstrap-server $ip:$port --from-beginning --new-consumer --topic topicName --consumer.config ../config/consumer.properties
When configuring the ConsumerConfig parameters, you are recommended to set auto.offset.reset to earliest to prevent messages from being skipped when the specified consumer group does not exist yet.
Cause: when you create a consumer that falls into a new group and the auto.offset.reset value is latest, only the latest data (i.e., data produced after the consumer is created) will be consumed, while data produced previously will not be consumed.
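For example, the recommended setting can be added to the same consumer.properties file used above (auto.offset.reset is a standard Kafka consumer configuration item):
auto.offset.reset=earliest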
View the corresponding monitoring data of the CKafka instance: