Compatibility with Open Source Kafka

Last updated: 2020-07-08 14:30:55

    CKafka is compatible with the Producer and Consumer APIs of Apache Kafka 0.9 and 0.10. If your client was written against an earlier on-premises Kafka version (such as v0.8), parts of its API usage must be rewritten before it can connect. This document compares the Producer and Consumer APIs of Kafka 0.8 and earlier with the newer APIs, and describes how to rewrite them.

    Kafka Producer

    Overview

    Kafka 0.8.1 introduced a rewritten Producer API. This Producer client is officially recommended because it provides better performance and more features, and it is the version the community will continue to maintain (referred to below as the New Producer API).

    Comparison between new producer API and old producer API

    • New Producer API Demo
      // Imports required by this demo
      import java.util.Properties;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.Producer;
      import org.apache.kafka.clients.producer.ProducerRecord;

      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:4242"); // Kafka broker address, not ZooKeeper
      props.put("acks", "all");
      props.put("retries", 0);
      props.put("batch.size", 16384);
      props.put("linger.ms", 1);
      props.put("buffer.memory", 33554432);
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      Producer<String, String> producer = new KafkaProducer<>(props);
      producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(0), Integer.toString(0)));
      producer.close();
    • Old Producer API Demo
      // Imports required by this demo (old Scala-based client)
      import java.util.Properties;
      import kafka.javaapi.producer.Producer;
      import kafka.producer.KeyedMessage;
      import kafka.producer.ProducerConfig;

      Properties props = new Properties();
      props.put("metadata.broker.list", "broker1:9092"); // broker list, not ZooKeeper
      props.put("serializer.class", "kafka.serializer.StringEncoder");
      props.put("partitioner.class", "example.producer.SimplePartitioner");
      props.put("request.required.acks", "1");
      ProducerConfig config = new ProducerConfig(props);
      Producer<String, String> producer = new Producer<String, String>(config);
      // Sample key (ip) and value (msg) for the message
      String ip = "192.168.1.1";
      String msg = "hello";
      KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
      producer.send(data);
      producer.close();

    As the code above shows, basic usage of the new and old versions is the same apart from some parameter settings, so the cost of rewriting the API calls is low.

    Compatibility description

    A 0.8.x Producer API client can connect to CKafka without any rewriting. Nevertheless, we recommend using the New Kafka Producer API.
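
    For reference, the following is a minimal sketch of how the old producer settings map to the new API when pointing a producer at CKafka. The endpoint shown is a placeholder; substitute your actual CKafka access address.

      import java.util.Properties;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.Producer;

      // Parameter mapping from the old API to the New Producer API:
      //   metadata.broker.list   -> bootstrap.servers
      //   serializer.class       -> key.serializer / value.serializer
      //   request.required.acks  -> acks
      Properties props = new Properties();
      props.put("bootstrap.servers", "<your-ckafka-endpoint>:9092"); // placeholder address
      props.put("acks", "1"); // was request.required.acks
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      Producer<String, String> producer = new KafkaProducer<>(props);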

    Kafka Consumer

    Overview

    The open-source Apache Kafka 0.8 provides two types of Consumer API:

    • High Level Consumer API (hides configuration details from the user)
    • Simple Consumer API (allows the user to adjust parameter configuration)

    Kafka 0.9.x introduced the New Consumer API, which combines the capabilities of the two old (v0.8) consumer APIs and reduces the load on ZooKeeper.
    The following describes how to migrate from the old consumer API (v0.8) to the new consumer API (v0.9).

    Comparison between new consumer API and old consumer API

    Old consumer API (v0.8)

    • High Level Consumer API (see the demo later in this document)
      The High Level Consumer API meets general requirements if you only care about the data and not the message offsets. Built on the consumer group logic, it hides offset management and automatically handles Broker failures and Consumer load balancing, so developers can get a Consumer client running quickly.
      Consider the following when you use the High Level Consumer API:
      • If the number of consumer threads is greater than the number of partitions, some consumer threads will receive no data.
      • If the number of partitions is greater than the number of threads, some threads will consume more than one partition.
      • Changes in partitions or consumers trigger rebalancing.
    • Low Level (Simple) Consumer API (a minimal sketch follows this list)
      The Low Level Consumer API is recommended if you need the message offset and features such as repeated consumption or skipping ahead, or if you want to consume specific partitions and guarantee stronger consumption semantics. In exchange, you must handle offsets and Broker exceptions yourself.
      When using the Low Level Consumer API, you need to:
      • Track and maintain offsets to control the consumption progress.
      • Find the leader of each partition of the topic and handle partition changes.
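
    Below is a minimal sketch of fetching one batch from a single partition with the v0.8 Simple Consumer API. The broker host, topic name, and starting offset are assumptions for illustration; a production client would also look up the partition leader and handle fetch errors.

      import java.nio.ByteBuffer;
      import kafka.api.FetchRequest;
      import kafka.api.FetchRequestBuilder;
      import kafka.javaapi.FetchResponse;
      import kafka.javaapi.consumer.SimpleConsumer;
      import kafka.message.MessageAndOffset;

      // Connect directly to the broker that leads the partition (host and port assumed).
      SimpleConsumer consumer = new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "demo-client");
      long offset = 0L; // starting offset, maintained by the application
      FetchRequest req = new FetchRequestBuilder()
              .clientId("demo-client")
              .addFetch("page_visits", 0, offset, 100000) // topic, partition, offset, maxBytes
              .build();
      FetchResponse response = consumer.fetch(req);
      for (MessageAndOffset messageAndOffset : response.messageSet("page_visits", 0)) {
          ByteBuffer payload = messageAndOffset.message().payload();
          byte[] bytes = new byte[payload.limit()];
          payload.get(bytes);
          System.out.println(messageAndOffset.offset() + ": " + new String(bytes));
          offset = messageAndOffset.nextOffset(); // track consumption progress yourself
      }
      consumer.close();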

    New consumer API (v0.9)

    Kafka 0.9.x introduced the New Consumer API, which combines the capabilities of the two old Consumer APIs: it provides consumer coordination (as in the High Level API) as well as lower-level access for custom consumption policies. The New Consumer also simplifies the consumer client and introduces a central coordinator, which solves the herd effect and split-brain problems caused by consumers connecting to ZooKeeper separately, and which reduces the load on ZooKeeper.

    Advantages:

    • Introduces a coordinator
      The old High Level Consumer suffers from the herd effect and split-brain problems. Moving failure detection and rebalancing logic into a highly available central coordinator solves both problems and greatly reduces the load on ZooKeeper.
    • Allows you to assign partitions manually
      If a consumer keeps local state tied to each partition, the partition mapping must stay fixed, which requires manual assignment. Manual assignment also covers scenarios where a Consumer must be tied to a Broker in a specific region. A sketch of manual assignment follows this list.
    • Allows you to manage offsets
      You can manage offsets as needed to implement repeated consumption, skipped consumption, and other semantics.
    • Triggers user-specified callbacks after rebalancing
    • Provides a non-blocking Consumer API
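
    As a minimal sketch, here is manual partition assignment and offset control with the New Consumer API; the topic name and offset value are assumptions for illustration:

      import java.util.Arrays;
      import java.util.Properties;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.TopicPartition;

      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
      // Assign partition 0 of topic "foo" directly instead of subscribing;
      // manually assigned partitions are not rebalanced by the coordinator.
      TopicPartition partition = new TopicPartition("foo", 0);
      consumer.assign(Arrays.asList(partition));
      // Seek to an application-managed offset to repeat or skip consumption.
      consumer.seek(partition, 42L); // the offset 42 is an assumed example value

    To receive a callback when rebalancing occurs, you can instead pass a ConsumerRebalanceListener when calling consumer.subscribe().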

    Feature comparison between new consumer API and old consumer API

    | Category            | Introduced In | Automatic Offset Save | Offset Self-Management | Automatic Exception Handling | Automatic Rebalancing | Automatic Leader Discovery | Advantage/Disadvantage                                |
    |---------------------|---------------|-----------------------|------------------------|------------------------------|-----------------------|----------------------------|-------------------------------------------------------|
    | High Level Consumer | Before 0.9    | Supported             | Not supported          | Supported                    | Supported             | Supported                  | Herd effect and split-brain problems must be handled. |
    | Simple Consumer     | Before 0.9    | Not supported         | Supported              | Not supported                | Not supported         | Not supported              | Many types of exceptions must be handled.             |
    | New Consumer        | 0.9 and later | Supported             | Supported              | Supported                    | Supported             | Supported                  | Mature; this is the recommended version.              |

    Transforming old consumer to new consumer

    • New Consumer

      // Imports required by this demo
      import java.util.Arrays;
      import java.util.Properties;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;

      // The main configuration difference is that the ZooKeeper address is replaced
      // with the Kafka broker address (bootstrap.servers).
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      // Compared with the old consumer, the new consumer is easier to create.
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
      consumer.subscribe(Arrays.asList("foo", "bar"));
      while (true) {
          ConsumerRecords<String, String> records = consumer.poll(100);
          for (ConsumerRecord<String, String> record : records) {
              System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
          }
      }
    • Old Consumer (High Level)

      // Imports required by this demo (old Scala-based client)
      import java.util.HashMap;
      import java.util.List;
      import java.util.Map;
      import java.util.Properties;
      import kafka.consumer.Consumer;
      import kafka.consumer.ConsumerConfig;
      import kafka.consumer.ConsumerIterator;
      import kafka.consumer.KafkaStream;
      import kafka.javaapi.consumer.ConsumerConnector;
      import kafka.message.MessageAndMetadata;

      // The old consumer connects to ZooKeeper instead of the brokers.
      Properties props = new Properties();
      props.put("zookeeper.connect", "localhost:2181");
      props.put("group.id", "test");
      props.put("auto.commit.enable", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("auto.offset.reset", "smallest");
      ConsumerConfig config = new ConsumerConfig(props);
      // A connector must be created first.
      ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);
      // Create message streams: one stream (thread) for topic "foo".
      Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
      topicCountMap.put("foo", 1);
      Map<String, List<KafkaStream<byte[], byte[]>>> streams =
              connector.createMessageStreams(topicCountMap);
      // Read data from the stream.
      KafkaStream<byte[], byte[]> stream = streams.get("foo").get(0);
      ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
      while (iterator.hasNext()) {
          MessageAndMetadata<byte[], byte[]> msg = iterator.next();
          System.out.println("group " + props.get("group.id")
                  + ", partition " + msg.partition() + ", "
                  + new String(msg.message()));
      }

    Compared with the old consumer, the new consumer requires less code and takes Kafka broker addresses instead of ZooKeeper parameters. The new consumer also exposes settings for its interaction with the coordinator, whose defaults are suitable for general use.
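
    When the automatic offset commit is not sufficient, offsets can be managed manually. Below is a minimal sketch of at-least-once consumption with manual commits using the New Consumer API; the topic name and the process() helper are assumptions for illustration.

      import java.util.Arrays;
      import java.util.Properties;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;

      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "false"); // disable auto commit; commit manually instead
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
      consumer.subscribe(Arrays.asList("foo"));
      while (true) {
          ConsumerRecords<String, String> records = consumer.poll(100);
          for (ConsumerRecord<String, String> record : records) {
              process(record); // application-specific processing (assumed helper)
          }
          // Commit only after the whole batch is processed, so a crash replays
          // unprocessed messages instead of losing them.
          consumer.commitSync();
      }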

    Compatibility description

    Both CKafka and recent open-source Kafka versions support the rewritten New Consumer API, which hides the interaction between the consumer client and ZooKeeper (ZooKeeper is no longer exposed to users). The New Consumer solves the herd effect and split-brain problems caused by direct interaction with ZooKeeper and integrates the features of the old consumers, making consumption more reliable.
