<!DOCTYPE html>
<html>
    <head>
    <meta charset="utf-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1">

    <link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
    <link rel="icon" href="/favicon.ico" type="image/x-icon">

    <title>Storm and Kestrel</title>

    <!-- Bootstrap core CSS -->
    <link href="/assets/css/bootstrap.min.css" rel="stylesheet">
    <!-- Bootstrap theme -->
    <link href="/assets/css/bootstrap-theme.min.css" rel="stylesheet">

    <!-- Custom styles for this template -->
    <link rel="stylesheet" href="http://fortawesome.github.io/Font-Awesome/assets/font-awesome/css/font-awesome.css">
    <link href="/css/style.css" rel="stylesheet">
    <link href="/assets/css/owl.theme.css" rel="stylesheet">
    <link href="/assets/css/owl.carousel.css" rel="stylesheet">
    <script type="text/javascript" src="/assets/js/jquery.min.js"></script>
    <script type="text/javascript" src="/assets/js/bootstrap.min.js"></script>
    <script type="text/javascript" src="/assets/js/owl.carousel.min.js"></script>
    <script type="text/javascript" src="/assets/js/storm.js"></script>
    <!-- Just for debugging purposes. Don't actually copy these 2 lines! -->
    <!--[if lt IE 9]><script src="../../assets/js/ie8-responsive-file-warning.js"></script><![endif]-->
    
    <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
    <!--[if lt IE 9]>
      <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
      <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
    <![endif]-->
  </head>


  <body>
    <header>
  <div class="container-fluid">
     <div class="row">
          <div class="col-md-5">
            <a href="/index.html"><img src="/images/logo.png" class="logo" /></a>
          </div>
          <div class="col-md-5">
            
              <h1>Version: 2.1.0</h1>
            
          </div>
          <div class="col-md-2">
            <a href="/downloads.html" class="btn-std btn-block btn-download">Download</a>
          </div>
        </div>
    </div>
</header>
<!--Header End-->
<!--Navigation Begin-->
<div class="navbar" role="banner">
  <div class="container-fluid">
      <div class="navbar-header">
          <button class="navbar-toggle" type="button" data-toggle="collapse" data-target=".bs-navbar-collapse">
                <span class="icon-bar"></span>
                <span class="icon-bar"></span>
                <span class="icon-bar"></span>
            </button>
        </div>
        <nav class="collapse navbar-collapse bs-navbar-collapse" role="navigation">
          <ul class="nav navbar-nav">
              <li><a href="/index.html" id="home">Home</a></li>
                <li><a href="/getting-help.html" id="getting-help">Getting Help</a></li>
                <li><a href="/about/integrates.html" id="project-info">Project Information</a></li>
                <li class="dropdown">
                    <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="documentation">Documentation <b class="caret"></b></a>
                    <ul class="dropdown-menu">
                      
                        
                          <li><a href="/releases/2.3.0/index.html">2.3.0</a></li>
                        
                      
                        
                          <li><a href="/releases/2.2.0/index.html">2.2.0</a></li>
                        
                      
                        
                          <li><a href="/releases/2.1.0/index.html">2.1.0</a></li>
                        
                      
                        
                          <li><a href="/releases/2.0.0/index.html">2.0.0</a></li>
                        
                      
                        
                          <li><a href="/releases/1.2.3/index.html">1.2.3</a></li>
                        
                      
                    </ul>
                </li>
                <li><a href="/talksAndVideos.html">Talks and Slideshows</a></li>
                <li class="dropdown">
                    <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="contribute">Community <b class="caret"></b></a>
                    <ul class="dropdown-menu">
                        <li><a href="/contribute/Contributing-to-Storm.html">Contributing</a></li>
                        <li><a href="/contribute/People.html">People</a></li>
                        <li><a href="/contribute/BYLAWS.html">ByLaws</a></li>
                    </ul>
                </li>
                <li><a href="/2021/09/27/storm230-released.html" id="news">News</a></li>
            </ul>
        </nav>
    </div>
</div>



    <div class="container-fluid">
    <h1 class="page-title">Storm and Kestrel</h1>
          <div class="row">
           	<div class="col-md-12">
	             <!-- Documentation -->

<p class="post-meta"></p>

<div class="documentation-content"><p>This page explains how to use Storm to consume items from a Kestrel cluster.</p>

<h2 id="preliminaries">Preliminaries</h2>

<h3 id="storm">Storm</h3>

<p>This tutorial uses examples from the <a href="https://github.com/nathanmarz/storm-kestrel">storm-kestrel</a> project and the <a href="http://github.com/apache/storm/blob/2.1.0/examples/storm-starter">storm-starter</a> project. It&#39;s recommended that you clone those projects and follow along with the examples. Read <a href="Setting-up-development-environment.html">Setting up development environment</a> and <a href="Creating-a-new-Storm-project.html">Creating a new Storm project</a> to get your machine set up.</p>

<h3 id="kestrel">Kestrel</h3>

<p>It assumes you are able to run locally a Kestrel server as described <a href="https://github.com/nathanmarz/storm-kestrel">here</a>.</p>

<h2 id="kestrel-server-and-queue">Kestrel Server and Queue</h2>

<p>A single kestrel server has a set of queues. A Kestrel queue is a very simple message queue that runs on the JVM and uses the memcache protocol (with some extensions) to talk to clients. For details, look at the implementation of the <a href="https://github.com/nathanmarz/storm-kestrel/blob/master/src/jvm/org/apache/storm/spout/KestrelThriftClient.java">KestrelThriftClient</a> class provided in <a href="https://github.com/nathanmarz/storm-kestrel">storm-kestrel</a> project.</p>

<p>Each queue is strictly ordered following the FIFO (first in, first out) principle. To keep up with performance items are cached in system memory; though, only the first 128MB is kept in memory. When stopping the server, the queue state is stored in a journal file.</p>

<p>Further, details can be found <a href="https://github.com/nathanmarz/kestrel/blob/master/docs/guide.md">here</a>.</p>

<p>Kestrel is:
* fast
* small
* durable
* reliable</p>

<p>For instance, Twitter uses Kestrel as the backbone of its messaging infrastructure as described <a href="http://bhavin.directi.com/notes-on-kestrel-the-open-source-twitter-queue/">here</a>.</p>

<h2 id="add-items-to-kestrel">Add items to Kestrel</h2>

<p>At first, we need to have a program that can add items to a Kestrel queue. The following method takes benefit of the KestrelClient implementation in <a href="https://github.com/nathanmarz/storm-kestrel">storm-kestrel</a>. It adds sentences into a Kestrel queue randomly chosen out of an array that holds five possible sentences.</p>
<div class="highlight"><pre><code class="language-" data-lang="">    private static void queueSentenceItems(KestrelClient kestrelClient, String queueName)
            throws ParseError, IOException {

        String[] sentences = new String[] {
                "the cow jumped over the moon",
                "an apple a day keeps the doctor away",
                "four score and seven years ago",
                "snow white and the seven dwarfs",
                "i am at two with nature"};

        Random _rand = new Random();

        for(int i=1; i&lt;=10; i++){

            String sentence = sentences[_rand.nextInt(sentences.length)];

            String val = "ID " + i + " " + sentence;

            boolean queueSucess = kestrelClient.queue(queueName, val);

            System.out.println("queueSucess=" +queueSucess+ " [" + val +"]");
        }
    }
</code></pre></div>
<h2 id="remove-items-from-kestrel">Remove items from Kestrel</h2>

<p>This method dequeues items from a queue without removing them.</p>
<div class="highlight"><pre><code class="language-" data-lang="">    private static void dequeueItems(KestrelClient kestrelClient, String queueName) throws IOException, ParseError
             {
        for(int i=1; i&lt;=12; i++){

            Item item = kestrelClient.dequeue(queueName);

            if(item==null){
                System.out.println("The queue (" + queueName + ") contains no items.");
            }
            else
            {
                byte[] data = item._data;

                String receivedVal = new String(data);

                System.out.println("receivedItem=" + receivedVal);
            }
        }
</code></pre></div>
<p>This method dequeues items from a queue and then removes them.</p>
<div class="highlight"><pre><code class="language-" data-lang="">    private static void dequeueAndRemoveItems(KestrelClient kestrelClient, String queueName)
    throws IOException, ParseError
         {
            for(int i=1; i&lt;=12; i++){

                Item item = kestrelClient.dequeue(queueName);


                if(item==null){
                    System.out.println("The queue (" + queueName + ") contains no items.");
                }
                else
                {
                    int itemID = item._id;


                    byte[] data = item._data;

                    String receivedVal = new String(data);

                    kestrelClient.ack(queueName, itemID);

                    System.out.println("receivedItem=" + receivedVal);
                }
            }
    }
</code></pre></div>
<h2 id="add-items-continuously-to-kestrel">Add Items continuously to Kestrel</h2>

<p>This is our final program to run in order to add continuously sentence items to a queue called <strong>sentence_queue</strong> of a locally running Kestrel server.</p>

<p>In order to stop it type a closing bracket char &#39;]&#39; in console and hit &#39;Enter&#39;.</p>
<div class="highlight"><pre><code class="language-" data-lang="">    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Random;

    import org.apache.storm.spout.KestrelClient;
    import org.apache.storm.spout.KestrelClient.Item;
    import org.apache.storm.spout.KestrelClient.ParseError;

    public class AddSentenceItemsToKestrel {

        /**
         * @param args
         */
        public static void main(String[] args) {

            InputStream is = System.in;

            char closing_bracket = ']';

            int val = closing_bracket;

            boolean aux = true;

            try {

                KestrelClient kestrelClient = null;
                String queueName = "sentence_queue";

                while(aux){

                    kestrelClient = new KestrelClient("localhost",22133);

                    queueSentenceItems(kestrelClient, queueName);

                    kestrelClient.close();

                    Thread.sleep(1000);

                    if(is.available()&gt;0){
                     if(val==is.read())
                         aux=false;
                    }
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            catch (ParseError e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            System.out.println("end");

        }
    }
</code></pre></div>
<h2 id="using-kestrelspout">Using KestrelSpout</h2>

<p>This topology reads sentences off of a Kestrel queue using KestrelSpout, splits the sentences into its constituent words (Bolt: SplitSentence), and then emits for each word the number of times it has seen that word before (Bolt: WordCount). How data is processed is described in detail in <a href="Guaranteeing-message-processing.html">Guaranteeing message processing</a>.</p>
<div class="highlight"><pre><code class="language-" data-lang="">    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("sentences", new KestrelSpout("localhost",22133,"sentence_queue",new StringScheme()));
    builder.setBolt("split", new SplitSentence(), 10)
                .shuffleGrouping("sentences");
    builder.setBolt("count", new WordCount(), 20)
            .fieldsGrouping("split", new Fields("word"));
</code></pre></div>
<h2 id="execution">Execution</h2>

<p>At first, start your local kestrel server in production or development mode.</p>

<p>Than, wait about 5 seconds in order to avoid a ConnectException.</p>

<p>Now execute the program to add items to the queue and launch the Storm topology. The order in which you launch the programs is of no importance.</p>

<p>If you run the topology with TOPOLOGY_DEBUG you should see tuples being emitted in the topology.</p>
</div>


	          </div>
	       </div>
	  </div>
<footer>
    <div class="container-fluid">
        <div class="row">
            <div class="col-md-3">
                <div class="footer-widget">
                    <h5>Meetups</h5>
                    <ul class="latest-news">
                        
                        <li><a href="http://www.meetup.com/Apache-Storm-Apache-Kafka/">Apache Storm & Apache Kafka</a> <span class="small">(Sunnyvale, CA)</span></li>
                        
                        <li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Apache Storm & Kafka Users</a> <span class="small">(Seattle, WA)</span></li>
                        
                        <li><a href="http://www.meetup.com/New-York-City-Storm-User-Group/">NYC Storm User Group</a> <span class="small">(New York, NY)</span></li>
                        
                        <li><a href="http://www.meetup.com/Bay-Area-Stream-Processing">Bay Area Stream Processing</a> <span class="small">(Emeryville, CA)</span></li>
                        
                        <li><a href="http://www.meetup.com/Boston-Storm-Users/">Boston Realtime Data</a> <span class="small">(Boston, MA)</span></li>
                        
                        <li><a href="http://www.meetup.com/storm-london">London Storm User Group</a> <span class="small">(London, UK)</span></li>
                        
                        <!-- <li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Seatle, WA</a> <span class="small">(27 Jun 2015)</span></li> -->
                    </ul>
                </div>
            </div>
            <div class="col-md-3">
                <div class="footer-widget">
                    <h5>About Apache Storm</h5>
                    <p>Apache Storm integrates with any queueing system and any database system. Apache Storm's spout abstraction makes it easy to integrate a new queuing system. Likewise, integrating Apache Storm with database systems is easy.</p>
               </div>
            </div>
            <div class="col-md-3">
                <div class="footer-widget">
                    <h5>First Look</h5>
                    <ul class="footer-list">
                        <li><a href="/releases/current/Rationale.html">Rationale</a></li>
                        <li><a href="/releases/current/Tutorial.html">Tutorial</a></li>
                        <li><a href="/releases/current/Setting-up-development-environment.html">Setting up development environment</a></li>
                        <li><a href="/releases/current/Creating-a-new-Storm-project.html">Creating a new Apache Storm project</a></li>
                    </ul>
                </div>
            </div>
            <div class="col-md-3">
                <div class="footer-widget">
                    <h5>Documentation</h5>
                    <ul class="footer-list">
                        <li><a href="/releases/current/index.html">Index</a></li>
                        <li><a href="/releases/current/javadocs/index.html">Javadoc</a></li>
                        <li><a href="/releases/current/FAQ.html">FAQ</a></li>
                    </ul>
                </div>
            </div>
        </div>
        <hr/>
        <div class="row">   
            <div class="col-md-12">
                <p align="center">Copyright © 2019 <a href="http://www.apache.org">Apache Software Foundation</a>. All Rights Reserved. 
                    <br>Apache Storm, Apache, the Apache feather logo, and the Apache Storm project logos are trademarks of The Apache Software Foundation. 
                    <br>All other marks mentioned may be trademarks or registered trademarks of their respective owners.</p>
            </div>
        </div>
    </div>
</footer>
<!--Footer End-->
<!-- Scroll to top -->
<span class="totop"><a href="#"><i class="fa fa-angle-up"></i></a></span> 

</body>

</html>

