Back when Storm was still developed and maintained by Twitter, I was running small experiments in local cluster mode, processing social data from Facebook. A lot of rain has fallen since then, and Storm is now an active project under the Apache umbrella. I was curious to see how Storm would operate in a multi-node deployment, so I decided to build a cluster on the SmartOS hypervisor. I really adore SmartOS. It is lightweight and boots from USB media. While the other hypervisors I've tried (XenServer, ESX) reserved a huge amount of RAM (on my 24 GB server, XenServer was taking almost 4 GB for Dom0), SmartOS needs only 200-300 MB of RAM. Provisioning new virtual machines is really fast. You can run OS-level virtualization units called zones, or KVM-based virtual machines capable of running almost any modern operating system (Linux, Windows, FreeBSD). You can learn more about SmartOS here.
Are you new to Storm and don't know what this is all about? Well, apart from being a meteorological phenomenon, Storm is a distributed, fault-tolerant, realtime data stream processing platform. What Hadoop is for batch processing, Storm is for realtime processing. There are two different types of nodes in Storm. The master node, called Nimbus, is responsible for assigning, monitoring, and distributing tasks to the worker, or supervisor, nodes in Storm terminology. In between sits a ZooKeeper node, which is responsible for coordination and auto-discovery of the nodes. The computation workflow is encapsulated into a unit called a topology. It is basically a graph of spouts, components that represent the sources of data streams, and bolts, which do some sort of processing on input streams and often emit new streams. For example, we could have a spout which consumes Apache log messages from a RabbitMQ queue, and one or more bolts which apply some kind of transformation, aggregation, or filtering logic. As business demand increases, we may also want to process a ton of logs from Tomcat and Nginx, and a single worker node won't deal well with the load. No problem! Just spawn the desired number of worker nodes to scale horizontally, and Storm will make sure data processing is balanced and distributed across the nodes. And what if one of the worker nodes fails? Don't panic! Storm is able to reschedule and reassign its tasks to the healthy nodes, ensuring fault tolerance with at-least-once message processing semantics (exactly-once semantics are available through the Trident API). The image below shows the components of a Storm cluster.
Preparing SmartOS VMs
Let’s first prepare the specifications for our SmartOS KVM virtual machines. I’ll use the centos-7 dataset to spawn the Storm nodes. Find and import the image that corresponds to the centos-7 dataset.
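On the SmartOS global zone this is done with imgadm; the UUID below is a placeholder, so substitute whatever imgadm reports for the centos-7 image in your environment:

```shell
# List the available centos-7 images
imgadm avail | grep centos-7

# Import the one you picked (placeholder UUID; use the value reported above)
imgadm import <centos-7-image-uuid>
```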
The cluster will consist of these nodes:
1 ZooKeeper node with 1024 MB RAM and 1 vCPU
1 Nimbus node with 3048 MB RAM and 2 vCPUs
3 Supervisor nodes with 4096 MB RAM and 3 vCPUs
Here is a specification file for the ZooKeeper node (you will need to change the network settings to match your environment). Save it as centos7-zookeeper.json.
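A minimal spec could look like the following; all IPs, the nic_tag, and the image UUID are illustrative placeholders, not values from the original setup:

```shell
# Write the ZooKeeper VM specification (adjust network settings and image UUID)
cat > centos7-zookeeper.json <<'EOF'
{
  "brand": "kvm",
  "alias": "zookeeper",
  "vcpus": 1,
  "ram": 1024,
  "resolvers": ["8.8.8.8"],
  "nics": [
    {
      "nic_tag": "admin",
      "ip": "192.168.1.10",
      "netmask": "255.255.255.0",
      "gateway": "192.168.1.1",
      "model": "virtio",
      "primary": true
    }
  ],
  "disks": [
    {
      "image_uuid": "<centos-7-image-uuid>",
      "boot": true,
      "model": "virtio"
    }
  ]
}
EOF
```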
The specification for the Nimbus node is as follows. For the Supervisor nodes we will basically use the same spec with some minor modifications.
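A sketch of the Nimbus spec, again with placeholder network values and image UUID; I save it here as centos7-nimbus.json, a name of my choosing:

```shell
# Write the Nimbus VM specification (adjust network settings and image UUID)
cat > centos7-nimbus.json <<'EOF'
{
  "brand": "kvm",
  "alias": "nimbus",
  "vcpus": 2,
  "ram": 3048,
  "resolvers": ["8.8.8.8"],
  "nics": [
    {
      "nic_tag": "admin",
      "ip": "192.168.1.11",
      "netmask": "255.255.255.0",
      "gateway": "192.168.1.1",
      "model": "virtio",
      "primary": true
    }
  ],
  "disks": [
    {
      "image_uuid": "<centos-7-image-uuid>",
      "boot": true,
      "model": "virtio"
    }
  ]
}
EOF
```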
Installing ZooKeeper node
Create a ZooKeeper VM from the spec file.
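In the global zone, vmadm consumes the spec file directly:

```shell
# Provision the ZooKeeper VM from the spec and note its UUID
vmadm create -f centos7-zookeeper.json
vmadm list
```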
Make sure you can log in to your newly created VM. Now we will install the required dependencies. First, download and install JDK 7.
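The original post may have used the Oracle JDK; OpenJDK 7 from the CentOS base repositories is a reasonable substitute and simpler to script:

```shell
# Install OpenJDK 7 (a substitute for the Oracle JDK) and verify
yum install -y java-1.7.0-openjdk java-1.7.0-openjdk-devel
java -version
```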
Next, install ZooKeeper packages from the Cloudera repository.
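Assuming Cloudera's CDH yum repository has already been added (grab the repo file for your CentOS release from Cloudera's CDH installation docs), the package names are:

```shell
# zookeeper provides the libraries and CLI, zookeeper-server the service scripts
yum install -y zookeeper zookeeper-server
```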
If the installation was successful, we are ready to configure ZooKeeper. It is important to enable regular purging of old snapshots and transaction logs; otherwise ZooKeeper will eat up all of your disk space. Edit the configuration file.
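With the Cloudera packages the configuration lives in /etc/zookeeper/conf/zoo.cfg; the autopurge values below are typical defaults, not requirements:

```shell
# Keep the 3 most recent snapshots and purge the rest every 24 hours
cat >> /etc/zookeeper/conf/zoo.cfg <<'EOF'
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
EOF
```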
Update the /etc/hosts file to include ZooKeeper’s hostname.
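For example (the IP and hostname are placeholders; use your own):

```shell
# Map the ZooKeeper hostname to its address
echo "192.168.1.10  zookeeper1" >> /etc/hosts
```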
Finally, we can start ZooKeeper and verify it’s working as expected. Run these commands.
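With the Cloudera packaging the service is managed via the zookeeper-server init script; the "ruok" four-letter command is a quick health check:

```shell
# Initialize the data directory and start the server
service zookeeper-server init
service zookeeper-server start

# A healthy server answers "imok"
echo ruok | nc localhost 2181
```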
It is of crucial importance to run ZooKeeper under supervision, since ZooKeeper will terminate if it encounters any error. To achieve this, you can use any of a number of process monitoring tools, such as monit, supervisord, or god.
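With supervisord, for instance, a program stanza along these lines keeps ZooKeeper running (the zkServer.sh path follows the Cloudera packaging; adjust to your layout):

```shell
# Supervisord program definition for ZooKeeper
cat > /etc/supervisord.d/zookeeper.ini <<'EOF'
[program:zookeeper]
command=/usr/lib/zookeeper/bin/zkServer.sh start-foreground
autostart=true
autorestart=true
user=zookeeper
EOF
```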
Deploying Nimbus node
First, provision a new virtual machine for the Nimbus node.
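Assuming the Nimbus spec was saved as centos7-nimbus.json, provisioning is the same one-liner as before:

```shell
vmadm create -f centos7-nimbus.json
```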
The following steps apply to both the Nimbus and the Supervisor nodes, so once we have the required dependencies installed, we will create a snapshot that can later be used to provision the Supervisor nodes. Note that it is also important here to run the Nimbus and Supervisor daemons under supervision.
Download and install JDK 7
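As on the ZooKeeper node, OpenJDK 7 from the base repositories will do:

```shell
yum install -y java-1.7.0-openjdk java-1.7.0-openjdk-devel
```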
Add the following repository to install the ZeroMQ package.
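The original repository is not shown here; one assumption that works on CentOS 7 is to pull ZeroMQ from EPEL (package names may differ between EPEL releases):

```shell
# Enable EPEL and install the ZeroMQ library and headers
yum install -y epel-release
yum install -y zeromq zeromq-devel
```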
JZMQ provides the Java bindings for the ZeroMQ messaging framework. As we are going to compile and build JZMQ from source, some dependencies are needed. Install them first.
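A typical toolchain for an autotools build from source:

```shell
# Compiler, autotools, and the JDK headers needed to build the JNI bindings
yum install -y git gcc gcc-c++ make autoconf automake libtool pkgconfig
```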
Now we can download and install JZMQ.
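JZMQ is built from its GitHub repository with the usual autotools sequence:

```shell
git clone https://github.com/zeromq/jzmq.git
cd jzmq
./autogen.sh
./configure
make
make install
```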
Create Storm system user and group
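For example (a non-login system account; the home directory matches where we will install Storm below):

```shell
groupadd storm
useradd -M -g storm -d /usr/local/storm -s /sbin/nologin storm
```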
Download and install Apache Storm
Download the latest version of Storm (0.9.3 at the time of writing). Decompress the tarball and move it to the /usr/local directory.
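The 0.9.3 release is available from the Apache archive:

```shell
wget http://archive.apache.org/dist/storm/apache-storm-0.9.3/apache-storm-0.9.3.tar.gz
tar -zxf apache-storm-0.9.3.tar.gz
mv apache-storm-0.9.3 /usr/local/storm
chown -R storm:storm /usr/local/storm
```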
Add ZooKeeper and Nimbus nodes to the hosts file.
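For example (placeholder IPs and hostnames; use your own):

```shell
cat >> /etc/hosts <<'EOF'
192.168.1.10  zookeeper1
192.168.1.11  nimbus
EOF
```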
Now edit the Storm configuration file. You need to add the ZooKeeper server(s), the Nimbus node, and change the path where Storm will store a small amount of state data.
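A minimal /usr/local/storm/conf/storm.yaml for Storm 0.9.x might look like this (hostnames match the placeholder /etc/hosts entries above; the state directory is an arbitrary choice):

```shell
cat > /usr/local/storm/conf/storm.yaml <<'EOF'
storm.zookeeper.servers:
  - "zookeeper1"
nimbus.host: "nimbus"
storm.local.dir: "/var/storm"
EOF

# Create the state directory and hand it to the storm user
mkdir -p /var/storm
chown -R storm:storm /var/storm
```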
Build the snapshot
Before we proceed with snapshotting, halt the Nimbus virtual machine.
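From the global zone (the UUID placeholder is the one vmadm assigned to the Nimbus VM):

```shell
vmadm list
vmadm stop <nimbus-uuid>
```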
Now take a snapshot of the disk0 zvol which belongs to the Nimbus dataset. Then dump and compress the dataset.
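Something along these lines; the snapshot and file names are my own choices, and the zvol path assumes the default zones pool layout:

```shell
# Snapshot the zvol backing the Nimbus VM's disk0
zfs snapshot zones/<nimbus-uuid>-disk0@storm-supervisor

# Dump the snapshot and compress it into a portable file
zfs send zones/<nimbus-uuid>-disk0@storm-supervisor | gzip > storm-supervisor.zvol.gz
```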
To import the dataset so it can be used as a template for Supervisor node provisioning, it is necessary to build an image manifest. Generate a random UUID and get the SHA1 hash of the supervisor dataset by running these commands.
We will also need the size of the dataset expressed in bytes.
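Assuming the compressed dataset was saved as storm-supervisor.zvol.gz, all three values can be collected in the global zone:

```shell
# Random UUID for the new image
uuid -v4

# SHA1 hash of the compressed dataset (SmartOS ships the digest utility)
digest -a sha1 storm-supervisor.zvol.gz

# Size of the dataset in bytes
ls -l storm-supervisor.zvol.gz | awk '{print $5}'
```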
With that information in hand we can create an image manifest file and import the dataset.
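A minimal v2 manifest could look like this; the angle-bracket placeholders are the values gathered above, and the remaining fields are illustrative (consult the SmartOS image manifest documentation for the full list of required fields):

```shell
cat > storm-supervisor.manifest.json <<'EOF'
{
  "v": 2,
  "uuid": "<generated-uuid>",
  "name": "storm-supervisor",
  "version": "1.0.0",
  "type": "zvol",
  "os": "linux",
  "files": [
    {
      "sha1": "<sha1-of-dataset>",
      "size": <size-in-bytes>,
      "compression": "gzip"
    }
  ],
  "disk_driver": "virtio",
  "nic_driver": "virtio",
  "cpu_type": "host",
  "image_size": 10240
}
EOF

# Install the image locally from the manifest and dataset file
imgadm install -m storm-supervisor.manifest.json -f storm-supervisor.zvol.gz
```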
Provisioning Supervisor nodes
With the dataset imported, we can start provisioning the Supervisor nodes using a slightly modified version of the Nimbus node specification file.
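A sketch of the Supervisor spec; the image_uuid placeholder refers to the UUID of the imported supervisor image, and the network values are again illustrative:

```shell
cat > centos7-supervisor.json <<'EOF'
{
  "brand": "kvm",
  "alias": "supervisor1",
  "vcpus": 3,
  "ram": 4096,
  "resolvers": ["8.8.8.8"],
  "nics": [
    {
      "nic_tag": "admin",
      "ip": "192.168.1.12",
      "netmask": "255.255.255.0",
      "gateway": "192.168.1.1",
      "model": "virtio",
      "primary": true
    }
  ],
  "disks": [
    {
      "image_uuid": "<supervisor-image-uuid>",
      "boot": true,
      "model": "virtio"
    }
  ]
}
EOF
```

Run `vmadm create -f centos7-supervisor.json` for each worker, bumping the alias and IP address every time.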
Note that we have changed the image_uuid to point to our imported dataset, and assigned the Supervisor nodes more memory, more disk, and one additional vCPU. Now we can easily deploy and scale the worker nodes.
This is the output of vmadm list on my SmartOS instance.
Submitting the topology
I assume you're running the /usr/local/storm/bin/storm nimbus and /usr/local/storm/bin/storm supervisor daemons under supervision. To test the cluster, we are going to deploy a simple topology from the storm-starter project. Run these commands on the Nimbus node. Start by cloning the Storm repository.
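The storm-starter examples live inside the main Apache Storm repository; checking out the release tag keeps them in sync with the installed version:

```shell
git clone https://github.com/apache/storm.git
cd storm
git checkout v0.9.3   # match the installed Storm version
```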
Go to the storm/examples/storm-starter directory and build the topology jar using Maven.
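Skipping the tests keeps the build quick:

```shell
cd examples/storm-starter
mvn clean package -DskipTests
```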
Finally, submit the topology to the Storm cluster.
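For example, submitting the word-count topology (the jar name below assumes the 0.9.3 build; check the target directory for the exact filename):

```shell
/usr/local/storm/bin/storm jar target/storm-starter-0.9.3-jar-with-dependencies.jar \
    storm.starter.WordCountTopology wordcount
```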
You can get detailed stats about the cluster infrastructure, configuration, topology execution, etc. from the Storm UI. Start the server.
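The UI server is typically started on the Nimbus node, ideally under supervision like the other daemons:

```shell
/usr/local/storm/bin/storm ui
```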
Direct your browser to http://nimbus_node_ip:8080.