Using SDK to Receive/Send Message (Recommended)

Last updated: 2022-05-26 14:12:35

    Overview

    This document describes how to access CKafka to send/receive messages with the SDK for Java over the public network.

    If you need to use SDKs for other languages, see SDK Documentation.

    Prerequisites

    Directions

    Step 1. Prepare configurations

    1. Decompress the downloaded demo and enter the PUBLIC_SASL directory under javakafkademo.
    2. Modify the JAAS configuration file ckafka_client_jaas.conf.
      KafkaClient {
          org.apache.kafka.common.security.plain.PlainLoginModule required
          username="yourinstance#yourusername"
          password="yourpassword";
      };
      Note

      Set `username` to the instance ID and the configured username joined by `#` (that is, `instance ID#username`), and set `password` to the configured password.

    3. Modify the Kafka configuration file kafka.properties.
      ## Configure the accessed network by copying the information in the Network column in the Access Mode section on the instance details page in the console
      bootstrap.servers=ckafka-xxxxxxx
      ## Configure the topic by copying the information on the Topic Management page in the console
      topic=XXX
      ## Configure the consumer group as needed
      group.id=XXX
      ## Path of the JAAS configuration file `ckafka_client_jaas.conf`.
      java.security.auth.login.config.plain=/xxxx/ckafka_client_jaas.conf
      Parameter                                Description
      bootstrap.servers                        Accessed network, which can be copied in the Network column in the Access Mode section on the instance details page in the console.
      topic                                    Topic name, which can be copied from the Topic Management page in the console.
      group.id                                 You can customize it. After the demo runs successfully, you can see the consumer group on the Consumer Group page.
      java.security.auth.login.config.plain    Enter the path of the JAAS configuration file `ckafka_client_jaas.conf`.
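A missing or blank key in `kafka.properties` usually surfaces only later, as a connection or authentication failure. The following is a minimal sketch of a pre-flight check that reports which of the four keys described above are missing. The class `KafkaPropertiesCheck` and its methods are hypothetical helpers written for illustration; they are not part of the CKafka demo, and the demo's own `CKafkaConfigurer` may load the file differently.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration; not part of the CKafka demo.
final class KafkaPropertiesCheck {

    // The keys that the kafka.properties file in this document defines.
    static final String[] REQUIRED_KEYS = {
        "bootstrap.servers",
        "topic",
        "group.id",
        "java.security.auth.login.config.plain"
    };

    // Parses the text content of a properties file.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns the required keys that are absent or have a blank value.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Sample content with the JAAS path left out on purpose.
        String text = "bootstrap.servers=ckafka-xxxxxxx\n"
                + "topic=XXX\n"
                + "group.id=XXX\n";
        System.out.println("Missing keys: " + missingKeys(parse(text)));
    }
}
```

Running a check like this before constructing the producer or consumer makes a configuration mistake visible at startup instead of at the first request.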

    Step 2. Send a message

    1. Compile and run the message sending program KafkaSaslProducerDemo.java.
      import java.util.ArrayList;
      import java.util.List;
      import java.util.Properties;
      import java.util.concurrent.Future;

      import org.apache.kafka.clients.CommonClientConfigs;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.clients.producer.RecordMetadata;
      import org.apache.kafka.common.config.SaslConfigs;

      public class KafkaSaslProducerDemo {

          public static void main(String[] args) {
              // Set the path of the JAAS configuration file.
              CKafkaConfigurer.configureSaslPlain();

              // Load `kafka.properties`.
              Properties kafkaProperties = CKafkaConfigurer.getCKafkaProperties();

              Properties props = new Properties();
              // Set the access point. Get the access point of the corresponding topic in the console.
              props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));

              // Set the access protocol.
              props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
              // Set the PLAIN mechanism.
              props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

              // Set the method for serializing Kafka messages.
              props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
              props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
              // Set the maximum time that a call such as send() may block.
              props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
              // Set the number of retries for the client.
              props.put(ProducerConfig.RETRIES_CONFIG, 5);
              // Set the wait time before the client tries to reconnect to a broker.
              props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);
              // Construct a producer object. Note: A producer object is thread-safe, and generally one producer object is sufficient for a process.
              KafkaProducer<String, String> producer = new KafkaProducer<>(props);

              // Construct a CKafka message.
              String topic = kafkaProperties.getProperty("topic"); // Topic of the message. Enter the topic you created in the console.
              String value = "this is ckafka msg value"; // Message content

              try {
                  // Collecting future objects in batches can speed up the process. Note that the batch size should not be too large.
                  List<Future<RecordMetadata>> futures = new ArrayList<>(128);
                  for (int i = 0; i < 100; i++) {
                      // Send the message and get a future object.
                      ProducerRecord<String, String> kafkaMessage = new ProducerRecord<>(topic, value + ": " + i);
                      Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
                      futures.add(metadataFuture);
                  }
                  producer.flush();
                  for (Future<RecordMetadata> future : futures) {
                      // Wait for the result of each future object obtained.
                      RecordMetadata recordMetadata = future.get();
                      System.out.println("Produce ok:" + recordMetadata.toString());
                  }
              } catch (Exception e) {
                  // If the sending still fails after client internal retries, the system needs to report and handle the error.
                  System.out.println("error occurred: " + e);
              } finally {
                  // Release the producer's network and buffer resources.
                  producer.close();
              }
          }
      }
    2. View the execution result (output).
      Produce ok:ckafka-topic-demo-0@198
      Produce ok:ckafka-topic-demo-0@199
    3. On the Topic Management tab on the instance details page in the CKafka console, select the target topic, and click More > Message Query to view the message just sent.
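The producer demo stops at the first failed `future.get()` and prints a generic message, so it does not show which messages were lost. The following is a minimal sketch of one way to wait on a batch of futures and record a per-message outcome. It uses only `java.util.concurrent` types, with `Future<String>` standing in for `Future<RecordMetadata>`; the names `BatchSendResult`, `BatchFutureWaiter`, and `awaitAll`, and the per-message timeout, are assumptions made for illustration and are not part of the demo or the Kafka client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical result holder: one entry per message, in send order.
final class BatchSendResult {
    final List<String> succeeded = new ArrayList<>();
    final List<String> failed = new ArrayList<>();
}

// Hypothetical helper for illustration; not part of the CKafka demo.
final class BatchFutureWaiter {

    // Waits for each future in turn and records its result or failure reason.
    static BatchSendResult awaitAll(List<Future<String>> futures, long timeoutMs) {
        BatchSendResult result = new BatchSendResult();
        for (int i = 0; i < futures.size(); i++) {
            try {
                result.succeeded.add(futures.get(i).get(timeoutMs, TimeUnit.MILLISECONDS));
            } catch (ExecutionException e) {
                // The send itself failed; the cause carries the underlying error.
                Throwable cause = e.getCause() != null ? e.getCause() : e;
                result.failed.add("message " + i + ": " + cause.getMessage());
            } catch (TimeoutException e) {
                result.failed.add("message " + i + ": timed out");
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                result.failed.add("message " + i + ": interrupted");
                break;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Future<String>> futures = new ArrayList<>();
        futures.add(CompletableFuture.completedFuture("ckafka-topic-demo-0@198"));
        CompletableFuture<String> broken = new CompletableFuture<>();
        broken.completeExceptionally(new RuntimeException("simulated send failure"));
        futures.add(broken);

        BatchSendResult result = awaitAll(futures, 1000);
        System.out.println("Succeeded: " + result.succeeded);
        System.out.println("Failed: " + result.failed);
    }
}
```

With the real client, the same loop could run after `producer.flush()`, and the entries in `failed` could be logged or resent depending on how the application handles errors.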

    Step 3. Consume the message

    1. Compile and run the message subscribing program KafkaSaslConsumerDemo.java.
      import java.util.ArrayList;
      import java.util.List;
      import java.util.Properties;

      import org.apache.kafka.clients.CommonClientConfigs;
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.config.SaslConfigs;

      public class KafkaSaslConsumerDemo {

          public static void main(String[] args) {
              // Set the path of the JAAS configuration file.
              CKafkaConfigurer.configureSaslPlain();

              // Load `kafka.properties`.
              Properties kafkaProperties = CKafkaConfigurer.getCKafkaProperties();

              Properties props = new Properties();
              // Set the access point. Get the access point of the corresponding topic in the console.
              props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));

              // Set the access protocol.
              props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
              // Set the PLAIN mechanism.
              props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
              // Set the session timeout.
              // If the consumer does not return a heartbeat message within the interval, the broker will determine that the consumer is not alive, and then remove the consumer from the consumer group and trigger rebalancing. This demo uses 30s.
              props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
              // Set the maximum number of messages that can be polled at a time.
              // Do not set this parameter to an excessively large value. If polled messages are not all consumed before the next poll starts, load balancing is triggered and lagging occurs.
              props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
              // Set the method for deserializing messages.
              props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
              props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
              // Set the consumer group of the current consumer instance after you apply for one in the console.
              // The instances in the same consumer group consume messages in load balancing mode.
              props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
              // Construct a consumer object. This generates a consumer instance.
              KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
              // Set one or more topics to which the consumer group subscribes.
              // We recommend you configure consumer instances with the same `GROUP_ID_CONFIG` value to subscribe to the same topics.
              List<String> subscribedTopics = new ArrayList<>();
              // If you want to subscribe to multiple topics, list them in the `topic` property, separated by commas.
              // You must create the topics in the console in advance.
              String topicStr = kafkaProperties.getProperty("topic");
              String[] topics = topicStr.split(",");
              for (String topic : topics) {
                  subscribedTopics.add(topic.trim());
              }
              consumer.subscribe(subscribedTopics);

              // Consume messages in a loop.
              while (true) {
                  try {
                      // On kafka-clients 2.0 or later, prefer consumer.poll(Duration.ofMillis(1000)).
                      ConsumerRecords<String, String> records = consumer.poll(1000);
                      // All messages must be consumed before the next poll, and the total duration cannot exceed the timeout interval specified by `SESSION_TIMEOUT_MS_CONFIG`.
                      for (ConsumerRecord<String, String> record : records) {
                          System.out.println(String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
                      }
                  } catch (Exception e) {
                      System.out.println("consumer error! " + e);
                  }
              }
          }
      }
    2. View the execution result.
      Consume partition:0 offset:298
      Consume partition:0 offset:299
    3. On the Consumer Group tab in the CKafka console, select the consumer group name, enter the topic name, and click View Details to view the consumption details.
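The consumer demo splits the `topic` property on commas and trims each entry, so a value such as `a,,b` would pass an empty topic name to `subscribe`, which the Kafka client is expected to reject. The following is a minimal sketch of a stricter parser that drops blank entries and duplicates while keeping the original order. `TopicListParser` is a hypothetical helper written for illustration; it is not part of the CKafka demo.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration; not part of the CKafka demo.
final class TopicListParser {

    // Splits a comma-separated topic list, trimming names and
    // skipping blank or repeated entries.
    static List<String> parse(String topicProperty) {
        Set<String> topics = new LinkedHashSet<>();
        if (topicProperty != null) {
            for (String part : topicProperty.split(",")) {
                String name = part.trim();
                if (!name.isEmpty()) {
                    topics.add(name);
                }
            }
        }
        return new ArrayList<>(topics);
    }

    public static void main(String[] args) {
        System.out.println(parse(" topic-a, ,topic-b,topic-a,"));
    }
}
```

In the consumer demo, the returned list could be passed to `consumer.subscribe` in place of `subscribedTopics`; an empty result would indicate that the `topic` property is missing or blank.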