We are going to write the simplest possible Python program to process data with Apache Storm. We will take the quick start example from Apache Storm and write our own version of it. The goal is for our explanation to be simpler to understand than the Apache Storm one, or for you to use this one to help understand the other one.
We are going to use the StreamParse Python API to write a program that creates a spout and a bolt to process streaming data. Read our Storm overview to get an understanding of the basic Storm concepts.
Note: Storm is designed to work with streaming data. We do not have any streaming data as input yet, so we will generate random values in a continuous loop and process those instead. Streaming data needs further explanation, so we will explain how to put Apache Kafka in front of Storm in another post.
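As a rough picture of that stand-in for a real stream, here is a minimal plain-Python sketch with no Storm involved: an endless generator that keeps producing random (city, temperature) readings. The city names, temperatures, and the `fake_weather_stream` name are hypothetical placeholders, not part of StreamParse or Storm.

```python
import itertools
import random

# Hypothetical placeholder readings: (city, temperature in Celsius).
CITIES = [("London", 12), ("Paris", 15), ("Madrid", 24), ("Oslo", 3)]


def fake_weather_stream(rng=random):
    """Yield (city, temp) readings forever, picking a random city each time.

    This stands in for a real source, such as a Kafka topic, that never ends.
    """
    while True:
        yield rng.choice(CITIES)


# A real consumer would loop forever; islice lets us look at just a few readings.
for city, temp in itertools.islice(fake_weather_stream(), 5):
    print(city, temp)
```

Because the generator never ends on its own, anything consuming it has to decide when to stop, which is the same situation Storm is built for.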
Our program is composed of two simple parts. These two parts are the minimum you need to use Storm:
spout: read the temperature in Celsius from a few cities.
bolt: output that and add the temperature in Fahrenheit as another value in the tuple.
We tie the spout and bolt together as a topology, meaning we explicitly define which Python class is our spout and which is our bolt.
Then we hand all of this over to StreamParse, which submits the topology to Zookeeper, Nimbus, and Supervisor without us having to worry about any of the details.
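To picture what a topology describes, here is a sketch that writes the same spout-to-bolt wiring as plain Python data and works out the order in which data flows through it. The dictionary layout and the `execution_order` helper are hypothetical and only for illustration; StreamParse itself reads the Clojure topology file discussed below.

```python
# A hypothetical, Storm-free description of the weather topology as plain data.
# The layout is illustrative only; it is not the StreamParse topology format.
TOPOLOGY = {
    "weather-spout": {"inputs": [], "fields": ["city", "temp"]},
    "weather-bolt": {"inputs": ["weather-spout"], "fields": ["city", "tempC", "tempF"]},
}


def execution_order(topology):
    """Return component names so that each one comes after everything it reads from.

    Raises ValueError if a component reads from an unknown component, or if
    the wiring has a cycle, because this sketch cannot order either case.
    """
    for name, spec in topology.items():
        for upstream in spec["inputs"]:
            if upstream not in topology:
                raise ValueError(f"{name} reads from unknown component {upstream!r}")
    ordered, placed = [], set()
    while len(ordered) < len(topology):
        # Components whose inputs have all been placed are ready to run next.
        ready = [n for n, s in topology.items()
                 if n not in placed and all(u in placed for u in s["inputs"])]
        if not ready:
            raise ValueError("topology contains a cycle")
        for n in ready:
            ordered.append(n)
            placed.add(n)
    return ordered


print(execution_order(TOPOLOGY))
```

The spout has no inputs, so it comes first; the bolt lists the spout as its input, so it comes second. That dependency is exactly what the topology file tells Storm.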
We use the same directory structure as the StreamParse wordcount sample project, because StreamParse requires that layout.
In the wordcount sample, words.py is the spout and wordcount.py is the bolt. wordcount.clj defines the topology, meaning it says which spout and which bolt to run. wordcount.clj is written in the Clojure programming language, which the StreamParse project says is easier than installing the extra software you would need to define the topology in Python.
Now, let’s go through the pieces.
It is pretty easy to follow the wordcount/topologies/wordcount.clj example to make your own topology file. Below we highlight some items.
spouts.weatherSpout.WeatherSpout names the folder (spouts), the Python file (weatherSpout.py), and the Python class (WeatherSpout).
The names weather, weather-spout, and weather-bolt do not matter. They can be anything.
["city" "temp"] and ["city" "tempC" "tempF"] are lists of the field names for the tuples that the spout and the bolt emit. Note: the first time we worked through this example, we made the mistake of using tuples and not lists and got "not implemented" error messages that were hard to track down. The API documentation spells out the argument types for the functions used in the Python code below.
(defn weather [options]
  ;; spout configuration
  ["city" "temp"]
  ;; bolt configuration
  ["city" "tempC" "tempF"]
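The field lists matter because Storm matches the values in each emitted tuple to the declared field names by position. This minimal sketch, in plain Python without Storm, pairs emitted values with their field names and rejects mismatches. The `name_values` helper is hypothetical, and its list-only check mirrors the lists-not-tuples advice above; it is this sketch's own rule, not a reproduction of the StreamParse error.

```python
# Output field declarations mirrored from the topology file above.
SPOUT_FIELDS = ["city", "temp"]
BOLT_FIELDS = ["city", "tempC", "tempF"]


def name_values(fields, values):
    """Pair each emitted value with its declared field name, by position.

    The emitted list must have exactly one value per declared field.
    """
    if not isinstance(values, list):
        # Sketch-only rule, following the advice to emit lists, not tuples.
        raise TypeError(f"expected a list of values, got {type(values).__name__}")
    if len(values) != len(fields):
        raise ValueError(f"{len(fields)} fields declared but {len(values)} values emitted")
    return dict(zip(fields, values))


print(name_values(SPOUT_FIELDS, ["Paris", 15]))
print(name_values(BOLT_FIELDS, ["Paris", 15, 59.0]))
```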
The important thing to note here is that after the initialize method, Storm will call the next_tuple method forever, or until you control-C or otherwise cancel the program. That is what makes this suitable for working with streaming data, meaning data that never stops coming in.
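The call pattern described here (initialize once, then next_tuple over and over) can be mimicked locally. In this sketch, `StubSpout`, `DemoWeatherSpout`, and `drive` are hypothetical stand-ins rather than StreamParse classes, the city data is placeholder, and `max_calls` exists only so the demo stops; Storm itself keeps calling next_tuple with no upper bound.

```python
import random


class StubSpout:
    """Hypothetical stand-in for a spout base class (not StreamParse's).

    It only records emitted tuples so the lifecycle can be run locally.
    """

    def __init__(self):
        self.emitted = []

    def emit(self, values):
        self.emitted.append(values)


class DemoWeatherSpout(StubSpout):
    def initialize(self, stormconf, context):
        # Placeholder [city, Celsius] readings; the values are illustrative.
        self.cities = [["London", 12], ["Paris", 15], ["Madrid", 24], ["Oslo", 3]]

    def next_tuple(self):
        city, temp = random.choice(self.cities)
        self.emit([city, temp])


def drive(spout, max_calls):
    """Call initialize once, then next_tuple repeatedly (bounded for the demo)."""
    spout.initialize(stormconf={}, context={})
    for _ in range(max_calls):
        spout.next_tuple()


spout = DemoWeatherSpout()
drive(spout, max_calls=5)
print(spout.emitted)
```

Keeping setup in initialize and only the per-reading work in next_tuple means the expensive part runs once, while the cheap part can run as often as Storm asks.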
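You call emit with a list of values to send a tuple into Storm, which the bolt will then consume:
from random import randint
from streamparse.spout import Spout

class WeatherSpout(Spout):
    def initialize(self, stormconf, context):
        # Placeholder [city, Celsius] pairs (illustrative values only);
        # randint(0, 3) below assumes exactly four entries.
        self.cities = [['London', 12], ['Paris', 15], ['Madrid', 24], ['Oslo', 3]]

    def next_tuple(self):
        i = randint(0, 3)
        citytemp = self.cities[i]
        city = citytemp[0]
        temp = citytemp[1]
        self.log('SPOUT %s: %s' % (city, temp))
        # Emit one tuple whose values match the ["city" "temp"] fields
        self.emit([city, temp])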
The bolt takes each tuple emitted by the weatherSpout.py spout and emits a new tuple with the Fahrenheit temperature added. Together, the spout and bolt emulate a continuously running process, like reading streaming data such as tweets and then running analytics on it or handing it off to some kind of storage, such as Hadoop.
from streamparse.bolt import Bolt

class WeatherBolt(Bolt):  # class name assumed; it must match the topology file
    def process(self, tup):
        city = tup.values[0]
        tempC = tup.values[1]
        # Celsius to Fahrenheit: F = C * 1.8 + 32
        tempF = (int(tempC) * 1.8) + 32
        self.emit([city, tempC, tempF])
        self.log('BOLT city %s: C=%s F=%s' % (city, tempC, tempF))
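The bolt's arithmetic is the standard conversion F = C * 1.8 + 32, which is easy to check at known points: 0 °C is 32 °F, 100 °C is 212 °F, and -40 is where the two scales meet. The function name below is hypothetical. One design note: the bolt uses int(), which rejects a string such as "21.5" and truncates a float 21.5 to 21, so this sketch uses float() to keep fractional readings.

```python
def celsius_to_fahrenheit(temp_c):
    """F = C * 1.8 + 32, accepting ints, floats, or numeric strings."""
    return float(temp_c) * 1.8 + 32


for c in (-40, 0, 15, 100, "21.5"):
    print(c, "->", celsius_to_fahrenheit(c))
```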
Now, when you run:
sparse run -n weather
You should see the SPOUT and BOLT messages written by the self.log calls above in the console output.
So that is a basic example of how to use StreamParse to write an Apache Storm spout, bolt, and topology. You could, of course, use Java instead, or Ruby or other languages if you look into the APIs available for them.
If you get anything wrong, the stack trace it generates is helpful in solving most errors. Otherwise, go back and compare your code to this code and to the StreamParse documentation to see where you made a mistake.
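Another way to narrow down a mistake before running sparse is to call the bolt logic directly with a stand-in tuple. In this sketch, `FakeTuple` and `BoltUnderTest` are hypothetical: `FakeTuple` models only the `.values` attribute the bolt reads, and `BoltUnderTest` copies the bolt's process() logic while capturing emit and log calls in lists instead of sending them to Storm.

```python
from collections import namedtuple

# Hypothetical stand-in for the tuple object Storm passes to process();
# only the .values attribute used by the bolt is modeled.
FakeTuple = namedtuple("FakeTuple", ["values"])


class BoltUnderTest:
    """Copy of the bolt's process() logic with emit/log captured locally."""

    def __init__(self):
        self.emitted = []
        self.logged = []

    def emit(self, values):
        self.emitted.append(values)

    def log(self, message):
        self.logged.append(message)

    def process(self, tup):
        city = tup.values[0]
        tempC = tup.values[1]
        tempF = (int(tempC) * 1.8) + 32
        self.emit([city, tempC, tempF])
        self.log('BOLT city %s: C=%s F=%s' % (city, tempC, tempF))


bolt = BoltUnderTest()
bolt.process(FakeTuple(values=["Paris", 15]))
print(bolt.emitted)
print(bolt.logged)
```

If this works but the topology does not, the problem is more likely in the wiring or field declarations than in the bolt's own logic.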