Storm Connection to CKafka

Last updated: 2020-04-27 18:05:54

    Storm Overview

    Storm is a distributed real-time computing framework that performs stream-based data processing and provides universal distributed RPC calls, reducing event processing latency to the sub-second level. It is suitable for real-time data processing scenarios that require low latency.

    How Storm Works

    There are two types of nodes in a Storm cluster: master nodes and worker nodes. The Nimbus process runs on the master node to allocate resources and monitor status, while the Supervisor process runs on each worker node to listen for assigned work tasks and start executors. The entire Storm cluster relies on ZooKeeper for common data storage, cluster status listening, task assignment, and so on.
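
    The reliance on ZooKeeper and the Nimbus/Supervisor split described above surface in a cluster's storm.yaml configuration file. The snippet below is a minimal sketch with placeholder hostnames (not taken from this document's testing environment); the keys shown are standard Storm configuration options:

    # storm.yaml (sketch; hostnames are placeholders)
    # ZooKeeper ensemble used for cluster state storage and coordination
    storm.zookeeper.servers:
        - "zk-host-1"
        - "zk-host-2"
    # Host(s) where the Nimbus process runs
    nimbus.seeds: ["nimbus-host"]
    # Ports on which each Supervisor may launch worker processes
    supervisor.slots.ports:
        - 6700
        - 6701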

    A data processing program submitted to Storm is called a topology. The smallest message unit it processes is a tuple (an array of arbitrary objects). A topology consists of spouts and bolts: a spout is the source of tuples, while a bolt can subscribe to tuples emitted by any spout or bolt and process them.
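
    As a minimal sketch of how these pieces fit together (it reuses the SerialSentenceSpout and AddMessageKeyBolt classes defined later in this document; the complete, runnable examples follow below):

    //MinimalTopologySketch.java (illustrative wiring only)
    import org.apache.storm.topology.TopologyBuilder;

    public class MinimalTopologySketch {
        public static void main(String[] args) {
            TopologyBuilder builder = new TopologyBuilder();
            // The spout is the source of tuples
            builder.setSpout("sentence-spout", new SerialSentenceSpout(), 1);
            // The bolt subscribes to the spout's tuples and processes each one
            builder.setBolt("key-bolt", new AddMessageKeyBolt(), 1).shuffleGrouping("sentence-spout");
            // Build the topology object that is submitted to Storm for execution
            builder.createTopology();
        }
    }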

    Storm with CKafka

    Storm can consume data from CKafka through a spout for processing, or write processed data to CKafka through a bolt for consumption by other components.

    Testing environment

    OS: CentOS 6.8

    Package    Version
    Maven      3.5.0
    Storm      2.1.0
    SSH        5.3
    Java       1.8

    Applying for and creating a CKafka instance

    Log in to the CKafka Console and create a CKafka instance as instructed in Creating Instances.

    Creating a topic

    Create a topic under the instance as instructed in Creating Topics.

    Maven dependency

    Configure the pom.xml as follows:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>storm</groupId>
      <artifactId>storm</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <name>storm</name> 
         <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-core</artifactId>
                <version>2.1.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-kafka-client</artifactId>
                <version>2.1.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
                <version>0.10.2.1</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                        <archive>
                            <manifest>
                                <!-- Set this to the topology class you intend to run, e.g., TopologyKafkaProducerSpout -->
                                <mainClass>ExclamationTopology</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>

    Writing to CKafka

    Using spout/bolt

    Topology code:

    //TopologyKafkaProducerSpout.java
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.kafka.bolt.KafkaBolt;
    import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
    import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.utils.Utils;
    
    import java.util.Properties;
    
    public class TopologyKafkaProducerSpout {
        // `ip:port` of the CKafka instance applied for
        private final static String BOOTSTRAP_SERVERS = "111.230.216.45:9092";
        // Specify the topic to which to write messages
        private final static String TOPIC = "storm-topology-test";
        public static void main(String[] args) throws Exception {
            // Set producer attributes
            // For functions, please visit https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
            // For attributes, please visit http://kafka.apache.org/0102/documentation.html
            Properties properties = new Properties();
            properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
            properties.put("acks", "1");
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
        // Create the bolt that writes to Kafka. By default, the fields `("key", "message")` are used as the key and message of the produced record; different field names can be specified in `FieldNameBasedTupleToKafkaMapper()`
            KafkaBolt kafkaBolt = new KafkaBolt()
                    .withProducerProperties(properties)
                    .withTopicSelector(new DefaultTopicSelector(TOPIC))
                    .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
            TopologyBuilder builder = new TopologyBuilder();
            // A spout class that generates messages in sequence with the output field being `sentence`
            SerialSentenceSpout spout = new SerialSentenceSpout();
            AddMessageKeyBolt bolt = new AddMessageKeyBolt();
            builder.setSpout("kafka-spout", spout, 1);
            // Add the fields required to produce messages to CKafka for the tuple
            builder.setBolt("add-key", bolt, 1).shuffleGrouping("kafka-spout");
            // Write to CKafka
            builder.setBolt("sendToKafka", kafkaBolt, 8).shuffleGrouping("add-key");
    
            Config config = new Config();
            if (args != null && args.length > 0) {
                // Cluster mode, which is used to package a jar file and run it in Storm
                config.setNumWorkers(1);
                StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.createTopology());
            } else {
                // Local mode
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("test", config, builder.createTopology());
                Utils.sleep(10000);
                cluster.killTopology("test");
                cluster.shutdown();
            }
    
        }
    }

    Create a spout class that generates messages in sequence:

    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    
    import java.util.Map;
    import java.util.UUID;
    
    public class SerialSentenceSpout extends BaseRichSpout {
    
        private SpoutOutputCollector spoutOutputCollector;
    
        @Override
        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            this.spoutOutputCollector = spoutOutputCollector;
        }
    
        @Override
        public void nextTuple() {
            Utils.sleep(1000);
            // Produce a `UUID` string and send it to the next component
            spoutOutputCollector.emit(new Values(UUID.randomUUID().toString()));
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("sentence"));
        }
    }

    Add key and message fields to the tuple. If the key is null, the produced messages are evenly distributed across partitions; if a key is specified, messages are hashed to specific partitions based on the key value (a keyed variant is sketched after the code below).

    //AddMessageKeyBolt.java
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    public class AddMessageKeyBolt extends BaseBasicBolt {
    
        @Override
        public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
            // Take out the first field value
            String message = tuple.getString(0);
    //        System.out.println(message);
            // Send to the next component
            basicOutputCollector.emit(new Values(null, message));
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            // Create a schema to send to the next component
            outputFieldsDeclarer.declare(new Fields("key", "message"));
        }
    }
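
    If messages should instead be routed by key, the bolt can emit a non-null key. The following variant of execute() is a sketch that uses the message's hash code as the key, mirroring the commented-out line in the Trident example below; the key choice itself is illustrative:

    // A keyed variant of `execute` (sketch; the hash-code key is illustrative)
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        String message = tuple.getString(0);
        // Messages with the same key are hashed to the same partition
        basicOutputCollector.emit(new Values(Integer.toString(message.hashCode()), message));
    }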

    Using Trident

    Use the Trident API to generate a topology:

    //TopologyKafkaProducerTrident.java
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.kafka.trident.TridentKafkaStateFactory;
    import org.apache.storm.kafka.trident.TridentKafkaStateUpdater;
    import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
    import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.operation.BaseFunction;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    
    import java.util.Properties;
    
    public class TopologyKafkaProducerTrident {
        // `ip:port` of the CKafka instance applied for
        private final static String BOOTSTRAP_SERVERS = "111.230.216.45:9092";
        // Specify the topic to which to write messages
        private final static String TOPIC = "storm-trident-test";
        public static void main(String[] args) throws Exception {
            // Set producer attributes
            // For functions, please visit https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
            // For attributes, please visit http://kafka.apache.org/0102/documentation.html
            Properties properties = new Properties();
            properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
            properties.put("acks", "1");
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Set the trident
            TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
                    .withProducerProperties(properties)
                    .withKafkaTopicSelector(new DefaultTopicSelector(TOPIC))
                    // Specify `fields("key", "value")` as the message key and value. Unlike the bolt version, this `FieldNameBasedTupleToKafkaMapper` has no default field names, so they must be set explicitly
                    .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "value"));
            TridentTopology builder = new TridentTopology();
            // A spout that generates messages in batches with the output field being `sentence`
            builder.newStream("kafka-spout", new TridentSerialSentenceSpout(5))
                    .each(new Fields("sentence"), new AddMessageKey(), new Fields("key", "value"))
                    .partitionPersist(stateFactory, new Fields("key", "value"), new TridentKafkaStateUpdater(), new Fields());
    
            Config config = new Config();
            if (args != null && args.length > 0) {
                // Cluster mode, which is used to package a jar file and run it in Storm
                config.setNumWorkers(1);
                StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
            } else {
                // Local mode
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("test", config, builder.build());
                Utils.sleep(10000);
                cluster.killTopology("test");
                cluster.shutdown();
            }
    
        }
    
        private static class AddMessageKey extends BaseFunction {
    
            @Override
            public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
                // Take out the first field value
                String message = tridentTuple.getString(0);
                //System.out.println(message);
                // Send to the next component
                //tridentCollector.emit(new Values(Integer.toString(message.hashCode()), message));
                tridentCollector.emit(new Values(null, message));
            }
        }
    }

    Create a spout class that generates messages in batches:

    //TridentSerialSentenceSpout.java
    import org.apache.storm.Config;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.spout.IBatchSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    
    import java.util.Map;
    import java.util.UUID;
    
    public class TridentSerialSentenceSpout implements IBatchSpout {
    
        private final int batchCount;
    
        public TridentSerialSentenceSpout(int batchCount) {
            this.batchCount = batchCount;
        }
    
        @Override
        public void open(Map map, TopologyContext topologyContext) {
    
        }
    
        @Override
        public void emitBatch(long l, TridentCollector tridentCollector) {
            Utils.sleep(1000);
            for(int i = 0; i < batchCount; i++){
                tridentCollector.emit(new Values(UUID.randomUUID().toString()));
            }
        }
    
        @Override
        public void ack(long l) {
    
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public Map<String, Object> getComponentConfiguration() {
            Config conf = new Config();
            conf.setMaxTaskParallelism(1);
            return conf;
        }
    
        @Override
        public Fields getOutputFields() {
            return new Fields("sentence");
        }
    }

    Consuming from CKafka

    Using spout/bolt

    //TopologyKafkaConsumerSpout.java
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.kafka.spout.*;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.LATEST;
    
    public class TopologyKafkaConsumerSpout {
        // `ip:port` of the CKafka instance applied for
        private final static String BOOTSTRAP_SERVERS = "111.230.216.45:9092";
        // Specify the topic from which to consume messages
        private final static String TOPIC = "storm-topology-test";
    
        public static void main(String[] args) throws Exception {
            // Set a retry policy
            KafkaSpoutRetryService kafkaSpoutRetryService = new KafkaSpoutRetryExponentialBackoff(
                    KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),
                    KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
                    Integer.MAX_VALUE,
                    KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10)
            );
            ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
                    (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
                    new Fields("topic", "partition", "offset", "key", "value"));
            // Set consumer parameters
            // For functions, please visit http://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html
            // For parameters, please visit http://kafka.apache.org/0102/documentation.html
        KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPIC)
                    .setProp(new HashMap<String, Object>(){{
                        put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1"); // Set the group
                        put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000"); // Set the session timeout period
                        put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000"); // Set the request timeout period
                    }})
                    .setOffsetCommitPeriodMs(10_000) // Set the automatic confirmation period
                    .setFirstPollOffsetStrategy(LATEST) // Set to pull the latest message
                    .setRetry(kafkaSpoutRetryService)
                    .setRecordTranslator(trans)
                    .build();
    
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
            builder.setBolt("bolt", new BaseRichBolt(){
                private OutputCollector outputCollector;
                @Override
                public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    
                }
    
                @Override
                public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
                    this.outputCollector = outputCollector;
                }
    
                @Override
                public void execute(Tuple tuple) {
                    System.out.println(tuple.getStringByField("value"));
                    outputCollector.ack(tuple);
                }
            }, 1).shuffleGrouping("kafka-spout");
    
            Config config = new Config();
            config.setMaxSpoutPending(20);
            if (args != null && args.length > 0) {
                config.setNumWorkers(3);
                StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.createTopology());
            }
            else {
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("test", config, builder.createTopology());
                Utils.sleep(20000);
                cluster.killTopology("test");
                cluster.shutdown();
            }
        }
    }

    Using Trident

    //TopologyKafkaConsumerTrident.java
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
    import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutConfig;
    import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
    import org.apache.storm.trident.Stream;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.operation.BaseFunction;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    
    import java.util.HashMap;
    
    import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.LATEST;
    
    
    public class TopologyKafkaConsumerTrident {
        // `ip:port` of the CKafka instance applied for
        private final static String BOOTSTRAP_SERVERS = "111.230.216.45:9092";
        // Specify the topic from which to consume messages
        private final static String TOPIC = "storm-trident-test";
    
        public static void main(String[] args) throws Exception {
            ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
                    (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
                    new Fields("topic", "partition", "offset", "key", "value"));
            // Set consumer parameters
            // For functions, please visit http://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html
            // For parameters, please visit http://kafka.apache.org/0102/documentation.html
        KafkaTridentSpoutConfig<String, String> spoutConfig = KafkaTridentSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPIC)
                    .setProp(new HashMap<String, Object>(){{
                        put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1"); // Set the group
                        put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // Set automatic confirmation
                        put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000"); // Set the session timeout period
                        put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000"); // Set the request timeout period
                    }})
                    .setFirstPollOffsetStrategy(LATEST) // Set to pull the latest message
                    .setRecordTranslator(trans)
                    .build();
    
            TridentTopology builder = new TridentTopology();
    //      Stream spoutStream = builder.newStream("spout", new KafkaTridentSpoutTransactional(spoutConfig)); // Transaction type
            Stream spoutStream = builder.newStream("spout", new KafkaTridentSpoutOpaque(spoutConfig));
            spoutStream.each(spoutStream.getOutputFields(), new BaseFunction(){
                @Override
                public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
                    System.out.println(tridentTuple.getStringByField("value"));
                    tridentCollector.emit(new Values(tridentTuple.getStringByField("value")));
                }
            }, new Fields("message"));
    
            Config conf = new Config();
        conf.setMaxSpoutPending(20);
        conf.setNumWorkers(1);
            if (args != null && args.length > 0) {
                conf.setNumWorkers(3);
                StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.build());
            }
            else {
                StormTopology stormTopology = builder.build();
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("test", conf, stormTopology);
                Utils.sleep(10000);
                cluster.killTopology("test");
            cluster.shutdown();
            stormTopology.clear();
            }
        }
    }

    Submitting the Topology

    After the project is packaged with mvn package, the resulting jar can be submitted to a local cluster for debugging or to a production cluster for running. In the examples above, running the topology class without an extra argument starts a local cluster, while passing an argument submits the topology to the cluster under that name (args[0]):

    storm jar your_jar_name.jar topology_class
    storm jar your_jar_name.jar topology_class topology_name
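
    For example, with the pom.xml above (artifactId storm, version 0.0.1-SNAPSHOT, assembled with the jar-with-dependencies descriptor), the producer topology could be submitted as follows. The jar path and the topology name my-producer-topology are assumptions based on that build configuration:

    mvn package
    storm jar target/storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar TopologyKafkaProducerSpout my-producer-topology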
