package com.tencent.tcb.operation.ckafka.plain;

import com.google.common.collect.Lists;
import com.tencent.tcb.operation.ckafka.JavaKafkaConfigurer;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SaslConfigs;

public class KafkaPlainConsumerDemo {

    public static void main(String[] args) {
        //Set the path to the JAAS configuration file.
        JavaKafkaConfigurer.configureSaslPlain();

        //Load kafka.properties.
        Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();
        //Set the access point. You can obtain the access point of the corresponding topic in the console.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        //Access protocol.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        //SASL mechanism: PLAIN.
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        //Consumer session timeout, set here to 30 seconds. If the server receives no heartbeat from a
        //consumer within this duration, it considers the consumer dead, removes it from the consumer
        //group, and triggers a rebalance.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        //Maximum number of records returned per poll.
        //Do not set this too high. If more records are polled than can be consumed before the next poll,
        //a rebalance is triggered, resulting in delays.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        //Deserialization method of messages.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        //The consumer group to which this consumer instance belongs. Apply for it in the console first.
        //Consumer instances in the same group share message consumption: each message is delivered to
        //only one instance in the group.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
        //The offset reset policy. Note: with auto.offset.reset=none, the first consumption throws an
        //exception because the consumer group has no committed offset yet. In that case, set the offset
        //manually in the catch block below.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

        //Construct a consumption object, that is, generate a consumer instance.
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        //Set the topics subscribed by the consumer group. Multiple topics can be subscribed to.
        //Consumers with the same GROUP_ID_CONFIG should subscribe to the same set of topics.
        List<String> subscribedTopics = new ArrayList<String>();
        //If multiple topics need to be subscribed to, add them here, separated by commas.
        //Create each topic in the console first.
        String topicStr = kafkaProperties.getProperty("topic");
        String[] topics = topicStr.split(",");
        for (String topic : topics) {
            subscribedTopics.add(topic.trim());
        }
        consumer.subscribe(subscribedTopics);

        //Consume messages in a loop.
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                //The records must be consumed before the next poll, and the total time cannot exceed
                //SESSION_TIMEOUT_MS_CONFIG. It is recommended to consume messages in a separate thread
                //pool and return the results asynchronously.
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(
                        String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
                }
            } catch (NoOffsetForPartitionException e) {
                System.out.println(e.getMessage());
                //When auto.offset.reset is set to none, catch this exception and set the offset manually.
                //Choose one of the following methods based on your business requirements.
                //Note: for brevity, these examples assume a single configured topic and operate only on
                //its partition 0.

                //Example 1: Specify the offset explicitly. The helper below returns the valid offset
                //range per partition; you maintain the offset yourself, which facilitates retries.
                Map<Integer, Long> partitionBeginOffsetMap = getPartitionOffset(consumer, topicStr, true);
                Map<Integer, Long> partitionEndOffsetMap = getPartitionOffset(consumer, topicStr, false);
                consumer.seek(new TopicPartition(topicStr, 0), 0);

                //Example 2: Consume from the beginning.
                consumer.seekToBeginning(Lists.newArrayList(new TopicPartition(topicStr, 0)));

                //Example 3: Consume from the latest available offset.
                consumer.seekToEnd(Lists.newArrayList(new TopicPartition(topicStr, 0)));

                //Example 4: Set the offset based on a timestamp, here the offset of 5 minutes (300 seconds) ago.
                Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
                Long value = Instant.now().minus(300, ChronoUnit.SECONDS).toEpochMilli();
                timestampsToSearch.put(new TopicPartition(topicStr, 0), value);
                Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap =
                    consumer.offsetsForTimes(timestampsToSearch);
                for (Entry<TopicPartition, OffsetAndTimestamp> entry : topicPartitionOffsetAndTimestampMap.entrySet()) {
                    TopicPartition topicPartition = entry.getKey();
                    OffsetAndTimestamp entryValue = entry.getValue();
                    //offsetsForTimes maps a partition to null when no record at or after the timestamp exists.
                    if (entryValue != null) {
                        consumer.seek(topicPartition, entryValue.offset());
                    }
                }
            }
        }
    }

    /**
     * Obtain the earliest or latest offset of each partition of a topic.
     *
     * @param consumer   the consumer instance
     * @param topicStr   the topic name
     * @param beginOrEnd true for the earliest offsets, false for the latest offsets
     * @return a map from partition number to offset
     */
    private static Map<Integer, Long> getPartitionOffset(KafkaConsumer<String, String> consumer, String topicStr,
        boolean beginOrEnd) {
        Collection<PartitionInfo> partitionInfos = consumer.partitionsFor(topicStr);
        List<TopicPartition> topicPartitions = new ArrayList<>();
        Map<Integer, Long> map = new HashMap<>();
        partitionInfos.forEach(info -> topicPartitions.add(new TopicPartition(topicStr, info.partition())));
        Map<TopicPartition, Long> topicPartitionLongMap;
        if (beginOrEnd) {
            topicPartitionLongMap = consumer.beginningOffsets(topicPartitions);
        } else {
            topicPartitionLongMap = consumer.endOffsets(topicPartitions);
        }
        topicPartitionLongMap.forEach((topicPartition, offset) -> map.put(topicPartition.partition(), offset));
        return map;
    }
}
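For reference, the demo reads its connection settings from a kafka.properties file through JavaKafkaConfigurer. A minimal sketch of that file, assuming only the three keys this class actually reads (bootstrap.servers, group.id, topic); the placeholder values are hypothetical and should be replaced with the values shown in your console:

bootstrap.servers=xx.xx.xx.xx:9092
group.id=yourConsumerGroupId
topic=yourTopicName

JavaKafkaConfigurer.configureSaslPlain() is expected to point the JVM at a JAAS configuration file (for example via the java.security.auth.login.config system property) so that the SASL/PLAIN handshake can find its credentials. A minimal sketch of such a file using Kafka's standard PlainLoginModule; the exact username and password format required by your instance is an assumption to verify in the console:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="yourUsername"
  password="yourPassword";
};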