Ingesting Data from Kafka in Real Time

Last updated: 2020-05-28 16:41:32

    This document describes how to consume Kafka data in real time by using the Apache Druid Kafka indexing service. Before performing operations described in this document, just like in a Hadoop cluster, you need to make sure that the Kafka and Druid clusters can properly communicate with each other.

    • The two clusters should be in the same VPC. If they are in different VPCs, the two VPCs should be able to communicate with each other (through CCN or Peering Connection, for example).
    • If necessary, you should configure the Druid cluster with the host information of the Kafka cluster.

    Using Command Line

    1. Start the Kafka broker in the Kafka cluster.
      ./bin/kafka-server-start.sh config/server.properties
    2. Create a Kafka topic named mytopic.
      ./bin/kafka-topics.sh --create --zookeeper {kafka_zk_ip}:2181 --replication-factor 1 --partitions 1 --topic mytopic
      Output:
      Created topic "mytopic".
      {kafka_zk_ip}:2181 is the ZooKeeper address of the Kafka cluster.
    3. Prepare a data description file kafka-mytopic.json in the Druid cluster.
      {
        "type": "kafka",
        "dataSchema": {
            "dataSource": "mytopic-kafka",
            "parser": {
                "type": "string",
                "parseSpec": {
                    "timestampSpec": {
                        "column": "time",
                        "format": "auto"
                    },
                    "dimensionsSpec": {
                        "dimensions": ["url", "user"]
                    },
                    "format": "json"
                }
            },
            "granularitySpec": {
                "type": "uniform",
                "segmentGranularity": "hour",
                "queryGranularity": "none"
            },
            "metricsSpec": [{
                    "type": "count",
                    "name": "views"
                },
                {
                    "name": "latencyMs",
                    "type": "doubleSum",
                    "fieldName": "latencyMs"
                }
            ]
        },
        "ioConfig": {
            "topic": "mytopic",
            "consumerProperties": {
                "bootstrap.servers": "{kafka_ip}:9092",
                "group.id": "kafka-indexing-service"
            },
            "taskCount": 1,
            "replicas": 1,
            "taskDuration": "PT1H"
        },
        "tuningConfig": {
            "type": "kafka",
            "maxRowsInMemory": "100000"
        }
      }
      {kafka_ip}:9092 is the IP and port of the bootstrap.servers in your Kafka cluster.
    4. Add the Kafka supervisor on the master nodes in the Druid cluster.
      curl -XPOST -H 'Content-Type: application/json' -d @kafka-mytopic.json http://{druid_master_ip}:8090/druid/indexer/v1/supervisor
      Output:
      {"id":"mytopic-kafka"}
      {druid_master_ip}:8090 is the node where the overload process is deployed, which is generally a master node.
    5. Enable a console producer in the Kafka cluster.
      ./bin/kafka-console-producer.sh --broker-list {kafka_ip}:9092 --topic mytopic
      {kafka_ip}:9092 is the IP and port of the bootstrap.servers in your Kafka cluster.
    6. Prepare a query file named query-mytopic.json in the Druid cluster.
      {
        "queryType" : "search",
        "dataSource" : "mytopic-kafka",
        "intervals" : ["2020-03-13T00:00:00.000/2020-03-20T00:00:00.000"],
        "granularity" : "all",
        "searchDimensions": [
            "url",
            "user"
        ],
        "query": {
            "type": "insensitive_contains",
            "value": "roni"
        }
      }
    7. Enter some data on Kafka in real time.
      {"time": "2020-03-19T09:57:58Z", "url": "/foo/bar", "user": "brozo", "latencyMs": 62}
      {"time": "2020-03-19T16:57:59Z", "url": "/", "user": "roni", "latencyMs": 15}
      {"time": "2020-03-19T17:50:00Z", "url": "/foo/bar", "user": "roni", "latencyMs": 25}
      Timestamp generation command:
      python -c 'import datetime; print(datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"))'
    8. Perform a query in the Druid cluster.
      curl -XPOST -H 'Content-Type: application/json' -d @query-mytopic.json http://{druid_ip}:8082/druid/v2/?pretty
      {druid_ip}:8082 is the broker node of your Druid cluster, which generally resides on a master or router node.

    Query result:

    [ {
      "timestamp" : "2020-03-19T16:00:00.000Z",
      "result" : [ {
        "dimension" : "user",
        "value" : "roni",
        "count" : 2
      } ]
    } ]

    Using Web-Based Visualization

    You can ingest data from a Kafka cluster and perform queries in the Druid Web UI Console in a visualized manner. For detailed directions, please see Loading data with the data loader.

    Was this page helpful?

    Was this page helpful?

    • Not at all
    • Not very helpful
    • Somewhat helpful
    • Very helpful
    • Extremely helpful
    Send Feedback
    Help