
Compatibility with Open Source Kafka

Last updated: 2020-02-26 17:44:03


CKafka is compatible with the producer and consumer APIs of open-source Kafka 0.9 and 0.10. If your client uses an earlier version (such as 0.8), you need to adapt your code accordingly. This article compares version 0.8 Kafka with later versions on both the producer side and the consumer side, and describes how to migrate.

Kafka Producer

Overview

Kafka 0.8.1 rewrote the Producer API. This client is officially recommended for its better performance and richer feature set, and the community maintains only the new Producer API going forward.

Comparison of new and old versions of Producer API

  • New version of Producer API Demo
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:4242");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(0), Integer.toString(0)));
producer.close();
  • Old version of Producer API Demo
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

Properties props = new Properties();
props.put("metadata.broker.list", "broker1:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "example.producer.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
String ip = "192.168.0.1"; // example key, used by SimplePartitioner to pick a partition
String msg = "hello";      // example message body
KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
producer.send(data);
producer.close();

As the demos show, the new and old versions are used in much the same way; only some configuration parameters differ, so the migration cost is low.

Compatibility Description
The Kafka 0.8.x Producer API can connect to CKafka without any modification. However, the new Kafka Producer API is recommended.

Kafka Consumer

Overview

Open-source Apache Kafka 0.8 provides two consumer APIs:

  • High Level Consumer API (hides configuration details)
  • Simple Consumer API (lets you specify configuration details)

Kafka 0.9.x introduces the New Consumer, which combines the features of the two version 0.8 Old Consumer APIs and lightens the load on ZooKeeper.
The following shows how to migrate a version 0.8 Consumer to the version 0.9 New Consumer.

Comparison of new and old versions of Consumer API

Version 0.8 Consumer API

High Level Consumer API
The High Level Consumer API meets general consumption requirements when you only need the data and do not have to manage message offsets yourself. Built on the Consumer Group logic, it hides offset management and provides broker exception handling and consumer load balancing, allowing developers to get started with the consumer client quickly.
Consider the following when you use the High Level Consumer (see the sketch after this list):
(1) If the number of consumer threads is greater than the number of partitions, some threads never receive data.
(2) If the number of partitions is greater than the number of threads, some threads consume multiple partitions.
(3) Changes in partitions or consumers trigger a rebalance.
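The following is a minimal sketch of multi-threaded High Level consumption; the ZooKeeper address, group id, topic name, and thread count are placeholders, not values from this document. It illustrates points (1) and (2): the number of streams you request determines how partitions are spread across threads.
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181"); // placeholder address
props.put("group.id", "test-group");              // placeholder group id
ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

// Request three streams (one per thread). With fewer than three partitions,
// some threads never receive data (point 1); with more than three,
// some threads consume several partitions (point 2).
int threadCount = 3;
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("my-topic", threadCount);       // placeholder topic
Map<String, List<KafkaStream<byte[], byte[]>>> streams =
        connector.createMessageStreams(topicCountMap);

ExecutorService executor = Executors.newFixedThreadPool(threadCount);
for (final KafkaStream<byte[], byte[]> stream : streams.get("my-topic")) {
    executor.submit(new Runnable() {
        public void run() {
            // Each stream delivers messages from one or more partitions
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                System.out.println("partition " + it.next().partition());
            }
        }
    });
}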

Low Level Consumer API (Simple Consumer)
The Low Level Consumer API is recommended if you care about message offsets and need features such as repeated consumption or skipping reads, or want to consume only specific partitions with stronger consumption semantics. In return, you must handle offset tracking and broker exceptions yourself.
Consider the following when you use the Low Level Consumer (see the sketch after this list):
(1) You must track and persist offsets yourself to control the consumption progress.
(2) You must find the leader of each partition of the topic, and handle partition leader changes.
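Below is a compressed sketch of the Low Level Consumer flow. For brevity it assumes the given broker is already the leader of the partition; the broker address, topic, partition, and client id are placeholders. A real client must first discover the leader (e.g. via a topic metadata request) and re-discover it when it changes, as noted in point (2).
import java.nio.ByteBuffer;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

String topic = "my-topic";  // placeholder topic
int partition = 0;          // placeholder partition
// Assumes this broker leads the partition; look up the real leader first
SimpleConsumer consumer = new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "sketch-client");

long offset = 0L;           // point (1): you track and persist this yourself
FetchRequest request = new FetchRequestBuilder()
        .clientId("sketch-client")
        .addFetch(topic, partition, offset, 100000)
        .build();
FetchResponse response = consumer.fetch(request);
if (response.hasError()) {
    // e.g. the leader moved: re-discover it and retry (point 2)
    System.err.println("fetch error code: " + response.errorCode(topic, partition));
} else {
    for (MessageAndOffset messageAndOffset : response.messageSet(topic, partition)) {
        ByteBuffer payload = messageAndOffset.message().payload();
        byte[] bytes = new byte[payload.limit()];
        payload.get(bytes);
        System.out.println("offset " + messageAndOffset.offset() + ": " + new String(bytes));
        offset = messageAndOffset.nextOffset(); // advance the tracked offset
    }
}
consumer.close();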

Version 0.9 New Consumer API

Kafka 0.9.x introduces the New Consumer, which incorporates the features of the Old Consumer while providing both consumer-group coordination (high-level API) and lower-level access for building custom consumption policies. The New Consumer also simplifies the consumer client and introduces a central Coordinator, which solves the Herd Effect and Split Brain problems caused by each consumer connecting to ZooKeeper separately and reduces the load on ZooKeeper.

Advantages:

  (1) Coordinator introduction
    The pre-0.9 High Level Consumer suffers from the Herd Effect and Split Brain problems. Moving the failure detection and rebalance logic into a highly available central Coordinator solves both problems and greatly reduces the load on ZooKeeper.
  (2) Self-assignment of partitions (see the sketch after this list)
    Some scenarios require keeping local state consistent with particular partitions, which calls for a fixed consumer-to-partition mapping; others call for binding a consumer to a broker in the same region.
  (3) Self-management of offsets
    You can manage offsets as needed to implement repeated consumption, skipped consumption, and other semantics.
  (4) User-specified callbacks triggered after a rebalance
  (5) Non-blocking Consumer API
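The following sketch illustrates advantages (2) to (4) with the 0.9 New Consumer API; the broker address, topic, partition, and offset are placeholder values. Note that assign() (manual assignment) and subscribe() (group management with rebalance callbacks) are mutually exclusive on a single consumer instance.
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder address
props.put("group.id", "test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// (2) Assign partitions yourself instead of relying on group management
TopicPartition tp = new TopicPartition("my-topic", 0); // placeholder topic/partition
consumer.assign(Arrays.asList(tp));

// (3) Manage offsets yourself: seek backward to re-consume or forward to skip
consumer.seek(tp, 42L); // placeholder offset

// (4) Alternatively, with group management, register rebalance callbacks
// (on a separate consumer instance, since subscribe() and assign() conflict):
// consumer.subscribe(Arrays.asList("my-topic"), new ConsumerRebalanceListener() {
//     public void onPartitionsRevoked(Collection<TopicPartition> partitions) { /* commit offsets */ }
//     public void onPartitionsAssigned(Collection<TopicPartition> partitions) { /* restore local state */ }
// });

consumer.close();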

Comparison of functions between new and old versions of Consumer API

Category | Introduced in | Automatic offset saving | Self-management of offsets | Automatic exception handling | Automatic rebalance handling | Automatic leader discovery | Disadvantages
High Level Consumer | Before 0.9 | Yes | No | Yes | Yes | Yes | Herd Effect and Split Brain
Simple Consumer | Before 0.9 | No | Yes | No | No | No | Multiple exceptions must be handled by the user
New Consumer | 0.9 and later | Yes | Yes | Yes | Yes | Yes | None; mature and recommended

Migrating from Old Consumer to New Consumer

  1. New Consumer
// The main change in the config is that the ZooKeeper address is replaced by the Kafka address
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Compared with the old Consumer, the new Consumer is easier to create
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
  2. Old Consumer (High Level)
// ZooKeeper is needed
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "test");
props.put("auto.commit.enable", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "smallest");
ConsumerConfig config = new ConsumerConfig(props);
// A connector must be created
ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);
// Create message streams
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("foo", 1);
Map<String, List<KafkaStream<byte[], byte[]>>> streams =
        connector.createMessageStreams(topicCountMap);
// Get the data
KafkaStream<byte[], byte[]> stream = streams.get("foo").get(0);
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
MessageAndMetadata<byte[], byte[]> msg = null;
while (iterator.hasNext()) {
    msg = iterator.next();
    System.out.println("group " + props.get("group.id")
            + ", partition " + msg.partition()
            + ", " + new String(msg.message()));
}

As shown above, migrating to the New Consumer simplifies the code. The main change is that the ZooKeeper address is replaced by the Kafka broker address, and a few parameters for interacting with the Coordinator are added; their default values are usually sufficient.

Both CKafka and the new community versions of Kafka support the New Consumer, which hides the interaction between the consumer client and ZooKeeper (ZooKeeper is no longer exposed to users). The New Consumer solves the Herd Effect and Split Brain problems caused by consumers interacting with ZooKeeper directly, and integrates the features of the Old Consumer, making consumption more reliable.