Getting Started With Apache Storm
16 Dec 2016 | Apache Storm
Apache Storm is a free and open source distributed realtime computation system. Storm does for realtime data processing what Hadoop did for batch processing. It provides robust, user-friendly, reliable realtime data processing with operational intelligence.
Use Cases of Storm
- Realtime Analytics
- Online Machine Learning
- Continuous Computation
- ETL
Apache Storm is a platform for analyzing realtime streams of data as they arrive, so you can react to data as it happens.
Storm Cluster Architecture
- A Storm cluster consists of Nimbus (the master node) and Supervisors (the worker nodes).
- Nimbus is the central component of Apache Storm. It analyzes the topology, gathers the tasks to be executed, and distributes the work to its worker nodes.
- Supervisor nodes (worker nodes) follow the instructions provided by Nimbus. A supervisor runs multiple worker processes.
- Worker processes execute the tasks belonging to a specific topology. A worker process spins off executors, usually several of them, and each executor runs one or more tasks. A task performs the actual data processing.
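The hierarchy described above (worker processes hosting executors, which in turn run tasks) can be pictured with a small plain-Java sketch. It does not use the Storm API: the class name `ExecutorAssignmentSketch` and the round-robin policy are assumptions made purely for illustration, not a description of Storm's actual scheduler.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: spreads executor ids over worker processes round-robin.
// This is NOT Storm's scheduler; the name and the policy are assumptions.
class ExecutorAssignmentSketch {

    // Returns one list of executor ids per worker process.
    static List<List<Integer>> assign(int numExecutors, int numWorkers) {
        if (numWorkers <= 0) {
            throw new IllegalArgumentException("numWorkers must be positive");
        }
        List<List<Integer>> workers = new ArrayList<>();
        for (int w = 0; w < numWorkers; w++) {
            workers.add(new ArrayList<>());
        }
        // Executor e lands on worker (e mod numWorkers)
        for (int e = 0; e < numExecutors; e++) {
            workers.get(e % numWorkers).add(e);
        }
        return workers;
    }

    public static void main(String[] args) {
        List<List<Integer>> workers = assign(5, 2);
        for (int w = 0; w < workers.size(); w++) {
            System.out.println("worker " + w + " runs executors " + workers.get(w));
        }
    }
}
```

With five executors and two workers, the sketch places executors 0, 2 and 4 on the first worker and 1 and 3 on the second, which is the kind of even spread the architecture aims for.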
Storm Data Model
- Topology - Directed Acyclic Graph, Vertices = Computation and Edges = Stream of Data
// create the topology
TopologyBuilder builder = new TopologyBuilder();
- Spouts - Source of data for the topology, e.g. Postgres/MySQL/Kafka/Kestrel
builder.setSpout("word", new TestWordSpout(), 10);
- Bolts - Units of computation on the data, e.g. Aggregation/Filtering/Join/Transformation
// attach the exclamation bolt to the topology - parallelism of 3
builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
// attach another exclamation bolt to the topology - parallelism of 2
builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
- Tuple - Unit of data entering the stream. Immutable ordered list of elements
- Stream - Unbounded sequence of tuples
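To make the data model above more concrete, here is a minimal plain-Java sketch of a spout feeding two chained bolts, plus a simplified take on shuffle grouping. It deliberately avoids the Storm classes: `TopologySketch`, `exclaim`, `run` and `shuffle` are hypothetical names, the sample words are made up, and picking a random task per tuple is only an approximation of what `shuffleGrouping` does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Plain-Java model of a spout -> bolt -> bolt chain. No Storm classes are used;
// every name in this class is hypothetical.
class TopologySketch {

    // Stand-in for a bolt: takes one tuple value and emits one new value.
    static String exclaim(String word) {
        return word + "!!!";
    }

    // Follows the edges of the graph: spout output -> first bolt -> second bolt.
    static List<String> run(List<String> spoutOutput) {
        List<String> results = new ArrayList<>();
        for (String word : spoutOutput) {
            results.add(exclaim(exclaim(word)));
        }
        return results;
    }

    // Simplified shuffle grouping: each tuple goes to a randomly chosen task.
    // Returns how many tuples each of the `parallelism` tasks received.
    static int[] shuffle(int tupleCount, int parallelism, Random random) {
        int[] perTask = new int[parallelism];
        for (int i = 0; i < tupleCount; i++) {
            perTask[random.nextInt(parallelism)]++;
        }
        return perTask;
    }

    public static void main(String[] args) {
        // Each word passes through both bolts, so it gains "!!!" twice
        System.out.println(run(Arrays.asList("storm", "stream")));
        // Per-task tuple counts vary from run to run but always sum to 1000
        System.out.println(Arrays.toString(shuffle(1000, 3, new Random())));
    }
}
```

The vertices of the graph are the `exclaim` calls and the edges are the values handed from one call to the next, so each word passes through both stand-in bolts before it leaves the chain.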
Adding Storm to your Project
build.gradle
dependencies {
    compile "org.apache.storm:storm-core:1.0.2"
}
Sample Code
In the example below, we use TestWordSpout, which emits random words. These are consumed by the PrintBolt bolt, which prints them to the console. It is a really simple topology, but it gives you the gist of how processing is done with Apache Storm.
HelloWorldTopology.java
package com.aayushtuladhar.storm.topology;

import com.aayushtuladhar.storm.bolt.PrintBolt;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.testing.TestWordSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class HelloWorldTopology {

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        // Create the topology
        TopologyBuilder builder = new TopologyBuilder();

        // Attach the spout - parallelism of 10
        builder.setSpout("word", new TestWordSpout(), 10);

        // Attach the bolt - parallelism of 2
        builder.setBolt("printWord", new PrintBolt(), 2).shuffleGrouping("word");

        // Create the default config object
        Config conf = new Config();

        if (args != null && args.length > 0) {
            // Running in a live cluster; args[0] is the topology name
            conf.setNumWorkers(3);
            conf.setDebug(false);
            try {
                log.info("Submitting " + args[0]);
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            // create the local cluster instance
            LocalCluster cluster = new LocalCluster();
            conf.setDebug(true);
            log.info("Submitting HelloWorld Topology in Local Cluster");

            // submit the topology to the local cluster
            cluster.submitTopology("HelloWorldTopology", conf, builder.createTopology());

            // let the topology run for 30 seconds
            Utils.sleep(1000 * 30);

            // kill the topology
            cluster.killTopology("HelloWorldTopology");

            // we are done, so shutdown the local cluster
            cluster.shutdown();
            log.info("Topology Killed");
        }
    }
}
PrintBolt.java
package com.aayushtuladhar.storm.bolt;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class PrintBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
                        OutputCollector collector) {
        // Keep the collector so execute() can acknowledge tuples
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // Read the first field of the tuple and print it
        String name = input.getString(0);
        System.out.println(name);

        // Acknowledge the input tuple
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt does not emit tuples, so there are no output fields to declare
    }
}
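The `collector.ack(input)` call in PrintBolt tells Storm that the bolt has finished with a tuple. A tuple that is never acked is eventually treated as failed, which lets a reliable spout replay it. As a rough mental model only, the bookkeeping can be sketched in plain Java as below. `AckTrackerSketch` and its methods are hypothetical names, and Storm's real acking mechanism is considerably more involved than a set of ids.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified picture of ack bookkeeping: a tuple is "pending" from the moment
// it is emitted until a bolt acknowledges it. Not Storm's actual implementation.
class AckTrackerSketch {

    private final Set<Long> pending = new HashSet<>();

    // Record a newly emitted tuple as pending.
    void emitted(long tupleId) {
        pending.add(tupleId);
    }

    // Mark a tuple as fully processed; returns false if it was not pending.
    boolean ack(long tupleId) {
        return pending.remove(tupleId);
    }

    // Number of tuples that have been emitted but not yet acked.
    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        AckTrackerSketch tracker = new AckTrackerSketch();
        tracker.emitted(1L);
        tracker.emitted(2L);
        tracker.ack(1L);
        System.out.println("still pending: " + tracker.pendingCount());
    }
}
```

In this model, forgetting to ack in `execute` would leave every tuple in the pending set indefinitely, which is why the bolt acknowledges each input once it has printed it.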