<h1 class="page-title">Storm and Kestrel</h1>
<div class="row">
<div class="col-md-12">
<!-- Documentation -->
<p class="post-meta"></p>
<div class="documentation-content"><p>This page explains how to use Storm to consume items from a Kestrel cluster.</p>
<h2 id="preliminaries">Preliminaries</h2>
<h3 id="storm">Storm</h3>
<p>This tutorial uses examples from the <a href="">storm-kestrel</a> project and the <a href="">storm-starter</a> project. It&#39;s recommended that you clone those projects and follow along with the examples. Read <a href="Setting-up-development-environment.html">Setting up development environment</a> and <a href="Creating-a-new-Storm-project.html">Creating a new Storm project</a> to get your machine set up.</p>
<h3 id="kestrel">Kestrel</h3>
<p>This tutorial assumes that you can run a Kestrel server locally, as described <a href="">here</a>.</p>
<h2 id="kestrel-server-and-queue">Kestrel Server and Queue</h2>
<p>A single Kestrel server has a set of queues. A Kestrel queue is a very simple message queue that runs on the JVM and uses the memcache protocol (with some extensions) to talk to clients. For details, look at the implementation of the <a href="">KestrelThriftClient</a> class provided in the <a href="">storm-kestrel</a> project.</p>
<p>Each queue is strictly ordered following the FIFO (first in, first out) principle. To keep performance up, items are cached in system memory, though only the first 128MB of a queue is kept in memory. When the server is stopped, the queue state is stored in a journal file.</p>
<p>Further details can be found <a href="">here</a>.</p>
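<p>The interplay between FIFO ordering and a bounded in-memory cache can be illustrated with a small toy model. This is a sketch only and not Kestrel's implementation: the class name <code>SpillingFifoQueue</code>, the second in-memory deque that stands in for the journal file, and the 4-byte budget used in place of 128MB are all assumptions made for illustration.</p>

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model only (hypothetical class): NOT Kestrel's implementation.
final class SpillingFifoQueue {
    private final long maxMemoryBytes;
    private long memoryBytes = 0;
    private final Deque<byte[]> inMemory = new ArrayDeque<>();
    // Stand-in for the on-disk journal; a real server would read these back from disk.
    private final Deque<byte[]> spilled = new ArrayDeque<>();

    SpillingFifoQueue(long maxMemoryBytes) {
        this.maxMemoryBytes = maxMemoryBytes;
    }

    void add(byte[] item) {
        // Once anything has spilled, newer items must spill too, or FIFO order would break.
        if (spilled.isEmpty() && memoryBytes + item.length <= maxMemoryBytes) {
            inMemory.addLast(item);
            memoryBytes += item.length;
        } else {
            spilled.addLast(item);
        }
    }

    /** Returns the oldest item, or null if the queue is empty. */
    byte[] poll() {
        byte[] head;
        if (!inMemory.isEmpty()) {
            head = inMemory.pollFirst();
            memoryBytes -= head.length;
        } else {
            // Covers an item that is larger than the whole memory budget.
            head = spilled.pollFirst();
        }
        refill();
        return head;
    }

    // Move the oldest spilled items back into memory while the budget allows.
    private void refill() {
        while (!spilled.isEmpty()
                && memoryBytes + spilled.peekFirst().length <= maxMemoryBytes) {
            byte[] next = spilled.pollFirst();
            inMemory.addLast(next);
            memoryBytes += next.length;
        }
    }

    int inMemoryCount() {
        return inMemory.size();
    }

    public static void main(String[] args) {
        SpillingFifoQueue q = new SpillingFifoQueue(4); // 4-byte budget instead of 128MB
        for (String s : new String[] {"ab", "cd", "ef"}) {
            q.add(s.getBytes(StandardCharsets.UTF_8));
        }
        byte[] item;
        while ((item = q.poll()) != null) {
            System.out.println(new String(item, StandardCharsets.UTF_8));
        }
    }
}
```

<p>In this model the consumer always sees items in insertion order, regardless of how many of them currently fit in memory.</p>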
<p>Kestrel is:</p>
<ul>
<li>fast</li>
<li>small</li>
<li>durable</li>
<li>reliable</li>
</ul>
<p>For instance, Twitter uses Kestrel as the backbone of its messaging infrastructure as described <a href="">here</a>.</p>
<h2 id="add-items-to-kestrel">Add items to Kestrel</h2>
<p>First, we need a program that can add items to a Kestrel queue. The following method makes use of the KestrelClient implementation in <a href="">storm-kestrel</a>. It adds sentences to a Kestrel queue, each chosen at random from an array that holds five possible sentences.</p>
<div class="highlight"><pre><code class="language-" data-lang="">    private static void queueSentenceItems(KestrelClient kestrelClient, String queueName)
            throws ParseError, IOException {
        String[] sentences = new String[] {
                "the cow jumped over the moon",
                "an apple a day keeps the doctor away",
                "four score and seven years ago",
                "snow white and the seven dwarfs",
                "i am at two with nature"};
        Random _rand = new Random();
        for(int i=1; i&lt;=10; i++){
            // Pick one of the five sentences at random and tag it with a running ID.
            String sentence = sentences[_rand.nextInt(sentences.length)];
            String val = "ID " + i + " " + sentence;
            boolean queueSuccess = kestrelClient.queue(queueName, val);
            System.out.println("queueSuccess=" + queueSuccess + " [" + val + "]");
        }
    }
</code></pre></div>
<h2 id="remove-items-from-kestrel">Remove items from Kestrel</h2>
<p>This method dequeues items from a queue without removing them.</p>
<div class="highlight"><pre><code class="language-" data-lang="">    private static void dequeueItems(KestrelClient kestrelClient, String queueName) throws IOException, ParseError {
        for(int i=1; i&lt;=12; i++){
            Item item = kestrelClient.dequeue(queueName);
            if(item==null){
                // The queue is empty.
                System.out.println("The queue (" + queueName + ") contains no items.");
            } else {
                byte[] data = item._data;
                String receivedVal = new String(data);
                System.out.println("receivedItem=" + receivedVal);
            }
        }
    }
</code></pre></div>
<p>This method dequeues items from a queue and then removes them.</p>
<div class="highlight"><pre><code class="language-" data-lang="">    private static void dequeueAndRemoveItems(KestrelClient kestrelClient, String queueName)
            throws IOException, ParseError {
        for(int i=1; i&lt;=12; i++){
            Item item = kestrelClient.dequeue(queueName);
            if(item==null){
                // The queue is empty.
                System.out.println("The queue (" + queueName + ") contains no items.");
            } else {
                int itemID = item._id;
                byte[] data = item._data;
                String receivedVal = new String(data);
                // Acknowledge the item so that it is removed from the queue.
                kestrelClient.ack(queueName, itemID);
                System.out.println("receivedItem=" + receivedVal);
            }
        }
    }
</code></pre></div>
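<p>The difference between the two methods above, dequeuing with and without an acknowledgement, can be sketched with a small in-memory model. This is a toy illustration under stated assumptions, not the Kestrel server or the storm-kestrel client: the class <code>AckQueueModel</code> and its <code>fail</code> and <code>size</code> methods are hypothetical names chosen for this sketch.</p>

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy model only (hypothetical class): shows why an item that was dequeued
// but never acknowledged is not yet gone from the queue.
final class AckQueueModel {
    static final class Item {
        final int id;
        final String data;

        Item(int id, String data) {
            this.id = id;
            this.data = data;
        }
    }

    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Integer, String> pending = new HashMap<>();
    private int nextId = 1;

    void queue(String val) {
        ready.addLast(val);
    }

    /** Hands out the oldest item but keeps it as pending; returns null if nothing is ready. */
    Item dequeue() {
        String data = ready.pollFirst();
        if (data == null) {
            return null;
        }
        int id = nextId++;
        pending.put(id, data);
        return new Item(id, data);
    }

    /** Permanently removes a pending item; returns false for an unknown id. */
    boolean ack(int id) {
        return pending.remove(id) != null;
    }

    /** Puts a pending item back at the head of the queue; returns false for an unknown id. */
    boolean fail(int id) {
        String data = pending.remove(id);
        if (data == null) {
            return false;
        }
        ready.addFirst(data);
        return true;
    }

    /** Items still owned by the queue: ready plus handed out but unacknowledged. */
    int size() {
        return ready.size() + pending.size();
    }
}
```

<p>In this model, calling <code>dequeue</code> alone leaves <code>size()</code> unchanged, while a following <code>ack</code> reduces it by one. That mirrors the distinction the two methods above draw.</p>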
<h2 id="add-items-continuously-to-kestrel">Add Items continuously to Kestrel</h2>
<p>This is the final program, which continuously adds sentence items to a queue called <strong>sentence_queue</strong> on a locally running Kestrel server.</p>
<p>To stop it, type a closing bracket character &#39;]&#39; in the console and press &#39;Enter&#39;.</p>
<div class="highlight"><pre><code class="language-" data-lang="">    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Random;
    import org.apache.storm.spout.KestrelClient;
    import org.apache.storm.spout.KestrelClient.Item;
    import org.apache.storm.spout.KestrelClient.ParseError;

    public class AddSentenceItemsToKestrel {
        /**
         * @param args
         */
        public static void main(String[] args) {
            InputStream is = System.in;
            char closing_bracket = ']';
            int val = closing_bracket;
            boolean aux = true;
            try {
                KestrelClient kestrelClient = null;
                String queueName = "sentence_queue";
                // Queue a batch of sentences, pause, and repeat until ']' is read from the console.
                while(aux){
                    kestrelClient = new KestrelClient("localhost",22133);
                    queueSentenceItems(kestrelClient, queueName);
                    kestrelClient.close();
                    Thread.sleep(1000);
                    if(is.available()&gt;0 &amp;&amp; val==is.read()){
                        aux=false;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (ParseError e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // The queueSentenceItems method shown in "Add items to Kestrel" belongs here.
    }
</code></pre></div>
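<p>The stop condition described above (a closing bracket typed in the console) can be isolated in a small helper that is easy to test on its own. This is a minimal sketch using only the JDK. The class <code>StopSignal</code> and the method <code>stopRequested</code> are hypothetical names, and unlike a check that reads a single byte, this version drains every byte that can be read without blocking.</p>

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical helper: non-blocking check for a stop character on an input stream.
final class StopSignal {
    /** Returns true if stopChar is among the bytes that can currently be read without blocking. */
    static boolean stopRequested(InputStream in, char stopChar) {
        try {
            while (in.available() > 0) {
                if (in.read() == stopChar) {
                    return true;
                }
            }
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Keeps working until a line containing ']' is entered in the console.
        while (!stopRequested(System.in, ']')) {
            System.out.println("working...");
            Thread.sleep(1000);
        }
        System.out.println("end");
    }
}
```

<p>Because console input is usually line-buffered, the bracket only becomes visible to <code>available()</code> after &#39;Enter&#39; is pressed, which matches the instruction above.</p>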
<h2 id="using-kestrelspout">Using KestrelSpout</h2>
<p>This topology reads sentences off of a Kestrel queue using KestrelSpout, splits each sentence into its constituent words (Bolt: SplitSentence), and then emits, for each word, the number of times it has seen that word before (Bolt: WordCount). How data is processed is described in detail in <a href="Guaranteeing-message-processing.html">Guaranteeing message processing</a>.</p>
<div class="highlight"><pre><code class="language-" data-lang="">    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("sentences", new KestrelSpout("localhost",22133,"sentence_queue",new StringScheme()));
    builder.setBolt("split", new SplitSentence(), 10)
            .shuffleGrouping("sentences");
    builder.setBolt("count", new WordCount(), 20)
            .fieldsGrouping("split", new Fields("word"));
</code></pre></div>
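<p>To make the data flow of this topology concrete, the following plain-Java sketch mimics what the two bolts compute, without using Storm at all. The class <code>WordCountSketch</code> and its methods are hypothetical. The <code>taskFor</code> method is only a simplified illustration of the idea behind a fields grouping (equal words always reach the same task) and not Storm's actual routing code. The sketch also assumes that the emitted count includes the current occurrence.</p>

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Storm-free sketch of the split and count steps.
final class WordCountSketch {
    private final Map<String, Integer> counts = new HashMap<>();

    /** Splits a sentence on whitespace, as a SplitSentence-style bolt would. */
    static List<String> split(String sentence) {
        List<String> words = new ArrayList<>();
        for (String word : sentence.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                words.add(word);
            }
        }
        return words;
    }

    /** Increments and returns the running count for a word (assumed to include this occurrence). */
    int count(String word) {
        return counts.merge(word, 1, Integer::sum);
    }

    /** Simplified stand-in for a fields grouping: the same word always maps to the same task index. */
    static int taskFor(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        WordCountSketch counter = new WordCountSketch();
        for (String word : split("the cow jumped over the moon")) {
            System.out.println(word + " -> count " + counter.count(word)
                    + ", task " + taskFor(word, 20));
        }
    }
}
```

<p>Routing by word matters because each counting task keeps its own map. If the same word could reach different tasks, its count would be split across them.</p>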
<h2 id="execution">Execution</h2>
<p>First, start your local Kestrel server in production or development mode.</p>
<p>Then wait about 5 seconds in order to avoid a ConnectException.</p>
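<p>As an alternative to a fixed wait, a client could poll the server port until it accepts connections. The following is a minimal sketch using only the JDK. The class <code>PortWaiter</code>, the method <code>waitForPort</code>, and the retry parameters are hypothetical and not part of Storm or Kestrel. Port 22133 is the one used in the examples on this page.</p>

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: retries a TCP connection until the port accepts it.
final class PortWaiter {
    /**
     * Tries to connect up to 'attempts' times, sleeping 'delayMillis' between tries.
     * Returns true as soon as one connection succeeds, false otherwise.
     */
    static boolean waitForPort(String host, int port, int attempts, long delayMillis) {
        for (int i = 0; i < attempts; i++) {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), 1000);
                return true;
            } catch (IOException e) {
                // Not accepting connections yet (ConnectException is an IOException).
            }
            if (i < attempts - 1) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        boolean up = waitForPort("localhost", 22133, 10, 500);
        System.out.println(up ? "Kestrel port is reachable" : "Kestrel port is not reachable");
    }
}
```

<p>A successful connection only shows that something is listening on the port. It does not prove that the server has finished loading its queues.</p>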
<p>Now execute the program to add items to the queue and launch the Storm topology. The order in which you launch the programs is of no importance.</p>
<p>If you run the topology with TOPOLOGY_DEBUG enabled, you should see tuples being emitted in the topology.</p>
