
Configuring Transactional Messages
Last updated: 2025-09-19 15:53:57
Kafka's transaction feature is designed to support atomic operations in a distributed environment. It allows producers to guarantee message integrity and consistency when sending messages, especially in scenarios where multiple messages must be handled as a whole. The following describes the main concepts and features of Kafka transactions.

Transactional Concepts

Basic Concept of Transactions

Atomicity: All operations in a transaction either all succeed or all fail. Kafka ensures that the messages sent in a transaction are either all successfully written to the topic or none are written.
Consistency: The data state should remain consistent before and after transaction execution.
Isolation: Operations between transactions are independent of each other. The execution of one transaction shall not affect the execution of other transactions.
Durability: Once a transaction is committed, its result is permanent and will not be lost even if a system crash occurs.

Transaction Workflow

The Kafka transaction workflow mainly includes the following steps:
1. Start transaction: The producer calls the initTransactions() method to initialize the transaction before sending a message.
2. Send messages: The producer can send multiple messages to one or more topics, and these messages will be marked as transactional.
3. Commit or abort the transaction:
Commit the transaction: If all messages are sent successfully, the producer calls the commitTransaction() method to commit the transaction, and all messages are written to Kafka.
Abort the transaction: If an error occurs during sending, the producer can call the abortTransaction() method to abort the transaction, and none of the messages are written.

Configuring Transactions

To use the transaction feature in Kafka, you need to set the following parameters in the producer configuration.
transactional.id: Each transactional producer requires a unique identifier. This ID is used to identify all messages in the transaction.
acks: Set to all to ensure all replicas acknowledge the message.
enable.idempotence: Set to true to enable idempotence and ensure messages are not repeatedly sent.
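
The snippet below is a minimal configuration sketch for the three parameters listed above. The transactional ID is a placeholder; bootstrap servers and serializers are configured as usual and are omitted here (the full runnable producer appears in the Transaction Usage Example section).

import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class TransactionalProducerConfigSketch {
    public static void main(String[] args) {
        // Transaction-related producer settings from the list above.
        Properties props = new Properties();
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id"); // unique per producer instance (placeholder value)
        props.put(ProducerConfig.ACKS_CONFIG, "all");                             // all replicas must acknowledge writes
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");              // prevent duplicate sends
    }
}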

Transactional Limitations

Performance overhead: Using transactions introduces additional performance overhead, because more coordination and acknowledgment are required.
Transaction timeout: Kafka enforces a timeout on transactions, 60 seconds by default. If a transaction is not committed or aborted within this period, it is automatically aborted (see the configuration sketch after this list).
Consumer handling: When processing transactional messages, note that consumers can only see these messages after the transaction is committed.
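
For instance, the producer-side timeout can be adjusted through transaction.timeout.ms. Continuing the configuration sketch above, the value below is only an assumed example and must not exceed the broker's transaction.max.timeout.ms limit (15 minutes by default):

// Raise the transaction timeout from the default 60 seconds to an assumed 120 seconds.
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "120000"); // in milliseconds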

Transaction Usage Example

Producer

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class TransactionalProducerDemo {
    public static void main(String[] args) {
        // Kafka configuration
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id"); // Transaction ID
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // Enable idempotence

        // Create a Kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Initialize the transaction
        producer.initTransactions();

        try {
            // Start the transaction
            producer.beginTransaction();

            // Send messages
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key-" + i, "value-" + i);
                RecordMetadata metadata = producer.send(record).get(); // Send the message and wait for acknowledgment
                System.out.printf("Sent message: key=%s, value=%s, partition=%d, offset=%d%n",
                        record.key(), record.value(), metadata.partition(), metadata.offset());
            }

            // Commit the transaction
            producer.commitTransaction();
            System.out.println("Transaction committed successfully.");
        } catch (Exception e) {
            // If an exception occurs, abort the transaction
            producer.abortTransaction();
            System.err.println("Transaction aborted due to an error: " + e.getMessage());
        } finally {
            // Close the producer
            producer.close();
        }
    }
}

Consumer

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class TransactionalConsumerDemo {
    public static void main(String[] args) {
        // Kafka configuration
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); // Consumer group ID
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // Only read committed transactional messages

        // Create a Kafka consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Subscribe to the topic
        consumer.subscribe(Collections.singletonList("my-topic"));

        try {
            while (true) {
                // Poll for messages
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed message: key=%s, value=%s, partition=%d, offset=%d%n",
                            record.key(), record.value(), record.partition(), record.offset());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close the consumer
            consumer.close();
        }
    }
}

Kafka Transaction Management

In Kafka, transaction management involves multiple components and data structures to ensure atomicity and consistency. The memory usage of transaction information mainly relates to the following aspects:

Transaction ID and Producer ID

Transaction ID: Each transaction has a unique transaction ID used to identify it. The transaction ID is specified by the producer when sending messages, usually as a string.
Producer ID: Each producer is assigned a unique Producer ID when it connects to Kafka. This ID is used to identify the producer's messages and to ensure their order and idempotence.
Note:
Initializing a large number of transactional Producer IDs at high frequency may cause memory overflow and overload the server, impacting stability.

Transaction Status Management

Kafka uses an internal topic, the transaction state log, to manage the state of transactions. This log records the status of each transaction (such as ongoing, committed, or aborted) and the messages related to it. Managing the transaction state log involves the following aspects:
In-memory data structure: Kafka maintains a data structure in memory (such as a hash table or map) to store information about currently active transactions, including the transaction ID, Producer ID, transaction status, timestamp, and so on.
Persistent storage: The transaction state log is persisted to disk so that transaction states can be restored when the Kafka server restarts or recovers from a failure.

Memory Usage of Transaction Information

Memory usage of transaction information mainly depends on two factors:
Number of active transactions: The number of currently ongoing transactions directly affects memory usage. Each active transaction occupies a certain space in memory.
Transaction metadata: The metadata of each transaction (such as the transaction ID, Producer ID, and status) also consumes memory; the exact amount depends on the size of this metadata.

Transaction Cleanup

To prevent excessive memory usage, Kafka periodically checks for and cleans up completed transactions based on the configured expiration time (the broker-side transactional.id.expiration.ms setting); by default, expired transaction information is retained for 7 days before being deleted.

Common Transactional FullGC/OOM Issues

From the transaction management mechanism above, you can see that transaction information can occupy a large amount of memory. The two most direct factors affecting memory usage are the number of transaction IDs and the number of Producer IDs.
The number of transaction IDs refers to the number of transactions initialized and committed by clients against the Broker, which is closely related to how frequently clients start new transactions.
The number of Producer IDs refers to the producer state information stored in each topic partition on the Broker, so it is closely related to the number of partitions on the Broker.
In transaction scenarios, transaction IDs and Producer IDs are strongly bound. If the Producer ID bound to a transaction ID sends messages to all partitions on the Broker, the number of Producer ID entries on that Broker can theoretically reach the product of the number of transaction IDs and the number of partitions. Assuming the number of transaction IDs in an instance is t and the number of partitions on a Broker is p, the maximum number of Producer ID entries can reach t * p.
Note:
Therefore, if the number of transaction IDs on a Broker is t, the average memory occupied per transaction is tb, the number of partitions on the Broker is p, and the average memory occupied per Producer ID entry is pb, then the memory occupied by transaction information on the Broker is approximately: t * tb + t * p * pb.
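For example, with purely illustrative values of t = 10,000 transaction IDs, tb = 1 KB per transaction, p = 1,000 partitions, and pb = 100 bytes per Producer ID entry, the estimate is:
t * tb + t * p * pb = 10,000 * 1 KB + 10,000 * 1,000 * 100 B ≈ 10 MB + 1 GB
The cross-product term t * p * pb dominates, which is why the two scenarios below can inflate memory usage so quickly.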
From this formula, two scenarios may lead to a surge in memory usage:
Clients frequently submit new transaction IDs during instance initialization.
A single transaction ID sending data to many partitions causes the number of Producer ID entries to grow multiplicatively, which can exhaust memory quickly.
Note:
Therefore, for both the Flink client and self-implemented transactional producers, try to avoid these two scenarios. For Flink, you can reduce the checkpoint frequency to lower how often new transaction IDs are generated (Flink derives the transaction ID from a prefix plus a changing suffix). In addition, try to ensure that data with the same transaction ID is sent to the same partition.

Flink Transaction Usage Notes

Flink provides the following optimizations to prevent transaction information from growing rapidly (a code sketch follows this list):
Client parameter tuning: increase the Flink checkpoint interval (for more information, see the related community issue).
Flink production tasks can set sink.partitioner to fixed mode.
For Flink parameter descriptions, see the Flink documentation for the Apache Kafka SQL connector.
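
The sketch below shows, under assumptions only, how these two optimizations can map to the Flink DataStream API (Flink 1.15+ KafkaSink; the broker address, topic, transactional ID prefix, and interval values are placeholders, and FlinkFixedPartitioner is used here as the DataStream analogue of the SQL connector's sink.partitioner = 'fixed' option):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;

public class FlinkTransactionalSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Optimization 1: a larger checkpoint interval (assumed 5 minutes) means new
        // transaction IDs are generated less frequently.
        env.enableCheckpointing(300_000);

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092") // placeholder broker address
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("my-topic") // placeholder topic
                        .setValueSerializationSchema(new SimpleStringSchema())
                        // Optimization 2: fixed partitioning, so each sink subtask
                        // (and therefore each transaction ID) writes to a single partition.
                        .setPartitioner(new FlinkFixedPartitioner<>())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // transactional writes
                .setTransactionalIdPrefix("my-flink-tx")              // placeholder prefix
                // Keep the Kafka transaction timeout above the checkpoint interval.
                .setProperty("transaction.timeout.ms", "900000")
                .build();

        env.fromElements("value-1", "value-2", "value-3").sinkTo(sink);
        env.execute("transactional-sink-sketch");
    }
}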

