Kafka's transaction feature supports atomic operations in a distributed environment. It allows producers to guarantee message integrity and consistency when sending messages, especially in scenarios where multiple messages must be handled as a single unit. The following sections introduce the main concepts and features of Kafka transactions.
Transactional Concepts
Basic Concept of Transactions
Atomicity: All operations in a transaction either succeed together or fail together. Kafka ensures that the messages sent in a transaction are either all written to the topic or none are written.
Consistency: The data state should remain consistent before and after transaction execution.
Isolation: Transactions are independent of one another; the execution of one transaction does not affect the execution of others.
Durability: Once a transaction is committed, its result is permanent and will not be lost even if the system crashes.
Transaction Workflow
The Kafka transaction workflow mainly includes the following steps:
1. Initialize and begin the transaction: Before sending any messages, the producer calls initTransactions() once to register its transactional ID with the broker, then calls beginTransaction() to start a transaction.
2. Send messages: The producer can send multiple messages to one or more topics, and these messages are all marked as transactional.
3. Commit or abort the transaction (the call sequence is sketched below):
Commit transaction: If all messages are sent successfully, the producer calls commitTransaction() to commit the transaction, and all of the messages are written to Kafka.
Abort transaction: If an error occurs during sending, the producer calls abortTransaction() to abort the transaction, and none of the messages are written.
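In outline, the call sequence is as follows; a complete runnable example appears in the Transaction Usage Example section below (the producer and record variables here are placeholders):
producer.initTransactions();      // once per producer: registers the transactional ID with the coordinator
producer.beginTransaction();      // start a new transaction
producer.send(record1);           // messages sent here belong to the open transaction
producer.send(record2);
producer.commitTransaction();     // all messages become visible atomically
// on failure: producer.abortTransaction();  // none of the messages become visible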
Configuring Transactions
To use the transaction feature in Kafka, you need to set the following parameters in the producer configuration.
transactional.id: Each transactional producer requires a unique identifier. This ID identifies the producer across sessions and associates its messages with its transactions.
acks: Set to all to ensure all in-sync replicas acknowledge the message.
enable.idempotence: Set to true to enable idempotence and ensure messages are not duplicated.
Transactional Limitations
Performance overhead: Using transactions introduces additional performance overhead because they require extra coordination and acknowledgments.
Transaction timeout: Kafka enforces a timeout on transactions (transaction.timeout.ms, 60 seconds by default). If a transaction is neither committed nor aborted within this period, it is automatically aborted; see the configuration snippet after this list.
Consumer handling: Consumers that process transactional messages should note that, with isolation.level set to read_committed, they only see these messages after the transaction has been committed.
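For example, the timeout can be adjusted in the producer configuration shown in the next section; the 120-second value here is purely illustrative:
// Added to the producer Properties from the example below.
// transaction.timeout.ms: how long the coordinator waits before proactively aborting an open transaction.
// The value must stay below the broker's transaction.max.timeout.ms (15 minutes by default),
// otherwise the broker rejects the producer's InitProducerId request.
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "120000"); // 120 seconds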
Transaction Usage Example
producer
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import java.util.Properties;

public class TransactionalProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

        // Create a Kafka producer and register its transactional ID with the coordinator
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();

        try {
            producer.beginTransaction();
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key-" + i, "value-" + i);
                RecordMetadata metadata = producer.send(record).get();
                System.out.printf("Sent message: key=%s, value=%s, partition=%d, offset=%d%n",
                        record.key(), record.value(), metadata.partition(), metadata.offset());
            }
            producer.commitTransaction();
            System.out.println("Transaction committed successfully.");
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal errors: the transaction cannot be aborted; the only option is to close the producer.
            System.err.println("Fatal producer error: " + e.getMessage());
        } catch (Exception e) {
            // Recoverable errors: abort so that none of the messages in the transaction become visible.
            producer.abortTransaction();
            System.err.println("Transaction aborted due to an error: " + e.getMessage());
        } finally {
            producer.close();
        }
    }
}
consumer
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class TransactionalConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Only read messages from committed transactions
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        // Create a Kafka consumer and subscribe to the topic
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed message: key=%s, value=%s, partition=%d, offset=%d%n",
                            record.key(), record.value(), record.partition(), record.offset());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
Kafka Transaction Management
In Kafka, transaction management involves multiple components and data structures that together ensure atomicity and consistency. The memory usage of transaction information mainly relates to the following aspects:
Transaction ID and Producer ID
Transaction ID: Each transaction has a unique transaction ID used to identify it. The transaction ID is specified by the producer in its configuration (transactional.id) and is usually a string.
Producer ID: Each producer will be allocated a unique Producer ID when connecting to Kafka. This ID is used to identify the producer's messages and ensure the order and idempotence of messages.
Note:
Warning: Frequent initialization of a large number of transactional Producer IDs may exhaust broker memory and overload the server, impacting stability.
Transaction Status Management
Kafka uses an internal topic, the transaction state log (__transaction_state), to manage the state of transactions. This log records the status of each transaction (such as ongoing, committed, or aborted) together with its related metadata. Managing the transaction state log involves the following aspects:
In-memory data structure: Kafka maintains a data structure (such as a hash table or map) in memory to store information about currently active transactions, including the transaction ID, Producer ID, transaction status, timestamps, and so on; a simplified sketch follows this list.
Persistent storage: The transaction state log is persisted to disk so that transaction state can be restored when a broker restarts or recovers from a failure.
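The sketch below illustrates the kind of in-memory map described above. The class, field names, and states are simplified for illustration and do not match Kafka's internal implementation:
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TransactionStateCacheSketch {

    // Illustrative transaction states; Kafka's real state machine has additional states.
    enum TxnState { ONGOING, PREPARE_COMMIT, COMPLETE_COMMIT, PREPARE_ABORT, COMPLETE_ABORT }

    // Per-transaction metadata kept in memory while the transactional ID is known to the coordinator.
    static class TxnMetadata {
        final long producerId;
        final short producerEpoch;
        final TxnState state;
        final long lastUpdateTimestamp;

        TxnMetadata(long producerId, short producerEpoch, TxnState state, long lastUpdateTimestamp) {
            this.producerId = producerId;
            this.producerEpoch = producerEpoch;
            this.state = state;
            this.lastUpdateTimestamp = lastUpdateTimestamp;
        }
    }

    // transactional.id -> metadata; every state change is also appended to the internal
    // __transaction_state topic so that it survives broker restarts.
    private final Map<String, TxnMetadata> activeTransactions = new ConcurrentHashMap<>();

    void onStateChange(String transactionalId, TxnMetadata newMetadata) {
        activeTransactions.put(transactionalId, newMetadata);
    }
}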
Memory Usage of Transaction Information
Memory usage of transaction information mainly depends on two factors:
Number of active transactions: The number of currently ongoing transactions directly affects memory usage. Each active transaction occupies a certain space in memory.
Transaction metadata: The metadata of each transaction (such as transaction ID, Producer ID, status) also consumes memory. The specific memory consumed depends on the data size of this metadata.
Transaction Cleanup
To prevent excessive memory usage, Kafka periodically checks for and cleans up expired transaction information based on a configured expiration time (transactional.id.expiration.ms); by default, transactional IDs are retained for 7 days after their last use and are then removed.
Common Transactional FullGC/OOM Issues
From the description of transaction management above, you can see that transaction information can occupy a large amount of memory. The two factors that most directly affect memory usage are the number of transaction IDs and the number of Producer IDs.
The number of transaction IDs refers to the number of transactions that clients initialize and commit on the Broker; it is closely related to how frequently clients create new transactions.
Producer ID refers to the Producer status information stored in each Topic partition of the Broker, so the number of Producer IDs is closely related to the number of partitions in the Broker.
In transaction scenarios, transaction IDs and Producer IDs are strongly bound. If the same Producer ID bound to a transaction ID sends messages to all partitions in the Broker, then the maximum number of Producer IDs in a Broker can theoretically reach the product of the number of transaction IDs and the number of partitions in the Broker. Assuming the number of transaction IDs in an instance is t and the number of partitions in a Broker is p, the maximum number of Producer IDs can reach t * p.
Note:
Therefore, if the number of transaction IDs in a Broker is t, the average memory occupied per transaction is tb, the number of partitions in a Broker is p, and the average size occupied per Producer ID entry is pb, then the memory occupied by transaction information in the Broker is: t * tb + t * p * pb.
You can see two scenarios that may lead to a surge in memory usage:
Clients frequently submit new transaction IDs during instance initialization.
A single transaction ID sending data to many partitions causes the number of Producer ID entries to grow as a cross product, which can exhaust memory very quickly (see the numeric illustration below).
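As a rough numeric illustration of the formula in the note above, the sketch below plugs in assumed values; none of the numbers are measured Kafka figures:
// Rough, illustrative estimate of transaction-state memory on one broker.
// All values are assumptions for the example.
public class TxnMemoryEstimate {
    public static void main(String[] args) {
        long t  = 10_000;  // active transaction IDs on the broker (assumed)
        long tb = 200;     // bytes of metadata per transaction ID (assumed)
        long p  = 1_000;   // partitions hosted by the broker (assumed)
        long pb = 100;     // bytes of producer state per Producer ID entry per partition (assumed)

        long bytes = t * tb + t * p * pb;  // the t * tb + t * p * pb formula from the note above
        System.out.printf("Estimated transaction state: %.2f GB%n", bytes / 1e9);
        // Roughly 0.002 GB + 1.0 GB: the t * p cross-product term dominates.
    }
}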
Note:
Therefore, for both the Flink client and self-implemented transactional producers, try to avoid these two scenarios. For example, with Flink you can reduce the checkpoint frequency to lower how often the transactional ID changes (Flink derives each transactional ID from a configured prefix plus internally generated suffixes). Additionally, try to ensure that data with the same transaction ID is sent to the same partition.
Flink Transaction Usage Notes
Flink provides the following optimization methods to keep transaction information from growing rapidly:
Client optimization parameters: Increase the Flink checkpoint interval (for more information, see the community issue). Flink production tasks can also set sink.partitioner to fixed mode so that each sink subtask writes to a fixed partition.
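A minimal sketch of these two adjustments using the Flink DataStream Kafka connector; the topic name, transactional ID prefix, and 5-minute interval are illustrative, and the exact builder API may vary between Flink versions:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;

public class FlinkTransactionalSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A longer checkpoint interval means new transactional IDs are created less often.
        env.enableCheckpointing(300_000); // 5 minutes; illustrative value

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("my-topic")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        // Fixed partitioning keeps each sink subtask writing to one partition,
                        // limiting the transaction ID x partition cross product on the broker.
                        .setPartitioner(new FlinkFixedPartitioner<>())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("my-flink-job")
                .build();

        env.fromElements("value-1", "value-2", "value-3").sinkTo(sink);
        env.execute("transactional-sink-sketch");
    }
}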