<!DOCTYPE html>
<html>
    <head>
    <meta charset="utf-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1">

    <link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
    <link rel="icon" href="/favicon.ico" type="image/x-icon">

    <title>Storm and Kestrel</title>

    <!-- Bootstrap core CSS -->
    <link href="/assets/css/bootstrap.min.css" rel="stylesheet">
    <!-- Bootstrap theme -->
    <link href="/assets/css/bootstrap-theme.min.css" rel="stylesheet">

    <!-- Custom styles for this template -->
    <link rel="stylesheet" href="http://fortawesome.github.io/Font-Awesome/assets/font-awesome/css/font-awesome.css">
    <link href="/css/style.css" rel="stylesheet">
    <link href="/assets/css/owl.theme.css" rel="stylesheet">
    <link href="/assets/css/owl.carousel.css" rel="stylesheet">
    <script type="text/javascript" src="/assets/js/jquery.min.js"></script>
    <script type="text/javascript" src="/assets/js/bootstrap.min.js"></script>
    <script type="text/javascript" src="/assets/js/owl.carousel.min.js"></script>
    <script type="text/javascript" src="/assets/js/storm.js"></script>
    <!-- Just for debugging purposes. Don't actually copy these 2 lines! -->
    <!--[if lt IE 9]><script src="../../assets/js/ie8-responsive-file-warning.js"></script><![endif]-->
    
    <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
    <!--[if lt IE 9]>
      <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
      <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
    <![endif]-->
  </head>


  <body>
    <header>
  <div class="container-fluid">
     <div class="row">
          <div class="col-md-5">
            <a href="/index.html"><img src="/images/logo.png" class="logo" /></a>
          </div>
          <div class="col-md-5">
            
              <h1>Version: 1.2.4</h1>
            
          </div>
          <div class="col-md-2">
            <a href="/downloads.html" class="btn-std btn-block btn-download">Download</a>
          </div>
        </div>
    </div>
</header>
<!--Header End-->
<!--Navigation Begin-->
<div class="navbar" role="banner">
  <div class="container-fluid">
      <div class="navbar-header">
          <button class="navbar-toggle" type="button" data-toggle="collapse" data-target=".bs-navbar-collapse">
                <span class="icon-bar"></span>
                <span class="icon-bar"></span>
                <span class="icon-bar"></span>
            </button>
        </div>
        <nav class="collapse navbar-collapse bs-navbar-collapse" role="navigation">
          <ul class="nav navbar-nav">
              <li><a href="/index.html" id="home">Home</a></li>
                <li><a href="/getting-help.html" id="getting-help">Getting Help</a></li>
                <li><a href="/about/integrates.html" id="project-info">Project Information</a></li>
                <li class="dropdown">
                    <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="documentation">Documentation <b class="caret"></b></a>
                    <ul class="dropdown-menu">
                      
                        
                          <li><a href="/releases/2.4.0/index.html">2.4.0</a></li>
                        
                      
                        
                          <li><a href="/releases/2.3.0/index.html">2.3.0</a></li>
                        
                      
                        
                          <li><a href="/releases/2.2.1/index.html">2.2.1</a></li>
                        
                      
                        
                          <li><a href="/releases/2.2.0/index.html">2.2.0</a></li>
                        
                      
                        
                          <li><a href="/releases/2.1.1/index.html">2.1.1</a></li>
                        
                      
                        
                          <li><a href="/releases/2.1.0/index.html">2.1.0</a></li>
                        
                      
                        
                          <li><a href="/releases/2.0.0/index.html">2.0.0</a></li>
                        
                      
                        
                          <li><a href="/releases/1.2.4/index.html">1.2.4</a></li>
                        
                      
                        
                          <li><a href="/releases/1.2.3/index.html">1.2.3</a></li>
                        
                      
                    </ul>
                </li>
                <li><a href="/talksAndVideos.html">Talks and Slideshows</a></li>
                <li class="dropdown">
                    <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="contribute">Community <b class="caret"></b></a>
                    <ul class="dropdown-menu">
                        <li><a href="/contribute/Contributing-to-Storm.html">Contributing</a></li>
                        <li><a href="/contribute/People.html">People</a></li>
                        <li><a href="/contribute/BYLAWS.html">ByLaws</a></li>
                        <li><a href="/Powered-By.html">PoweredBy</a></li>
                    </ul>
                </li>
                <li><a href="/2021/10/14/storm211-released.html" id="news">News</a></li>
            </ul>
        </nav>
    </div>
</div>



    <div class="container-fluid">
    <h1 class="page-title">Storm and Kestrel</h1>
          <div class="row">
           	<div class="col-md-12">
	             <!-- Documentation -->

<p class="post-meta"></p>

<div class="documentation-content"><p>This page explains how to use Storm to consume items from a Kestrel cluster.</p>

<h2 id="preliminaries">Preliminaries</h2>

<h3 id="storm">Storm</h3>

<p>This tutorial uses examples from the <a href="https://github.com/nathanmarz/storm-kestrel">storm-kestrel</a> project and the <a href="http://github.com/apache/storm/blob/1.2.4/examples/storm-starter">storm-starter</a> project. It&#39;s recommended that you clone those projects and follow along with the examples. Read <a href="Setting-up-development-environment.html">Setting up development environment</a> and <a href="Creating-a-new-Storm-project.html">Creating a new Storm project</a> to get your machine set up.</p>

<h3 id="kestrel">Kestrel</h3>

<p>It assumes you are able to run locally a Kestrel server as described <a href="https://github.com/nathanmarz/storm-kestrel">here</a>.</p>

<h2 id="kestrel-server-and-queue">Kestrel Server and Queue</h2>

<p>A single kestrel server has a set of queues. A Kestrel queue is a very simple message queue that runs on the JVM and uses the memcache protocol (with some extensions) to talk to clients. For details, look at the implementation of the <a href="https://github.com/nathanmarz/storm-kestrel/blob/master/src/jvm/org/apache/storm/spout/KestrelThriftClient.java">KestrelThriftClient</a> class provided in <a href="https://github.com/nathanmarz/storm-kestrel">storm-kestrel</a> project.</p>

<p>Each queue is strictly ordered following the FIFO (first in, first out) principle. To keep up with performance items are cached in system memory; though, only the first 128MB is kept in memory. When stopping the server, the queue state is stored in a journal file.</p>

<p>Further, details can be found <a href="https://github.com/nathanmarz/kestrel/blob/master/docs/guide.md">here</a>.</p>

<p>Kestrel is:
* fast
* small
* durable
* reliable</p>

<p>For instance, Twitter uses Kestrel as the backbone of its messaging infrastructure as described <a href="http://bhavin.directi.com/notes-on-kestrel-the-open-source-twitter-queue/">here</a>.</p>

<h2 id="add-items-to-kestrel">Add items to Kestrel</h2>

<p>At first, we need to have a program that can add items to a Kestrel queue. The following method takes benefit of the KestrelClient implementation in <a href="https://github.com/nathanmarz/storm-kestrel">storm-kestrel</a>. It adds sentences into a Kestrel queue randomly chosen out of an array that holds five possible sentences.</p>
<div class="highlight"><pre><code class="language-" data-lang="">    private static void queueSentenceItems(KestrelClient kestrelClient, String queueName)
            throws ParseError, IOException {

        String[] sentences = new String[] {
                "the cow jumped over the moon",
                "an apple a day keeps the doctor away",
                "four score and seven years ago",
                "snow white and the seven dwarfs",
                "i am at two with nature"};

        Random _rand = new Random();

        for(int i=1; i&lt;=10; i++){

            String sentence = sentences[_rand.nextInt(sentences.length)];

            String val = "ID " + i + " " + sentence;

            boolean queueSucess = kestrelClient.queue(queueName, val);

            System.out.println("queueSucess=" +queueSucess+ " [" + val +"]");
        }
    }
</code></pre></div>
<h2 id="remove-items-from-kestrel">Remove items from Kestrel</h2>

<p>This method dequeues items from a queue without removing them.
```
    private static void dequeueItems(KestrelClient kestrelClient, String queueName) throws IOException, ParseError
             {
        for(int i=1; i&lt;=12; i++){</p>
<div class="highlight"><pre><code class="language-" data-lang="">        Item item = kestrelClient.dequeue(queueName);

        if(item==null){
            System.out.println("The queue (" + queueName + ") contains no items.");
        }
        else
        {
            byte[] data = item._data;

            String receivedVal = new String(data);

            System.out.println("receivedItem=" + receivedVal);
        }
    }
</code></pre></div><div class="highlight"><pre><code class="language-" data-lang="">
This method dequeues items from a queue and then removes them.
</code></pre></div><div class="highlight"><pre><code class="language-" data-lang="">private static void dequeueAndRemoveItems(KestrelClient kestrelClient, String queueName)
throws IOException, ParseError
     {
        for(int i=1; i&lt;=12; i++){

            Item item = kestrelClient.dequeue(queueName);


            if(item==null){
                System.out.println("The queue (" + queueName + ") contains no items.");
            }
            else
            {
                int itemID = item._id;


                byte[] data = item._data;

                String receivedVal = new String(data);

                kestrelClient.ack(queueName, itemID);

                System.out.println("receivedItem=" + receivedVal);
            }
        }
}
</code></pre></div><div class="highlight"><pre><code class="language-" data-lang="">
## Add Items continuously to Kestrel

This is our final program to run in order to add continuously sentence items to a queue called **sentence_queue** of a locally running Kestrel server.

In order to stop it type a closing bracket char ']' in console and hit 'Enter'.

</code></pre></div><div class="highlight"><pre><code class="language-" data-lang="">import java.io.IOException;
import java.io.InputStream;
import java.util.Random;

import org.apache.storm.spout.KestrelClient;
import org.apache.storm.spout.KestrelClient.Item;
import org.apache.storm.spout.KestrelClient.ParseError;

public class AddSentenceItemsToKestrel {

    /**
     * @param args
     */
    public static void main(String[] args) {

        InputStream is = System.in;

        char closing_bracket = ']';

        int val = closing_bracket;

        boolean aux = true;

        try {

            KestrelClient kestrelClient = null;
            String queueName = "sentence_queue";

            while(aux){

                kestrelClient = new KestrelClient("localhost",22133);

                queueSentenceItems(kestrelClient, queueName);

                kestrelClient.close();

                Thread.sleep(1000);

                if(is.available()&gt;0){
                 if(val==is.read())
                     aux=false;
                }
            }
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        catch (ParseError e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        System.out.println("end");

    }
}
</code></pre></div><div class="highlight"><pre><code class="language-" data-lang="">## Using KestrelSpout

This topology reads sentences off of a Kestrel queue using KestrelSpout, splits the sentences into its constituent words (Bolt: SplitSentence), and then emits for each word the number of times it has seen that word before (Bolt: WordCount). How data is processed is described in detail in [Guaranteeing message processing](Guaranteeing-message-processing.html).

</code></pre></div><div class="highlight"><pre><code class="language-" data-lang="">TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new KestrelSpout("localhost",22133,"sentence_queue",new StringScheme()));
builder.setBolt("split", new SplitSentence(), 10)
            .shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 20)
        .fieldsGrouping("split", new Fields("word"));
</code></pre></div><div class="highlight"><pre><code class="language-" data-lang="">
## Execution

At first, start your local kestrel server in production or development mode.

Than, wait about 5 seconds in order to avoid a ConnectException.

Now execute the program to add items to the queue and launch the Storm topology. The order in which you launch the programs is of no importance.

If you run the topology with TOPOLOGY_DEBUG you should see tuples being emitted in the topology.
</code></pre></div></div>


	          </div>
	       </div>
	  </div>
<footer>
    <div class="container-fluid">
        <div class="row">
            <div class="col-md-3">
                <div class="footer-widget">
                    <h5>Meetups</h5>
                    <ul class="latest-news">
                        
                        <li><a href="http://www.meetup.com/Apache-Storm-Apache-Kafka/">Apache Storm & Apache Kafka</a> <span class="small">(Sunnyvale, CA)</span></li>
                        
                        <li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Apache Storm & Kafka Users</a> <span class="small">(Seattle, WA)</span></li>
                        
                        <li><a href="http://www.meetup.com/New-York-City-Storm-User-Group/">NYC Storm User Group</a> <span class="small">(New York, NY)</span></li>
                        
                        <li><a href="http://www.meetup.com/Bay-Area-Stream-Processing">Bay Area Stream Processing</a> <span class="small">(Emeryville, CA)</span></li>
                        
                        <li><a href="http://www.meetup.com/Boston-Storm-Users/">Boston Realtime Data</a> <span class="small">(Boston, MA)</span></li>
                        
                        <li><a href="http://www.meetup.com/storm-london">London Storm User Group</a> <span class="small">(London, UK)</span></li>
                        
                        <!-- <li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Seatle, WA</a> <span class="small">(27 Jun 2015)</span></li> -->
                    </ul>
                </div>
            </div>
            <div class="col-md-3">
                <div class="footer-widget">
                    <h5>About Apache Storm</h5>
                    <p>Apache Storm integrates with any queueing system and any database system. Apache Storm's spout abstraction makes it easy to integrate a new queuing system. Likewise, integrating Apache Storm with database systems is easy.</p>
               </div>
            </div>
            <div class="col-md-3">
                <div class="footer-widget">
                    <h5>First Look</h5>
                    <ul class="footer-list">
                        <li><a href="/releases/current/Rationale.html">Rationale</a></li>
                        <li><a href="/releases/current/Tutorial.html">Tutorial</a></li>
                        <li><a href="/releases/current/Setting-up-development-environment.html">Setting up development environment</a></li>
                        <li><a href="/releases/current/Creating-a-new-Storm-project.html">Creating a new Apache Storm project</a></li>
                    </ul>
                </div>
            </div>
            <div class="col-md-3">
                <div class="footer-widget">
                    <h5>Documentation</h5>
                    <ul class="footer-list">
                        <li><a href="/releases/current/index.html">Index</a></li>
                        <li><a href="/releases/current/javadocs/index.html">Javadoc</a></li>
                        <li><a href="/releases/current/FAQ.html">FAQ</a></li>
                    </ul>
                </div>
            </div>
        </div>
        <hr/>
        <div class="row">   
            <div class="col-md-12">
                <p align="center">Copyright © 2021 <a href="http://www.apache.org">Apache Software Foundation</a>. All Rights Reserved. 
                    <br>Apache Storm, Apache, the Apache feather logo, and the Apache Storm project logos are trademarks of The Apache Software Foundation. 
                    <br>All other marks mentioned may be trademarks or registered trademarks of their respective owners.</p>
            </div>
        </div>
    </div>
</footer>
<!--Footer End-->
<!-- Scroll to top -->
<span class="totop"><a href="#"><i class="fa fa-angle-up"></i></a></span> 

</body>

</html>

