Apache Storm StreamParse Python

December 1, 2023
Apache Storm

We are going to write the simplest possible Python program to process data with Apache Storm.  We will take the quick start example from Apache Storm and write our own version of it.  The goal is for our explanation to be simpler to understand than the Apache Storm one.  Or you can use this one to help you understand the other one.

We are going to use the StreamParse Python API to write a program that creates a spout and a bolt to process streaming data. Read our Storm overview to get an understanding of the basic Storm concepts.

Note: Storm is designed to work with streaming data.  But we do not have any streaming data as input yet, so we will just pick random values in a continuous loop and process those.  The whole concept of streaming data needs further explanation, so we will explain how to put Apache Kafka in front of Storm in another post.

Our program is composed of two simple parts.  These two parts are the minimum you need to use Storm:

spout: read the temperature in Celsius from a few cities.

bolt: output that and add the temperature in Fahrenheit as another value in the tuple.

We tie the spout and bolt together as a topology, meaning we explicitly define which Python class is our spout and which is our bolt.

Then we hand over all of this to StreamParse which will submit the topology to Zookeeper, Nimbus, and Supervisor without us having to worry about any of the details of that.
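
To make the data flow concrete before Storm gets involved, here is a minimal sketch in plain Python, with no Storm or StreamParse at all. The names spout, bolt, and run_topology are our own illustrative choices, not part of any API. A generator plays the role of the spout, a second generator plays the bolt, and chaining them stands in for the topology.

```python
import itertools
import random

# Same sample readings used later in the article: (city, temperature in Celsius)
CITIES = [("Berlin", 21), ("New_York", 22), ("Paris", 23), ("Bogota", 24)]


def spout(rng):
    """Endless source of [city, tempC] readings (stand-in for a Storm spout)."""
    while True:
        city, temp_c = rng.choice(CITIES)
        yield [city, temp_c]


def bolt(stream):
    """Consume readings and add the Fahrenheit value (stand-in for a Storm bolt)."""
    for city, temp_c in stream:
        yield [city, temp_c, temp_c * 1.8 + 32]


def run_topology(n, seed=0):
    """Wire the spout into the bolt and take the first n results."""
    rng = random.Random(seed)
    return list(itertools.islice(bolt(spout(rng)), n))


results = run_topology(3)
for row in results:
    print(row)
```

In real Storm the stream never ends and the spout and bolt run as separate processes. Here we cut the stream off after n items only so the sketch terminates.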


Requirements

  1. You need a working Apache Storm installation. Read our overview to understand what Storm is, then install Zookeeper and Storm following these instructions from Apache Storm. You do not need to start Nimbus, Zookeeper, or the Supervisor yourself, as the StreamParse API will do all that.
  2. Python and pip.
  3. Install the StreamParse API. Just type:
    pip install streamparse
  4. Install Leiningen.  Do not install it with apt-get, as that version (for Ubuntu) is too old. Instead copy this source file exactly, rename it “lein”, and put it in your path.  Then run it.  After it is installed you can type this to test it:
    lein version
  5. Now, run these 3 commands to set up the WordCount example. You can go to these instructions to understand what that does.
    sparse quickstart wordcount
    cd wordcount
    sparse run
  So, here is what quickstart wordcount did.  It created several files and the directory structure that you need. Here are the key files it created:
    wordcount/topologies/wordcount.clj
    wordcount/src/spouts/words.py
    wordcount/src/bolts/wordcount.py

We will use the same directory structure for our sample here, as the StreamParse API requires that.

As you can see, words.py is the spout and wordcount.py is the bolt.  wordcount.clj defines the topology, meaning it says which spout and which bolt to run.  wordcount.clj is written in the Clojure programming language, which the StreamParse project says is easier to use than installing a bunch of software that you would need to define the topology using Python.

Now, let’s go through the pieces.

Topology: topologies/weather.clj:

It is pretty easy to follow the wordcount/topologies/wordcount.clj example to make your own topology file.  Below we highlight some items.

spouts.weatherSpout.WeatherSpout is the directory, file, and Python class name.

The names weather, weather-spout, and weather-boltcount-bolt do not matter. They can be anything.

["city" "temp"] and ["city" "tempC" "tempF"] are lists of the field names in each emitted tuple. Note: the first time we worked through this example we made the mistake of using tuples and not lists and got “not implemented” error messages that were hard to track down. The API documentation spells out the argument types for the functions used in the Python code below.

    (ns weather
      (:use [streamparse.specs])
      (:gen-class))

    (defn weather [options]
      [
        ;; spout configuration
        {"weather-spout" (python-spout-spec
              options
              "spouts.weatherSpout.WeatherSpout"
              ["city" "temp"]
              )
        }
        ;; bolt configuration
        {"weather-boltcount-bolt" (python-bolt-spec
              options
              {"weather-spout" :shuffle}
              "bolts.weatherBolt.WeatherBolt"
              ["city" "tempC" "tempF"]
              )
        }
      ]
    )
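
The dotted name in the topology file is easier to read once you split it apart. The sketch below is only an illustration of how we read that name. It is not StreamParse's own loading code, and the helper names split_component_path and source_file are hypothetical. It assumes the src/ layout created by quickstart.

```python
def split_component_path(dotted):
    """Split 'directory.file.ClassName' into its three parts."""
    directory, module, class_name = dotted.split(".")
    return directory, module, class_name


def source_file(dotted, src_root="src"):
    """Return the Python file we expect to hold the class (assumes the src/ layout)."""
    directory, module, _ = split_component_path(dotted)
    return "%s/%s/%s.py" % (src_root, directory, module)


print(split_component_path("spouts.weatherSpout.WeatherSpout"))
print(source_file("spouts.weatherSpout.WeatherSpout"))  # src/spouts/weatherSpout.py
print(source_file("bolts.weatherBolt.WeatherBolt"))     # src/bolts/weatherBolt.py
```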

Spout: src/spouts/weatherSpout.py:

The important thing to note here is that after the initialize method, Storm will call the next_tuple method forever, or until you press Control-C or otherwise cancel the program. That is what makes this suitable for working with streaming data, meaning data that never stops coming in.

You use emit to send each tuple (a list of values) into Storm, which the bolt will then consume:

    from random import randint

    from streamparse.spout import Spout


    class WeatherSpout(Spout):

        def initialize(self, stormconf, context):
            # Fixed sample readings: [city, temperature in Celsius]
            self.cities = (
                ["Berlin", 21],
                ["New_York", 22],
                ["Paris", 23],
                ["Bogota", 24],
            )

        def next_tuple(self):
            # Pick one of the four cities at random on every call
            i = randint(0, 3)
            citytemp = self.cities[i]
            city = citytemp[0]
            temp = citytemp[1]
            # Emit a tuple matching the ["city" "temp"] fields in the topology
            self.emit([city, temp])
            self.log('SPOUT %s: %s' % (city, temp))
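
If you want to try the spout logic without a running Storm cluster, one option is to drive it by hand. The sketch below is an assumption-heavy illustration: StubSpout is a hypothetical stand-in we wrote for the real Spout base class, and drive is our own helper that imitates Storm by calling initialize once and then next_tuple a fixed number of times instead of forever.

```python
import random


class StubSpout(object):
    """Hypothetical stand-in for the streamparse Spout base class (not the real one)."""

    def __init__(self):
        self.emitted = []

    def emit(self, values):
        # Record the tuple instead of sending it to Storm
        self.emitted.append(values)

    def log(self, message):
        # Logging is a no-op in this sketch
        pass


class WeatherSpout(StubSpout):

    def initialize(self, stormconf, context):
        self.cities = (
            ["Berlin", 21],
            ["New_York", 22],
            ["Paris", 23],
            ["Bogota", 24],
        )

    def next_tuple(self):
        city, temp = random.choice(self.cities)
        self.emit([city, temp])
        self.log('SPOUT %s: %s' % (city, temp))


def drive(spout, calls):
    """Imitate Storm: initialize once, then call next_tuple repeatedly."""
    spout.initialize({}, {})
    for _ in range(calls):
        spout.next_tuple()
    return spout.emitted


emitted = drive(WeatherSpout(), 5)
print(emitted)
```

Because the city is chosen at random, the five emitted tuples differ from run to run.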

Bolt: src/bolts/weatherBolt.py:

This takes each tuple emitted by the weatherSpout.py spout and emits another tuple. So it emulates a continuously running process, like reading streaming data, such as Tweets, and then running analytics on that or handing it off to some kind of storage, such as Hadoop.

    from streamparse.bolt import Bolt


    class WeatherBolt(Bolt):

        def process(self, tup):
            # tup.values holds the fields in the order the spout emitted them
            city = tup.values[0]
            tempC = tup.values[1]
            # Convert Celsius to Fahrenheit: F = C * 1.8 + 32
            tempF = (int(tempC) * 1.8) + 32
            self.emit([city, tempC, tempF])
            self.log('BOLT city %s: C=%s F=%s' % (city, tempC, tempF))
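
The conversion inside process is easy to check on its own. In this sketch FakeTuple is a hypothetical stand-in for the tuple object Storm passes to the bolt, modelling only the values attribute used above, and the temperatures are made-up test inputs rather than the spout's sample data.

```python
from collections import namedtuple

# Hypothetical stand-in for the tuple Storm hands to process();
# only the .values attribute used in the article is modelled.
FakeTuple = namedtuple("FakeTuple", ["values"])


def celsius_to_fahrenheit(temp_c):
    """F = C * 1.8 + 32."""
    return temp_c * 1.8 + 32


def convert(tup):
    """Return the [city, tempC, tempF] list the bolt would emit."""
    city = tup.values[0]
    temp_c = tup.values[1]
    return [city, temp_c, celsius_to_fahrenheit(int(temp_c))]


# Freezing and boiling points of water make convenient check values
print(convert(FakeTuple(values=["Berlin", 0])))    # ['Berlin', 0, 32.0]
print(convert(FakeTuple(values=["Paris", 100])))   # ['Paris', 100, 212.0]
```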

Run it

Now, when you run the command below, StreamParse starts the topology and the spout and bolt log each tuple they process:

    sparse run -n weather

Conclusion

So that is a basic example of how to use StreamParse to write an Apache Storm spout, bolt, and topology.  You could, of course, use Java, or you can use Ruby or other languages if you investigate APIs for those.

If you get anything wrong, the stack trace it generates is helpful in solving most errors. Other than that, just go back and compare your code to this code and the StreamParse documentation to see where you have made a mistake.


About The Author

Harsh Raval
