Connecting Storm to CKafka

Last updated: 2021-08-10 15:51:04

    Storm is a distributed real-time computing framework that can perform stream-based data processing and provide universal distributed RPC calling so as to reduce the delay of event processing down to sub-seconds. It is suitable for real-time data processing scenarios where low delay is required.

    How Storm Works

    There are two types of nodes in a Storm cluster: master node and worker node. The Nimbus process runs on the master node for resource allocation and status monitoring, and the Supervisor process runs on the worker node for listening on work tasks and starting the executor. The entire Storm cluster relies on ZooKeeper for common data storage, cluster status listening, task assignment, etc.

    A data processing program submitted to Storm is called a topology. The minimum message unit it processes is tuple (an array of arbitrary objects). A topology consists of spout and bolt, where spout is the source of tuple, while bolt can subscribe to any tuple issued by spout or bolt for processing.

    Storm with CKafka

    Storm can use CKafka as a spout to consume data for processing or as a bolt to store the processed data for consumption by other components.

    Testing environment

    CentOS 6.8

    Package Version
    Maven 3.5.0
    Storm 2.1.0
    SSH 5.3
    Java 1.8

    Prerequisites

    Directions

    Step 1. Get the CKafka instance access address

    1. Log in to the CKafka console.
    2. Select Instance List on the left sidebar and click the ID of an instance to enter the instance basic information page.
    3. On the instance basic information page, get the instance access address in the Access Mode module.

    Step 2. Create a topic

    1. On the instance basic information page, select the Topic Management tab on the top.
    2. On the topic management page, click Create to create a topic.

    Step 3. Add Maven dependencies

    Configure pom.xml as follows:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemalocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelversion>4.0.0</modelversion>
    <groupid>storm</groupid>
    <artifactid>storm</artifactid>
    <version>0.0.1-SNAPSHOT</version>
    <name>storm</name>
    <properties>
    <project.build.sourceencoding>UTF-8</project.build.sourceencoding>
    </properties>
    <dependencies>
    <dependency>
    <groupid>org.apache.storm</groupid>
    <artifactid>storm-core</artifactid>
    <version>2.1.0</version>
    </dependency>
    <dependency>
    <groupid>org.apache.storm</groupid>
    <artifactid>storm-kafka-client</artifactid>
    <version>2.1.0</version>
    </dependency>
    <dependency>
    <groupid>org.apache.kafka</groupid>
    <artifactid>kafka_2.11</artifactid>
    <version>0.10.2.1</version>
    <exclusions>
    <exclusion>
    <groupid>org.slf4j</groupid>
    <artifactid>slf4j-log4j12</artifactid>
    </exclusion>
    </exclusions>
    </dependency>
    <dependency>
    <groupid>junit</groupid>
    <artifactid>junit</artifactid>
    <version>4.12</version>
    <scope>test</scope>
    </dependency>
    </dependencies>

    <build>
    <plugins>
    <plugin>
    <artifactid>maven-assembly-plugin</artifactid>
    <configuration>
    <descriptorrefs>
    <descriptorref>jar-with-dependencies</descriptorref>
    </descriptorrefs>
    <archive>
    <manifest>
    <mainclass>ExclamationTopology</mainclass>
    </manifest>
    </archive>
    </configuration>
    <executions>
    <execution>
    <id>make-assembly</id>
    <phase>package</phase>
    <goals>
    <goal>single</goal>
    </goals>
    </execution>
    </executions>
    </plugin>
    <plugin>
    <groupid>org.apache.maven.plugins</groupid>
    <artifactid>maven-compiler-plugin</artifactid>
    <configuration>
    <source>1.8
    <target>1.8</target>
    </configuration>
    </plugin>
    </plugins>
    </build>
    </project>

    Step 4. Produce a message

    Using spout/bolt

    Topology code:

    //TopologyKafkaProducerSpout.java
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.kafka.bolt.KafkaBolt;
    import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
    import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.utils.Utils;

    import java.util.Properties;

    public class TopologyKafkaProducerSpout {
    // `ip:port` of the CKafka instance applied for
    private final static String BOOTSTRAP_SERVERS = "xx.xx.xx.xx:xxxx";
    // Specify the topic to which to write messages
    private final static String TOPIC = "storm_test";
    public static void main(String[] args) throws Exception
    {
    // Set producer attributes
    // For functions, please visit https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
    // For attributes, please visit http://kafka.apache.org/0102/documentation.html
    Properties properties = new Properties();
    properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
    properties.put("acks", "1");
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // Create a bolt to be written to Kafka. `fields("key" "message")` is used as the key and message for the produced message by default, which can also be specified in `FieldNameBasedTupleToKafkaMapper()`
    KafkaBolt kafkaBolt = new KafkaBolt()
    .withProducerProperties(properties)
    .withTopicSelector(new DefaultTopicSelector(TOPIC))
    .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
    TopologyBuilder builder = new TopologyBuilder();
    // A spout class that generates messages in sequence with the output field being `sentence`
    SerialSentenceSpout spout = new SerialSentenceSpout();
    AddMessageKeyBolt bolt = new AddMessageKeyBolt();
    builder.setSpout("kafka-spout", spout, 1);
    // Add the fields required to produce messages to CKafka for the tuple
    builder.setBolt("add-key", bolt, 1).shuffleGrouping("kafka-spout");
    // Write to CKafka
    builder.setBolt("sendToKafka", kafkaBolt, 8).shuffleGrouping("add-key");

    Config config = new Config();
    if (args != null &amp;&amp; args.length &gt; 0)
    {
    // Cluster mode, which is used to package a jar file and run it in Storm
    config.setNumWorkers(1);
    StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.createTopology());
    } else {
    // Local mode
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("test", config, builder.createTopology());
    Utils.sleep(10000);
    cluster.killTopology("test");
    cluster.shutdown();
    }

    }
    }

    Create a spout class that generates messages in sequence:

    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;

    import java.util.Map;
    import java.util.UUID;

    public class SerialSentenceSpout extends BaseRichSpout {

    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
    this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
    Utils.sleep(1000);
    // Produce a `UUID` string and send it to the next component
    spoutOutputCollector.emit(new Values(UUID.randomUUID().toString()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declare(new Fields("sentence"));
    }
    }

    Add key and message fields to the tuple. If key is null, the produced messages will be evenly allocated to each partition. If a key is specified, the messages will be hashed to specific partitions based on the key value:

    //AddMessageKeyBolt.java
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    public class AddMessageKeyBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
    // Take out the first field value
    String messae = tuple.getString(0);
    // System.out.println(messae);
    // Send to the next component
    basicOutputCollector.emit(new Values(null, messae));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    // Create a schema to send to the next component
    outputFieldsDeclarer.declare(new Fields("key", "message"));
    }
    }

    Using trident

    Use the trident class to generate a topology

    //TopologyKafkaProducerTrident.java
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.kafka.trident.TridentKafkaStateFactory;
    import org.apache.storm.kafka.trident.TridentKafkaStateUpdater;
    import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
    import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.operation.BaseFunction;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;

    import java.util.Properties;

    public class TopologyKafkaProducerTrident {
    // `ip:port` of the CKafka instance applied for
    private final static String BOOTSTRAP_SERVERS = "xx.xx.xx.xx:xxxx";
    // Specify the topic to which to write messages
    private final static String TOPIC = "storm_test";
    public static void main(String[] args) throws Exception
    {
    // Set producer attributes
    // For functions, please visit https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
    // For attributes, please visit http://kafka.apache.org/0102/documentation.html
    Properties properties = new Properties();
    properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
    properties.put("acks", "1");
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // Set the trident
    TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
    .withProducerProperties(properties)
    .withKafkaTopicSelector(new DefaultTopicSelector(TOPIC))
    // Set to use `fields("key", "value")` as the written message, which doesn't have a default value as `FieldNameBasedTupleToKafkaMapper` does
    .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "value"));
    TridentTopology builder = new TridentTopology();
    // A spout that generates messages in batches with the output field being `sentence`
    builder.newStream("kafka-spout", new TridentSerialSentenceSpout(5))
    .each(new Fields("sentence"), new AddMessageKey(), new Fields("key", "value"))
    .partitionPersist(stateFactory, new Fields("key", "value"), new TridentKafkaStateUpdater(), new Fields());

    Config config = new Config();
    if (args != null &amp;&amp; args.length &gt; 0)
    {
    // Cluster mode, which is used to package a jar file and run it in Storm
    config.setNumWorkers(1);
    StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
    } else {
    // Local mode
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("test", config, builder.build());
    Utils.sleep(10000);
    cluster.killTopology("test");
    cluster.shutdown();
    }

    }

    private static class AddMessageKey extends BaseFunction {

    @Override
    public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
    // Take out the first field value
    String messae = tridentTuple.getString(0);
    //System.out.println(messae);
    // Send to the next component
    //tridentCollector.emit(new Values(Integer.toString(messae.hashCode()), messae));
    tridentCollector.emit(new Values(null, messae));
    }
    }
    }

    Create a spout class that generates messages in batches:

    //TridentSerialSentenceSpout.java
    import org.apache.storm.Config;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.spout.IBatchSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;

    import java.util.Map;
    import java.util.UUID;

    public class TridentSerialSentenceSpout implements IBatchSpout {

    private final int batchCount;

    public TridentSerialSentenceSpout(int batchCount)
    {
    this.batchCount = batchCount;
    }

    @Override
    public void open(Map map, TopologyContext topologyContext) {

    }

    @Override
    public void emitBatch(long l, TridentCollector tridentCollector) {
    Utils.sleep(1000);
    for(int i = 0; i &lt; batchCount; i++)
    {
    tridentCollector.emit(new Values(UUID.randomUUID().toString()));
    }
    }

    @Override
    public void ack(long l) {

    }

    @Override
    public void close() {

    }

    @Override
    public Map<string, object=""> getComponentConfiguration() {
    Config conf = new Config();
    conf.setMaxTaskParallelism(1);
    return conf;
    }

    @Override
    public Fields getOutputFields() {
    return new Fields("sentence")
    ;
    }
    }

    Step 5. Consume the message

    Using spout/bolt

    //TopologyKafkaConsumerSpout.java
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.kafka.spout.*;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;

    import java.util.HashMap;
    import java.util.Map;

    import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.LATEST;

    public class TopologyKafkaConsumerSpout {
    // `ip:port` of the CKafka instance applied for
    private final static String BOOTSTRAP_SERVERS = "xx.xx.xx.xx:xxxx";
    // Specify the topic to which to write messages
    private final static String TOPIC = "storm_test";

    public static void main(String[] args) throws Exception
    {
    // Set a retry policy
    KafkaSpoutRetryService kafkaSpoutRetryService = new KafkaSpoutRetryExponentialBackoff(
    KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),
    KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
    Integer.MAX_VALUE,
    KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10)
    );
    ByTopicRecordTranslator<string, string=""> trans = new ByTopicRecordTranslator&lt;&gt;(
    (r) -&gt; new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
    new Fields("topic", "partition", "offset", "key", "value"))
    ;
    // Set consumer parameters
    // For functions, please visit http://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html
    // For parameters, please visit http://kafka.apache.org/0102/documentation.html
    KafkaSpoutConfig spoutConfig = KafkaSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPIC)
    .setProp(new HashMap<string, object="">(){{
    put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1")
    ; // Set the group
    put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000"); // Set the session timeout period
    put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000"); // Set the request timeout period
    }})
    .setOffsetCommitPeriodMs(10_000) // Set the automatic confirmation period
    .setFirstPollOffsetStrategy(LATEST) // Set to pull the latest message
    .setRetry(kafkaSpoutRetryService)
    .setRecordTranslator(trans)
    .build();

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
    builder.setBolt("bolt", new BaseRichBolt(){
    private OutputCollector outputCollector;
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    this.outputCollector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
    System.out.println(tuple.getStringByField("value"));
    outputCollector.ack(tuple);
    }
    }, 1).shuffleGrouping("kafka-spout");

    Config config = new Config();
    config.setMaxSpoutPending(20);
    if (args != null &amp;&amp; args.length &gt; 0)
    {
    config.setNumWorkers(3);
    StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.createTopology());
    }
    else {
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("test", config, builder.createTopology());
    Utils.sleep(20000);
    cluster.killTopology("test");
    cluster.shutdown();
    }
    }
    }

    Using trident

    //TopologyKafkaConsumerTrident.java
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
    import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutConfig;
    import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
    import org.apache.storm.trident.Stream;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.operation.BaseFunction;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;

    import java.util.HashMap;

    import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.LATEST;


    public class TopologyKafkaConsumerTrident {
    // `ip:port` of the CKafka instance applied for
    private final static String BOOTSTRAP_SERVERS = "xx.xx.xx.xx:xxxx";
    // Specify the topic to which to write messages
    private final static String TOPIC = "storm_test";

    public static void main(String[] args) throws Exception
    {
    ByTopicRecordTranslator<string, string=""> trans = new ByTopicRecordTranslator&lt;&gt;(
    (r) -&gt; new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
    new Fields("topic", "partition", "offset", "key", "value"))
    ;
    // Set consumer parameters
    // For functions, please visit http://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html
    // For parameters, please visit http://kafka.apache.org/0102/documentation.html
    KafkaTridentSpoutConfig spoutConfig = KafkaTridentSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPIC)
    .setProp(new HashMap<string, object="">(){{
    put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1")
    ; // Set the group
    put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // Set automatic confirmation
    put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000"); // Set the session timeout period
    put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000"); // Set the request timeout period
    }})
    .setFirstPollOffsetStrategy(LATEST) // Set to pull the latest message
    .setRecordTranslator(trans)
    .build();

    TridentTopology builder = new TridentTopology();
    // Stream spoutStream = builder.newStream("spout", new KafkaTridentSpoutTransactional(spoutConfig)); // Transaction type
    Stream spoutStream = builder.newStream("spout", new KafkaTridentSpoutOpaque(spoutConfig));
    spoutStream.each(spoutStream.getOutputFields(), new BaseFunction(){
    @Override
    public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
    System.out.println(tridentTuple.getStringByField("value"));
    tridentCollector.emit(new Values(tridentTuple.getStringByField("value")));
    }
    }, new Fields("message"));

    Config conf = new Config();
    conf.setMaxSpoutPending(20);conf.setNumWorkers(1);
    if (args != null &amp;&amp; args.length &gt; 0)
    {
    conf.setNumWorkers(3);
    StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.build());
    }
    else {
    StormTopology stormTopology = builder.build();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("test", conf, stormTopology);
    Utils.sleep(10000);
    cluster.killTopology("test");
    cluster.shutdown();stormTopology.clear();
    }
    }
    }

    Step 6. Submit Storm

    After being compiled with mvn package, Storm can be submitted to the local cluster for debugging or submitted to the production cluster for running.

    storm jar your_jar_name.jar topology_name
    
    storm jar your_jar_name.jar topology_name tast_name