Getting Started With Apache Storm

Apache Storm

Apache Storm is a free and open source distributed realtime computation system. Storm does for realtime processing what Hadoop did for batch processing. It provides robust, reliable, user-friendly realtime data processing with operational intelligence.

Use Cases of Storm

  • Realtime Analytics
  • Online Machine Learning
  • Continuous Computation
  • ETL

Apache Storm is a platform for analyzing realtime streams of data as they arrive, so you can react to data as it happens.

Storm Cluster Architecture

  • A Storm cluster consists of Nimbus (the master node) and Supervisors (the worker nodes).

  • Nimbus is the central component of Apache Storm. It analyzes the topology, gathers the tasks to be executed, and distributes the work to its worker nodes.

  • Supervisor nodes (worker nodes) follow the instructions provided by Nimbus. A supervisor runs multiple worker processes.

  • Each worker process executes tasks belonging to a specific topology. A worker process spins off executors to run those tasks, and one worker process can have multiple executors. A task performs the actual data processing.
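
The worker process / executor / task hierarchy above can be pictured with a small plain-Java sketch. It does not use Storm's API: `ParallelismSketch` and `assignTasks` are hypothetical names, and the round-robin placement is only an assumption for illustration, since Storm's own scheduler decides the real assignment.

```java
import java.util.ArrayList;
import java.util.List;

public class ParallelismSketch {

    // Spread task ids 0..numTasks-1 over numExecutors in round-robin order.
    // Illustrative model only; this is not Storm's scheduling code.
    public static List<List<Integer>> assignTasks(int numTasks, int numExecutors) {
        List<List<Integer>> executors = new ArrayList<>();
        for (int i = 0; i < numExecutors; i++) {
            executors.add(new ArrayList<>());
        }
        for (int task = 0; task < numTasks; task++) {
            executors.get(task % numExecutors).add(task);
        }
        return executors;
    }

    public static void main(String[] args) {
        // 4 tasks over 2 executors: each executor ends up running 2 tasks
        System.out.println(assignTasks(4, 2)); // prints [[0, 2], [1, 3]]
    }
}
```

The point of the sketch is only the nesting: a worker process holds executors, and each executor runs one or more tasks.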

Storm Data Model

  • Topology - Directed Acyclic Graph (DAG) in which vertices are computations and edges are streams of data
// create the topology
TopologyBuilder builder = new TopologyBuilder();
  • Spouts - Sources of data for the topology, e.g. Postgres/MySQL/Kafka/Kestrel
builder.setSpout("word", new TestWordSpout(), 10);
  • Bolts - Units of computation on the data, e.g. Aggregation/Filtering/Join/Transformation
// attach the exclamation bolt to the topology - parallelism of 3
builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");

// attach another exclamation bolt to the topology - parallelism of 2
builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
  • Tuple - Unit of data in a stream. An immutable, ordered list of elements

  • Stream - Unbounded sequence of tuples
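
To make the data model concrete, here is a minimal plain-Java sketch that mimics the spout -> exclaim1 -> exclaim2 chain without Storm itself. `DataModelSketch`, `tuple`, `exclaim` and `run` are hypothetical names, the bolt is assumed to append "!!!" to the first field, and a finite list stands in for what would be an unbounded stream in Storm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class DataModelSketch {

    // A tuple modeled as an immutable, ordered list of values
    public static List<Object> tuple(Object... values) {
        return Collections.unmodifiableList(new ArrayList<>(Arrays.asList(values)));
    }

    // Stand-in for a bolt: reads field 0 and emits a new tuple
    public static List<Object> exclaim(List<Object> input) {
        return tuple(input.get(0) + "!!!");
    }

    // Stand-in for the topology: each word flows through two bolts in sequence
    public static List<List<Object>> run(List<String> words) {
        List<List<Object>> out = new ArrayList<>();
        for (String word : words) {
            out.add(exclaim(exclaim(tuple(word))));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("storm", "nimbus")));
        // prints [[storm!!!!!!], [nimbus!!!!!!]]
    }
}
```

Because tuples are immutable, each bolt emits a new tuple instead of modifying the one it received, which is what `exclaim` does here.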

Adding Storm to your Project


dependencies {
    compile "org.apache.storm:storm-core:1.0.2"
}

Sample Code

In the example below, TestWordSpout emits random words, which PrintBolt consumes and prints to the console. It is a very simple topology, but it gives you the gist of how processing is done in Apache Storm.

package com.aayushtuladhar.storm.topology;

import com.aayushtuladhar.storm.bolt.PrintBolt;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.testing.TestWordSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class HelloWorldTopology {

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {

        //Creating Topology
        TopologyBuilder builder = new TopologyBuilder();

        //Attaching Spout - parallelism of 10
        builder.setSpout("word", new TestWordSpout(), 10);

        //Attaching Bolt - parallelism of 2
        builder.setBolt("printWord", new PrintBolt(), 2).shuffleGrouping("word");

        //Creating Default Config Object
        Config conf = new Config();

        if (args != null && args.length > 0) {
            //Running in Live Cluster; args[0] is used as the topology name
            try {
                log.info("Submitting " + args[0]);
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AuthorizationException e) {
                // Assumed handling: log the authorization failure
                log.error("Not authorized to submit topology", e);
            }
        } else {
            // create the local cluster instance
            LocalCluster cluster = new LocalCluster();

            log.info("Submitting HelloWorld Topology in Local Cluster");
            // submit the topology to the local cluster
            cluster.submitTopology("HelloWorldTopology", conf, builder.createTopology());
            // let the topology run for 30 seconds
            Utils.sleep(1000 * 30);

            // kill the topology
            cluster.killTopology("HelloWorldTopology");

            // we are done, so shutdown the local cluster
            cluster.shutdown();
            log.info("Topology Killed");
        }
    }
}



package com.aayushtuladhar.storm.bolt;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.Map;

import lombok.extern.slf4j.Slf4j;

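@Slf4j
public class PrintBolt extends BaseRichBolt {

	private OutputCollector collector;

	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		// Keep the collector so execute() can acknowledge tuples
		this.collector = collector;
	}

	@Override
	public void execute(Tuple input) {
		String name = input.getString(0);
		// Print the received word (the message format is an assumption)
		log.info("Received: {}", name);

		//Acknowledging Input
		collector.ack(input);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// Nothing to declare: this bolt emits no tuples
	}
}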