VPC Network Access
Last updated: 2026-01-05 15:16:58

Scenarios

This document describes how to use the Java SDK to connect to TDMQ for CKafka (CKafka) over a Virtual Private Cloud (VPC) network and to send and receive messages. It walks through the complete process from configuration to message consumption.

Prerequisites

You have obtained the client connection parameters as instructed in SDK Overview.

Operation Steps

Step 1: Preparing for Configurations

1. Upload the javakafkademo directory from the downloaded demo to the Linux server.
2. Log in to the Linux server, go to the javakafkademo directory, and configure related parameters.
2.1 Add the following dependencies to pom.xml.
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>
2.2 Create the TDMQ for CKafka (CKafka) configuration file kafka.properties.
## Configure the access network and copy the network information from the Network column of the Access Method module on the instance details page in the console.
bootstrap.servers=xx.xx.xx.xx:xxxx
## Configure a topic and copy the topic information on the topic management page in the console.
topic=XXX
## Configure a consumer group. You can customize the settings.
group.id=XXX
Parameter | Description
bootstrap.servers | Access network. On the Basic Info page of the instance in the console, select the Access Mode module and copy the network information from the Network column.
topic | Topic name. Copy the name on the Topic List page in the console.
group.id | Consumer group name. You can define your own name. After the demo runs successfully, the consumer appears on the Consumer Group page in the console.
3. Create the configuration file loader CKafkaConfigurer.java.
import java.io.InputStream;
import java.util.Properties;

public class CKafkaConfigurer {

    private static Properties properties;

    public synchronized static Properties getCKafkaProperties() {
        //Return the cached properties if the file has already been loaded.
        if (null != properties) {
            return properties;
        }
        //Obtain the content of the configuration file kafka.properties from the classpath.
        Properties kafkaProperties = new Properties();
        try (InputStream in = CKafkaConfigurer.class.getClassLoader().getResourceAsStream("kafka.properties")) {
            kafkaProperties.load(in);
        } catch (Exception e) {
            //A missing or unreadable file ends up here. Print the cause instead of hiding it.
            System.out.println("getCKafkaProperties error: " + e);
        }
        properties = kafkaProperties;
        return kafkaProperties;
    }
}
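
The loader above returns an empty Properties object when kafka.properties is missing, so a misconfiguration may only surface later as a connection error. As a minimal sketch, the following checks that the three keys from Step 1 are present and no longer hold the placeholder values before any client is created. The class KafkaPropertiesCheck, its methods, and the sample address 10.0.0.1:9092 are hypothetical and are not part of the demo.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration; not part of the CKafka demo.
public class KafkaPropertiesCheck {

    // The three keys the demo reads from kafka.properties.
    static final String[] REQUIRED_KEYS = {"bootstrap.servers", "topic", "group.id"};

    // Returns the required keys that are absent, blank, or still set to a placeholder.
    public static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty() || isPlaceholder(value.trim())) {
                missing.add(key);
            }
        }
        return missing;
    }

    // The placeholder values used in the sample kafka.properties.
    static boolean isPlaceholder(String value) {
        return value.equalsIgnoreCase("XXX") || value.equalsIgnoreCase("xx.xx.xx.xx:xxxx");
    }

    // Parses properties text; used here so the sketch runs without a file on disk.
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // 10.0.0.1:9092 is a made-up address for the example.
        Properties props = parse("bootstrap.servers=10.0.0.1:9092\ntopic=XXX\n");
        System.out.println("Keys to fix: " + missingKeys(props)); // prints "Keys to fix: [topic, group.id]"
    }
}
```

In the demo, a check like this could run on the result of CKafkaConfigurer.getCKafkaProperties() at the start of main, stopping early if the returned list is not empty.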

Step 2: Sending Messages

1. Write the message production program CKafkaProducerDemo.java.
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CKafkaProducerDemo {

    public static void main(String[] args) {
        //Load kafka.properties.
        Properties kafkaProperties = CKafkaConfigurer.getCKafkaProperties();

        Properties properties = new Properties();
        //Set the access point. You can obtain the access point of the corresponding topic in the console.
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));

        //The serialization method of Kafka messages. StringSerializer is used by the demo.
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        //The maximum time that send() may block, for example while waiting for metadata or buffer space.
        properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        //Set the number of internal retries on the client.
        properties.put(ProducerConfig.RETRIES_CONFIG, 5);
        //Set the wait time before the client tries to reconnect to a broker.
        properties.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);
        //Construct a producer object.
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        //Construct a Kafka message.
        String topic = kafkaProperties.getProperty("topic"); //The topic to which the message belongs. Create it in the console first.
        String value = "this is ckafka msg value"; //The content of the message.

        try {
            //Sending several messages first and waiting on their Future objects afterwards is faster than waiting after each send. Keep the batch reasonably small.
            List<Future<RecordMetadata>> futureList = new ArrayList<>(128);
            for (int i = 0; i < 10; i++) {
                //Send a message and obtain a Future object.
                ProducerRecord<String, String> kafkaMsg = new ProducerRecord<>(topic,
                        value + ": " + i);
                Future<RecordMetadata> metadataFuture = producer.send(kafkaMsg);
                futureList.add(metadataFuture);
            }
            producer.flush();
            for (Future<RecordMetadata> future : futureList) {
                //Synchronously obtain the result of the Future object.
                RecordMetadata recordMetadata = future.get();
                System.out.println("produce send ok: " + recordMetadata.toString());
            }
        } catch (Exception e) {
            //Reached when sending still fails after the client's internal retries. The service needs to handle this type of error.
            System.out.println("error occurred: " + e);
        } finally {
            //Release the producer's network and buffer resources.
            producer.close();
        }
    }
}
2. Compile and run CKafkaProducerDemo.java to send messages.
3. View the running results.
produce send ok: ckafka-topic-demo-0@198
produce send ok: ckafka-topic-demo-0@199
4. On the Topic List page in the CKafka console, select the target topic, and choose More > Message Query to view the message just sent.
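
The producer demo sends a batch of messages, collects the returned Future objects, and only then waits on each one. The following sketch shows the same pattern using only the Java standard library, with one addition: each Future is checked individually, so a failed send can be identified by its position instead of aborting the whole batch. The class BatchedFutureSketch and the method fakeSend are hypothetical stand-ins for illustration. fakeSend simulates an asynchronous send and is not a Kafka API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for illustration; it does not connect to Kafka.
public class BatchedFutureSketch {

    // Simulates an asynchronous send: completes with a fake result string, or fails on request.
    static Future<String> fakeSend(ExecutorService pool, int index, boolean fail) {
        return pool.submit(() -> {
            if (fail) {
                throw new IllegalStateException("simulated failure for message " + index);
            }
            return "demo-topic-0@" + index;
        });
    }

    // Submits all sends first, then waits on each Future and returns the indexes that failed.
    public static List<Integer> run(int count, int failAt) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                futures.add(fakeSend(pool, i, i == failAt));
            }
            List<Integer> failed = new ArrayList<>();
            for (int i = 0; i < futures.size(); i++) {
                try {
                    futures.get(i).get();
                } catch (ExecutionException e) {
                    // The send itself failed; record which message it was.
                    failed.add(i);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for sends", e);
                }
            }
            return failed;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Failed sends: " + run(5, 2)); // prints "Failed sends: [2]"
    }
}
```

With the real producer, the same per-Future try/catch could wrap future.get() in the demo's loop, so that the service can log or resend only the messages that failed.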

Step 3: Consuming Messages

1. Create the message consumer program CKafkaConsumerDemo.java.
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CKafkaConsumerDemo {

    public static void main(String[] args) {
        //Load kafka.properties.
        Properties kafkaProperties = CKafkaConfigurer.getCKafkaProperties();

        Properties props = new Properties();
        //Set the access point. You can obtain the access point of the corresponding topic in the console.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        //The maximum interval allowed for heartbeat detection.
        //This demo sets it to 30 seconds (the default in kafka-clients 2.4.1 is 10 seconds). If a consumer fails to send a heartbeat within this duration, the server determines that the consumer is not alive, removes it from the consumer group, and triggers a rebalance.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        //The maximum number of records returned per poll.
        //Do not set this value too large. If the polled records cannot be consumed before the next poll is due, a rebalance is triggered, resulting in delays.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        //Deserialization method of messages.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        //Consumer instances in the same group share the load of consuming messages.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
        //Construct a consumption object, that is, generate a consumer instance.
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //Set the topics subscribed by the consumer group. Multiple topics can be subscribed to.
        //Consumers that share the same group.id should subscribe to the same set of topics.
        List<String> subscribedTopics = new ArrayList<>();
        //To subscribe to multiple topics, list them separated by commas in the topic property of kafka.properties.
        //Create each topic in the console first.
        String topicStr = kafkaProperties.getProperty("topic");
        String[] topics = topicStr.split(",");
        for (String topic : topics) {
            subscribedTopics.add(topic.trim());
        }
        consumer.subscribe(subscribedTopics);

        //Consume messages in a loop.
        while (true) {
            try {
                //Wait up to 1 second for new records.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                //The data must be consumed before the next poll, and the total time cannot exceed the value of max.poll.interval.ms. The default value of this parameter is 300 seconds.
                //It is recommended to use a separate thread pool to consume messages and then return the results asynchronously.
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(
                            String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
                }
            } catch (Exception e) {
                System.out.println("consumer error! " + e);
            }
        }
    }
}
2. Compile and run CKafkaConsumerDemo.java to consume messages.
3. View the running results.
Consume partition:0 offset:298
Consume partition:0 offset:299
4. On the Consumer Group page in the CKafka console, select the target consumer group name, enter the topic name in the Topic Name area, and click View Details to view consumption details.
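
The consumer demo recommends handing polled records to a separate thread pool. The following sketch illustrates one way this could look, using only the Java standard library: each record of a polled batch becomes a task, invokeAll runs the tasks in parallel, and the results come back in the original order. The class PoolHandoffSketch and the method handle are hypothetical, and plain strings stand in for ConsumerRecord objects. Note the assumption that the polling thread still waits for the whole batch, so this shortens processing time per batch but does not remove the max.poll.interval.ms limit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for illustration; strings play the role of polled records.
public class PoolHandoffSketch {

    // Placeholder for the business logic applied to one record.
    static String handle(String record) {
        return record.toUpperCase(Locale.ROOT);
    }

    // Processes one batch in parallel and returns the results in the batch's original order.
    public static List<String> processBatch(List<String> batch) {
        // A real consumer would create the pool once and reuse it for every poll.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (String record : batch) {
                tasks.add(() -> handle(record));
            }
            List<String> results = new ArrayList<>();
            // invokeAll blocks until every task has finished and preserves task order.
            for (Future<String> done : pool.invokeAll(tasks)) {
                results.add(done.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while processing batch", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("record handler failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(processBatch(Arrays.asList("msg-0", "msg-1"))); // prints "[MSG-0, MSG-1]"
    }
}
```

Waiting for the batch before polling again keeps offset handling simple. Fully asynchronous processing would also need explicit offset management, which is outside the scope of this sketch.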
