Getting Started With Apache Storm


Apache Storm


Apache Storm is a free and open source distributed realtime computation system. Storm makes it easy to reliably process unbounded streams of data, doing for realtime processing what Hadoop did for batch processing. It offers robust, reliable and easy-to-use realtime data processing, which makes it a good fit for operational intelligence.

Use Cases of Storm

  • Realtime Analytics
  • Online Machine Learning
  • Continuous Computation
  • ETL

Apache Storm is a platform for analyzing realtime streams of data as they arrive, so you can react to data as it happens.


Storm Cluster Architecture

  • A Storm cluster consists of a Nimbus (master) node and Supervisor (worker) nodes.

  • Nimbus is the central component of Apache Storm. It analyzes the topology, gathers the tasks to be executed and distributes the work to its worker nodes.

  • Supervisor (worker) nodes follow the instructions provided by Nimbus. Each Supervisor runs multiple worker processes.

  • A worker process executes part of one specific topology. It spawns executors (threads), and a single worker process can run many executors. Each executor runs one or more tasks, and a task performs the actual data processing.
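Storm exposes these levels through real settings. `Config.setNumWorkers` sets the number of worker processes. The parallelism hint (the third argument of `setSpout`/`setBolt`) sets the number of executors, and `setNumTasks` sets the number of tasks. The plain-Java toy model below shows how units might be spread across the level above them. It assumes an as-even-as-possible split. `ParallelismSketch` and `spread` are hypothetical names, and this is not Storm's scheduler code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of Storm's worker -> executor -> task hierarchy.
// Assumption: units are split as evenly as possible. This is NOT Storm's scheduler.
public class ParallelismSketch {

    // Split `units` items over `slots` containers; the first (units % slots)
    // containers each receive one extra item.
    static List<Integer> spread(int units, int slots) {
        if (slots <= 0) {
            throw new IllegalArgumentException("slots must be positive");
        }
        List<Integer> counts = new ArrayList<>();
        for (int i = 0; i < slots; i++) {
            counts.add(units / slots + (i < units % slots ? 1 : 0));
        }
        return counts;
    }

    public static void main(String[] args) {
        // A bolt with parallelism hint 2 and setNumTasks(4): 4 tasks on 2 executors (threads)
        System.out.println("tasks per executor: " + spread(4, 2));

        // 12 executors (e.g. 10 spout + 2 bolt executors) on 3 worker processes,
        // ignoring Storm's own system executors such as ackers
        System.out.println("executors per worker: " + spread(12, 3));
    }
}
```

Running it prints `tasks per executor: [2, 2]` and `executors per worker: [4, 4, 4]`. In a real cluster, Storm also schedules its own executors (for example ackers), so the actual layout can differ.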


Storm Data Model

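  • Topology - Directed Acyclic Graph (DAG), where vertices are computations and edges are streams of data

  • Spouts - Sources of data for the topology, e.g. Kafka, Kestrel or databases such as MySQL/Postgres

  • Bolts - Units of computation on the data, e.g. aggregation, filtering, joins and transformations

  • Tuple - Unit of data flowing through a stream; an immutable, ordered list of values

  • Stream - Unbounded, unordered sequence of tuples

A topology is wired together with TopologyBuilder. The snippet below is adapted from Storm's ExclamationTopology example, whose ExclamationBolt appends "!!!" to each word it receives. Here, shuffleGrouping distributes tuples randomly and evenly across the tasks of the consuming bolt:

// create the topology
TopologyBuilder builder = new TopologyBuilder();

// attach a spout that emits random words - parallelism of 10
builder.setSpout("word", new TestWordSpout(), 10);

// attach the exclamation bolt to the topology - parallelism of 3
builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");

// attach another exclamation bolt to the topology - parallelism of 2
builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");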
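Shuffle grouping, which the snippet above uses through `shuffleGrouping`, sends tuples randomly across a bolt's tasks so that each task receives an equal number. The plain-Java sketch below models that guarantee together with immutable tuples. `ShuffleGroupingSketch`, `tuple` and `shuffleGroup` are hypothetical names. The round-based reshuffling is an assumed approximation, not Storm's actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical plain-Java model of a stream with shuffle grouping (not Storm code).
// It only mimics the documented guarantee: random but even distribution over tasks.
public class ShuffleGroupingSketch {

    // A tuple modelled as an immutable, ordered list of values.
    static List<Object> tuple(Object... values) {
        return Collections.unmodifiableList(new ArrayList<>(Arrays.asList(values)));
    }

    // Shuffle the task ids, hand tuples out in that order, and reshuffle
    // once every task has received one tuple in the current round.
    static List<List<List<Object>>> shuffleGroup(List<List<Object>> stream, int numTasks, long seed) {
        if (numTasks <= 0) {
            throw new IllegalArgumentException("numTasks must be positive");
        }
        Random random = new Random(seed);
        List<List<List<Object>>> inboxes = new ArrayList<>();
        List<Integer> order = new ArrayList<>();
        for (int t = 0; t < numTasks; t++) {
            inboxes.add(new ArrayList<>());
            order.add(t);
        }
        int position = numTasks; // forces a shuffle before the first tuple
        for (List<Object> tuple : stream) {
            if (position == numTasks) {
                Collections.shuffle(order, random);
                position = 0;
            }
            inboxes.get(order.get(position++)).add(tuple);
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<List<Object>> stream = new ArrayList<>();
        String[] words = {"nathan", "mike", "jackson"};
        for (int i = 0; i < 30; i++) {
            stream.add(tuple(words[i % words.length]));
        }
        // 30 tuples over 3 tasks: each task receives exactly 10
        List<List<List<Object>>> inboxes = shuffleGroup(stream, 3, 42L);
        for (int t = 0; t < inboxes.size(); t++) {
            System.out.println("task " + t + " received " + inboxes.get(t).size() + " tuples");
        }
    }
}
```

Calling `set` on a tuple returned by `tuple(...)` throws `UnsupportedOperationException`, which mirrors the immutability of Storm tuples.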


Adding Storm to your Project

build.gradle

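dependencies {
    // storm-core provides the Storm APIs and the in-process LocalCluster.
    // When packaging a jar for a real cluster, storm-core is usually declared
    // compileOnly instead, because the cluster already provides it.
    implementation "org.apache.storm:storm-core:1.0.2"
}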

Sample Code

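In the example below, we use TestWordSpout, which ships with storm-core in the org.apache.storm.testing package and emits random words. A PrintBolt bolt consumes those words and prints each one to the console. When started without arguments, the topology runs in an in-process LocalCluster for 30 seconds. When given a topology name as the first argument, it is submitted to a real cluster instead. It is a really simple topology, but it gives you the gist of how processing is done in Apache Storm.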

HelloWorldTopology.java

package com.aayushtuladhar.storm.topology;

import com.aayushtuladhar.storm.bolt.PrintBolt;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.testing.TestWordSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HelloWorldTopology {

    // slf4j-api comes in transitively with storm-core, so no Lombok dependency is needed
    private static final Logger log = LoggerFactory.getLogger(HelloWorldTopology.class);

    public static void main(String[] args) throws Exception {

        //Creating Topology
        TopologyBuilder builder = new TopologyBuilder();

        //Attaching Spout with a parallelism hint of 10 executors
        builder.setSpout("word", new TestWordSpout(), 10);

        //Attaching Bolt with 2 executors; tuples from "word" are shuffled across them
        builder.setBolt("printWord", new PrintBolt(), 2).shuffleGrouping("word");

        //Creating Default Config Object
        Config conf = new Config();

        if (args != null && args.length > 0) {
            //Running in Live Cluster; args[0] is used as the topology name
            conf.setNumWorkers(3);
            conf.setDebug(false);
            log.info("Submitting topology {}", args[0]);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

        } else {
            // create the local cluster instance
            LocalCluster cluster = new LocalCluster();
            conf.setDebug(true);

            log.info("Submitting HelloWorld Topology in Local Cluster");
            // submit the topology to the local cluster
            cluster.submitTopology("HelloWorldTopology", conf, builder.createTopology());
            // let the topology run for 30 seconds
            Utils.sleep(1000 * 30);

            // kill the topology
            cluster.killTopology("HelloWorldTopology");

            // we are done, so shutdown the local cluster
            cluster.shutdown();
            log.info("Topology Killed");
        }

    }

}

PrintBolt.java

package com.aayushtuladhar.storm.bolt;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.Map;

public class PrintBolt extends BaseRichBolt {

	private OutputCollector collector;

	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void execute(Tuple input) {
		String name = input.getString(0);
		System.out.println(name);

		//Acknowledging Input
		collector.ack(input);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// PrintBolt is a sink: it emits no tuples, so there are no output fields to declare

	}

}
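PrintBolt calls `collector.ack(input)` to tell Storm that the tuple was processed. Storm tracks only tuples that a spout emits with a message ID. When such a tuple is fully processed, the spout's `ack` method is called. If it fails or times out, `fail` is called and the spout can replay it. The plain-Java sketch below models only that spout-side bookkeeping behind at-least-once delivery. `ReliableSpoutSketch` and its methods are hypothetical names. Storm's real acker tracks whole tuple trees, which this sketch does not model.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of a reliable spout's bookkeeping (not Storm code).
// Emitted tuples stay pending until acked; failed ones are queued for replay.
public class ReliableSpoutSketch {

    private final Deque<String> toEmit = new ArrayDeque<>();
    private final Map<Long, String> pending = new LinkedHashMap<>();
    private long nextMsgId = 0;

    ReliableSpoutSketch(String... words) {
        for (String w : words) {
            toEmit.add(w);
        }
    }

    // Like nextTuple(): emit one word and remember it under a new message id.
    // Returns the message id, or -1 when nothing is left to emit.
    long nextTuple() {
        String word = toEmit.poll();
        if (word == null) {
            return -1;
        }
        long msgId = nextMsgId++;
        pending.put(msgId, word);
        return msgId;
    }

    // Like ack(msgId): processing completed, so stop tracking the tuple.
    void ack(long msgId) {
        pending.remove(msgId);
    }

    // Like fail(msgId): put the word back at the front so it is emitted again.
    void fail(long msgId) {
        String word = pending.remove(msgId);
        if (word != null) {
            toEmit.addFirst(word);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        ReliableSpoutSketch spout = new ReliableSpoutSketch("nathan", "mike");
        long first = spout.nextTuple();   // emits "nathan"
        long second = spout.nextTuple();  // emits "mike"
        spout.ack(first);                 // a bolt acked "nathan"
        spout.fail(second);               // "mike" failed or timed out
        long replay = spout.nextTuple();  // "mike" is emitted again under a new id
        spout.ack(replay);
        System.out.println("pending after replay: " + spout.pendingCount());
    }
}
```

Running it prints `pending after replay: 0`. Because a failed tuple is replayed, a bolt may see the same word more than once. This is the at-least-once trade-off that acking buys.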