Tencent Cloud TDMQ for CKafka

Resetting a Consumption Offset

Last updated: 2026-01-20 16:52:41
This document describes the auto.offset.reset parameter and how to use it.

What Is auto.offset.reset?

auto.offset.reset determines where consumption starts when no valid offset can be obtained for a partition. This happens in two cases: the broker has no committed offset for the consumer group (first consumption, or the offset expired after the 7-day retention period), so the offset must be initialized; or an OFFSET_OUT_OF_RANGE error is received, so the offset must be reset.
The auto.offset.reset options are as follows:
earliest: automatically reset to the smallest (earliest) offset in the partition.
latest: automatically reset to the largest (latest) offset in the partition. This is the default value.
none: no automatic resetting. The client throws an exception instead: NoOffsetForPartitionException when no committed offset exists, or OffsetOutOfRangeException when the offset is out of range.
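The three values above map directly to the consumer property key auto.offset.reset. The sketch below, with hypothetical helper names and plain string keys so that it needs only the JDK, shows how the policy might be set and validated before building a consumer:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class ResetPolicyDemo {

    // The only values the Kafka consumer accepts for auto.offset.reset.
    private static final List<String> VALID = Arrays.asList("earliest", "latest", "none");

    // Build consumer properties with the given auto.offset.reset policy,
    // rejecting values the client would not accept.
    public static Properties withResetPolicy(String policy) {
        if (!VALID.contains(policy)) {
            throw new IllegalArgumentException("Invalid auto.offset.reset value: " + policy);
        }
        Properties props = new Properties();
        props.put("auto.offset.reset", policy);
        return props;
    }
}
```

In a real consumer you would merge these properties with the bootstrap servers, group ID, and deserializers, as shown in the full sample later in this document.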

When Will an OFFSET_OUT_OF_RANGE Error Occur?

This error indicates that the offset committed by the client falls outside the valid range allowed by the server. For example, if partition 1 of topicA has a LogStartOffset of 100 and a LogEndOffset of 300, the server returns this error when the offset committed by the client is less than 100 or greater than 300. The offset is then reset.
The error will be triggered by the client in the following scenarios:
The client has a committed offset, and the topic has a message retention period, but no messages are consumed for a while. After the retention period ends, the server deletes the expired log segments (log rolling) and the committed offset with them. The error occurs when the client then commits or fetches from the deleted offset.
An abnormal offset is committed by the client due to issues such as SDK bugs or network packet loss.
There are out-of-sync replicas on the server, and a leader switch causes the log to be truncated. The error occurs if the offset committed by the client falls outside the truncated range.
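The server-side check described above can be summarized in a few lines. This is an illustrative sketch, not the broker's actual code: an offset is considered out of range when it falls outside [LogStartOffset, LogEndOffset].

```java
public class OffsetRangeCheck {

    // Illustrative version of the validity check described above:
    // an offset is valid only when logStartOffset <= offset <= logEndOffset.
    public static boolean isOutOfRange(long offset, long logStartOffset, long logEndOffset) {
        return offset < logStartOffset || offset > logEndOffset;
    }
}
```

With the example figures above (LogStartOffset 100, LogEndOffset 300), an offset of 50 or 301 is out of range, while 100 through 300 are valid.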

Usage Instructions for auto.offset.reset=none

Context

Some services cannot tolerate the large-scale duplicate consumption that automatic offset resetting can cause, so automatic resetting must be avoided.
Note:
When a consumer group consumes for the first time, no committed offset exists, so an error is reported. In this case, you need to manually set the offset in the catch block.

Usage Instructions

Setting auto.offset.reset to none avoids unintended automatic offset resetting. However, if partitions are added to a topic, the client does not know where to start consuming the new partitions and throws an exception. In this case, you need to manually set an offset for the consumer group before consumption can start.

Usage Method

If a consumer sets auto.offset.reset to none, a NoOffsetForPartitionException is thrown during consumption when no committed offset exists. Catch it and set the offset in the catch block using one of the following methods, based on your business scenario:
Specify an explicit offset that you maintain manually (facilitates retries).
Start consumption from the beginning of the partition.
Start consumption from the latest available offset.
Look up the offset for a timestamp and seek to it.
Sample code:
package com.tencent.tcb.operation.ckafka.plain;

import com.google.common.collect.Lists;
import com.tencent.tcb.operation.ckafka.JavaKafkaConfigurer;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SaslConfigs;

public class KafkaPlainConsumerDemo {

    public static void main(String[] args) {
        // Set the path to the JAAS configuration file.
        JavaKafkaConfigurer.configureSaslPlain();

        // Load kafka.properties.
        Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();
        // Set the access point. You can obtain the access point of the corresponding topic in the console.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));

        // Access protocol.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        // PLAIN mechanism.
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        // Session timeout. If the consumer fails to send a heartbeat within this duration,
        // the server considers it dead, removes it from the consumer group, and triggers a rebalance.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        // Maximum number of records per poll.
        // Do not set this too large: if the polled records cannot be processed before the next poll,
        // a rebalance will be triggered, resulting in delays.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        // Deserializers for message keys and values.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // The consumer group to which this consumer instance belongs. Apply for it in the console first.
        // Consumer instances with the same group ID share the consumption of the subscribed messages.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));

        // Offset reset policy. Note: with auto.offset.reset=none, an error is reported during the first
        // consumption because the consumer group has no committed offset. You must then set the offset
        // manually in the catch block.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
        // Construct the consumer instance.
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Set the topics subscribed to by the consumer group. Multiple topics can be subscribed to.
        // Consumers with the same GROUP_ID_CONFIG should subscribe to the same topics.
        List<String> subscribedTopics = new ArrayList<>();
        // If multiple topics need to be subscribed to, add them here.
        // Create each topic in the console first.
        String topicStr = kafkaProperties.getProperty("topic");
        String[] topics = topicStr.split(",");
        for (String topic : topics) {
            subscribedTopics.add(topic.trim());
        }
        consumer.subscribe(subscribedTopics);
        // Consume messages in a loop.
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                // Records must be processed before the next poll; if processing takes longer than
                // max.poll.interval.ms, a rebalance is triggered. Consider processing messages in a
                // separate thread pool and returning the results asynchronously.
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(
                            String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
                }
            } catch (NoOffsetForPartitionException e) {
                System.out.println(e.getMessage());

                // When auto.offset.reset is set to none, catch the exception and set the offset manually.
                // Choose ONE of the following methods based on your business requirements.

                // Example 1: Specify an explicit offset that you maintain manually (facilitates retries).
                // The two maps below show how to obtain the valid offset range of each partition.
                Map<Integer, Long> partitionBeginOffsetMap = getPartitionOffset(consumer, topicStr, true);
                Map<Integer, Long> partitionEndOffsetMap = getPartitionOffset(consumer, topicStr, false);
                consumer.seek(new TopicPartition(topicStr, 0), 0);

                // Example 2: Consume from the beginning.
                consumer.seekToBeginning(Lists.newArrayList(new TopicPartition(topicStr, 0)));

                // Example 3: Consume from the latest available offset.
                consumer.seekToEnd(Lists.newArrayList(new TopicPartition(topicStr, 0)));

                // Example 4: Look up the offset for a timestamp, e.g. reset to the offset of 10 minutes ago.
                Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
                Long value = Instant.now().minus(10, ChronoUnit.MINUTES).toEpochMilli();
                timestampsToSearch.put(new TopicPartition(topicStr, 0), value);
                Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer
                        .offsetsForTimes(timestampsToSearch);
                for (Entry<TopicPartition, OffsetAndTimestamp> entry : topicPartitionOffsetAndTimestampMap
                        .entrySet()) {
                    TopicPartition topicPartition = entry.getKey();
                    OffsetAndTimestamp entryValue = entry.getValue();
                    // offsetsForTimes returns null for a partition with no message at or after the timestamp.
                    if (entryValue != null) {
                        consumer.seek(topicPartition, entryValue.offset());
                    }
                }
            }
        }
    }

    /**
     * Obtain the earliest or latest offset of each partition of a topic.
     *
     * @param consumer   the consumer instance
     * @param topicStr   the topic name
     * @param beginOrEnd true for the beginning offsets; false for the end offsets
     * @return a map from partition ID to offset
     */
    private static Map<Integer, Long> getPartitionOffset(KafkaConsumer<String, String> consumer, String topicStr,
            boolean beginOrEnd) {
        Collection<PartitionInfo> partitionInfos = consumer.partitionsFor(topicStr);
        List<TopicPartition> tp = new ArrayList<>();
        Map<Integer, Long> map = new HashMap<>();
        partitionInfos.forEach(info -> tp.add(new TopicPartition(topicStr, info.partition())));
        Map<TopicPartition, Long> topicPartitionLongMap;
        if (beginOrEnd) {
            topicPartitionLongMap = consumer.beginningOffsets(tp);
        } else {
            topicPartitionLongMap = consumer.endOffsets(tp);
        }
        topicPartitionLongMap.forEach((key, offset) -> map.put(key.partition(), offset));
        return map;
    }
}

