| <!DOCTYPE html> |
| <html> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge"> |
| <meta name="viewport" content="width=device-width, initial-scale=1"> |
| |
| <link rel="shortcut icon" href="/favicon.ico" type="image/x-icon"> |
| <link rel="icon" href="/favicon.ico" type="image/x-icon"> |
| |
| <title>Storm and Kestrel</title> |
| |
| <!-- Bootstrap core CSS --> |
| <link href="/assets/css/bootstrap.min.css" rel="stylesheet"> |
| <!-- Bootstrap theme --> |
| <link href="/assets/css/bootstrap-theme.min.css" rel="stylesheet"> |
| |
| <!-- Custom styles for this template --> |
| <link rel="stylesheet" href="http://fortawesome.github.io/Font-Awesome/assets/font-awesome/css/font-awesome.css"> |
| <link href="/css/style.css" rel="stylesheet"> |
| <link href="/assets/css/owl.theme.css" rel="stylesheet"> |
| <link href="/assets/css/owl.carousel.css" rel="stylesheet"> |
| <script type="text/javascript" src="/assets/js/jquery.min.js"></script> |
| <script type="text/javascript" src="/assets/js/bootstrap.min.js"></script> |
| <script type="text/javascript" src="/assets/js/owl.carousel.min.js"></script> |
| <script type="text/javascript" src="/assets/js/storm.js"></script> |
| <!-- Just for debugging purposes. Don't actually copy these 2 lines! --> |
| <!--[if lt IE 9]><script src="../../assets/js/ie8-responsive-file-warning.js"></script><![endif]--> |
| |
| <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries --> |
| <!--[if lt IE 9]> |
| <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script> |
| <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script> |
| <![endif]--> |
| </head> |
| |
| |
| <body> |
| <header> |
| <div class="container-fluid"> |
| <div class="row"> |
| <div class="col-md-10"> |
| <a href="/index.html"><img src="/images/logo.png" class="logo" /></a> |
| </div> |
| <div class="col-md-2"> |
| <a href="/downloads.html" class="btn-std btn-block btn-download">Download</a> |
| </div> |
| </div> |
| </div> |
| </header> |
| <!--Header End--> |
| <!--Navigation Begin--> |
| <div class="navbar" role="banner"> |
| <div class="container-fluid"> |
| <div class="navbar-header"> |
| <button class="navbar-toggle" type="button" data-toggle="collapse" data-target=".bs-navbar-collapse"> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| </button> |
| </div> |
| <nav class="collapse navbar-collapse bs-navbar-collapse" role="navigation"> |
| <ul class="nav navbar-nav"> |
| <li><a href="/index.html" id="home">Home</a></li> |
| <li><a href="/getting-help.html" id="getting-help">Getting Help</a></li> |
| <li><a href="/about/integrates.html" id="project-info">Project Information</a></li> |
| <li><a href="/documentation.html" id="documentation">Documentation</a></li> |
| <li><a href="/talksAndVideos.html">Talks and Slideshows</a></li> |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="contribute">Community <b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="/contribute/Contributing-to-Storm.html">Contributing</a></li> |
| <li><a href="/contribute/People.html">People</a></li> |
| <li><a href="/contribute/BYLAWS.html">ByLaws</a></li> |
| </ul> |
| </li> |
| <li><a href="/2015/06/15/storm0100-beta-released.html" id="news">News</a></li> |
| </ul> |
| </nav> |
| </div> |
| </div> |
| |
| |
| |
| <div class="container-fluid"> |
| <h1 class="page-title">Storm and Kestrel</h1> |
| <div class="row"> |
| <div class="col-md-12"> |
| <!-- Documentation --> |
| |
| <p class="post-meta"></p> |
| |
| <p>This page explains how to use Storm to consume items from a Kestrel cluster.</p> |
| |
| <h2 id="preliminaries">Preliminaries</h2> |
| |
| <h3 id="storm">Storm</h3> |
| |
| <p>This tutorial uses examples from the <a href="https://github.com/nathanmarz/storm-kestrel">storm-kestrel</a> project and the <a href="http://github.com/apache/storm/blob/master/examples/storm-starter">storm-starter</a> project. It's recommended that you clone those projects and follow along with the examples. Read <a href="https://github.com/apache/storm/wiki/Setting-up-development-environment">Setting up development environment</a> and <a href="https://github.com/apache/storm/wiki/Creating-a-new-Storm-project">Creating a new Storm project</a> to get your machine set up.</p> |
| |
| <h3 id="kestrel">Kestrel</h3> |
| |
| <p>This tutorial assumes that you can run a Kestrel server locally, as described <a href="https://github.com/nathanmarz/storm-kestrel">here</a>.</p> |
| |
| <h2 id="kestrel-server-and-queue">Kestrel Server and Queue</h2> |
| |
| <p>A single Kestrel server has a set of queues. A Kestrel queue is a very simple message queue that runs on the JVM and uses the memcache protocol (with some extensions) to talk to clients. For details, look at the implementation of the <a href="https://github.com/nathanmarz/storm-kestrel/blob/master/src/jvm/backtype/storm/spout/KestrelThriftClient.java">KestrelThriftClient</a> class provided in the <a href="https://github.com/nathanmarz/storm-kestrel">storm-kestrel</a> project.</p> |
| |
| <p>Each queue is strictly ordered following the FIFO (first in, first out) principle. For performance, items are cached in system memory, although only the first 128MB of a queue is kept in memory. When the server is stopped, the queue state is stored in a journal file.</p> |
| |
| <p>Further details can be found <a href="https://github.com/nathanmarz/kestrel/blob/master/docs/guide.md">here</a>.</p> |
| |
| <p>Kestrel is:</p> |
| <ul> |
| <li>fast</li> |
| <li>small</li> |
| <li>durable</li> |
| <li>reliable</li> |
| </ul> |
| |
| <p>For instance, Twitter uses Kestrel as the backbone of its messaging infrastructure as described <a href="http://bhavin.directi.com/notes-on-kestrel-the-open-source-twitter-queue/">here</a>.</p> |
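| |
| <p>The FIFO ordering described above, and the difference between reading an item and acknowledging it that the later examples on this page rely on, can be illustrated with a toy in-memory model. This is only a sketch: ToyReliableQueue and its methods are hypothetical names, it is not how Kestrel is implemented, and it ignores journaling, memory limits, and multiple clients.</p> |
| <div class="highlight"><pre><code class="language-text" data-lang="text">public class ToyReliableQueue { |
|     // Hypothetical toy model, not part of Kestrel or storm-kestrel. |
|     private final String[] items = new String[16]; // fixed capacity keeps the sketch short |
|     private int head = 0; // index of the oldest stored item |
|     private int size = 0; // number of items currently stored |
| |
|     // Append an item at the tail; returns false when the toy queue is full. |
|     public boolean queue(String item) { |
|         if (size == items.length) { |
|             return false; |
|         } |
|         items[(head + size) % items.length] = item; |
|         size++; |
|         return true; |
|     } |
| |
|     // Return the oldest item but leave it in the queue; null when empty. |
|     public String dequeue() { |
|         return size == 0 ? null : items[head]; |
|     } |
| |
|     // Confirm the item last returned by dequeue(), removing it for good. |
|     public void ack() { |
|         if (size == 0) { |
|             return; |
|         } |
|         items[head] = null; |
|         head = (head + 1) % items.length; |
|         size--; |
|     } |
| |
|     public static void main(String[] args) { |
|         ToyReliableQueue q = new ToyReliableQueue(); |
|         q.queue("first"); |
|         q.queue("second"); |
|         System.out.println(q.dequeue()); // first |
|         System.out.println(q.dequeue()); // first again: nothing was acknowledged |
|         q.ack(); |
|         System.out.println(q.dequeue()); // second |
|     } |
| } |
| </code></pre></div> |
| <p>Running main prints first, first, second: in this model an item stays at the head of the queue until it is acknowledged.</p> |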
| |
| <h2 id="add-items-to-kestrel">Add items to Kestrel</h2> |
| |
| <p>First, we need a program that can add items to a Kestrel queue. The following method uses the KestrelClient implementation in <a href="https://github.com/nathanmarz/storm-kestrel">storm-kestrel</a>. It adds sentences, chosen at random from an array of five possible sentences, to a Kestrel queue.</p> |
| <div class="highlight"><pre><code class="language-text" data-lang="text"> private static void queueSentenceItems(KestrelClient kestrelClient, String queueName) |
| throws ParseError, IOException { |
| |
| String[] sentences = new String[] { |
| "the cow jumped over the moon", |
| "an apple a day keeps the doctor away", |
| "four score and seven years ago", |
| "snow white and the seven dwarfs", |
| "i am at two with nature"}; |
| |
| Random _rand = new Random(); |
| |
| for(int i=1; i<=10; i++){ |
| |
| String sentence = sentences[_rand.nextInt(sentences.length)]; |
| |
| String val = "ID " + i + " " + sentence; |
| |
| boolean queueSuccess = kestrelClient.queue(queueName, val); |
| |
| System.out.println("queueSuccess=" + queueSuccess + " [" + val + "]"); |
| } |
| } |
| </code></pre></div> |
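| <p>To try the loop above without a running Kestrel server, the same logic can be written against a small stand-in interface. The following is a minimal sketch: SentenceSink and QueueSentenceItemsDryRun are hypothetical names that are not part of storm-kestrel, and the sink used in main only prints each item.</p> |
| <div class="highlight"><pre><code class="language-text" data-lang="text">import java.util.Random; |
| |
| public class QueueSentenceItemsDryRun { |
| |
|     // Hypothetical stand-in for the single KestrelClient call used above. |
|     interface SentenceSink { |
|         boolean queue(String queueName, String val); |
|     } |
| |
|     static final String[] SENTENCES = new String[] { |
|         "the cow jumped over the moon", |
|         "an apple a day keeps the doctor away", |
|         "four score and seven years ago", |
|         "snow white and the seven dwarfs", |
|         "i am at two with nature"}; |
| |
|     // Same loop as queueSentenceItems, written against the stand-in interface. |
|     // Returns how many of the ten items the sink accepted. |
|     static int queueSentenceItems(SentenceSink sink, String queueName, Random rand) { |
|         int accepted = 0; |
|         for (int i = 1; i <= 10; i++) { |
|             String val = "ID " + i + " " + SENTENCES[rand.nextInt(SENTENCES.length)]; |
|             if (sink.queue(queueName, val)) { |
|                 accepted++; |
|             } |
|         } |
|         return accepted; |
|     } |
| |
|     public static void main(String[] args) { |
|         // This sink only prints; a real run would delegate to a KestrelClient instead. |
|         SentenceSink printer = new SentenceSink() { |
|             public boolean queue(String queueName, String val) { |
|                 System.out.println(queueName + ": " + val); |
|                 return true; |
|             } |
|         }; |
|         int accepted = queueSentenceItems(printer, "sentence_queue", new Random()); |
|         System.out.println("accepted=" + accepted); |
|     } |
| } |
| </code></pre></div> |
| <p>Keeping the queue call behind an interface is a design choice made for this sketch only; it allows the sentence-building loop to be exercised separately from the network connection.</p> |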
| <h2 id="remove-items-from-kestrel">Remove items from Kestrel</h2> |
| |
| <p>This method dequeues items from a queue without removing them.</p> |
| <div class="highlight"><pre><code class="language-text" data-lang="text"> private static void dequeueItems(KestrelClient kestrelClient, String queueName) throws IOException, ParseError |
| { |
| for(int i=1; i<=12; i++){ |
| |
| Item item = kestrelClient.dequeue(queueName); |
| |
| if(item==null){ |
| System.out.println("The queue (" + queueName + ") contains no items."); |
| } |
| else |
| { |
| byte[] data = item._data; |
| |
| String receivedVal = new String(data); |
| |
| System.out.println("receivedItem=" + receivedVal); |
| } |
| } |
| } |
| </code></pre></div> |
| <p>This method dequeues items from a queue and then removes them.</p> |
| <div class="highlight"><pre><code class="language-text" data-lang="text">private static void dequeueAndRemoveItems(KestrelClient kestrelClient, String queueName) |
| throws IOException, ParseError |
| { |
| for(int i=1; i<=12; i++){ |
| |
| Item item = kestrelClient.dequeue(queueName); |
| |
| |
| if(item==null){ |
| System.out.println("The queue (" + queueName + ") contains no items."); |
| } |
| else |
| { |
| int itemID = item._id; |
| |
| |
| byte[] data = item._data; |
| |
| String receivedVal = new String(data); |
| |
| kestrelClient.ack(queueName, itemID); |
| |
| System.out.println("receivedItem=" + receivedVal); |
| } |
| } |
| } |
| </code></pre></div> |
| <h2 id="add-items-continuously-to-kestrel">Add Items continuously to Kestrel</h2> |
| |
| <p>This is our final program, which continuously adds sentence items to a queue called <strong>sentence_queue</strong> on a locally running Kestrel server.</p> |
| |
| <p>To stop it, type a closing bracket character ']' in the console and press 'Enter'.</p> |
| <div class="highlight"><pre><code class="language-text" data-lang="text">import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.Random; |
| |
| import backtype.storm.spout.KestrelClient; |
| import backtype.storm.spout.KestrelClient.Item; |
| import backtype.storm.spout.KestrelClient.ParseError; |
| |
| public class AddSentenceItemsToKestrel { |
| |
| /** |
| * @param args |
| */ |
| public static void main(String[] args) { |
| |
| InputStream is = System.in; |
| |
| char closing_bracket = ']'; |
| |
| int val = closing_bracket; |
| |
| boolean aux = true; |
| |
| try { |
| |
| KestrelClient kestrelClient = null; |
| String queueName = "sentence_queue"; |
| |
| while(aux){ |
| |
| kestrelClient = new KestrelClient("localhost",22133); |
| |
| queueSentenceItems(kestrelClient, queueName); |
| |
| kestrelClient.close(); |
| |
| Thread.sleep(1000); |
| |
| if(is.available()>0){ |
| if(val==is.read()) |
| aux=false; |
| } |
| } |
| } catch (IOException e) { |
| // TODO Auto-generated catch block |
| e.printStackTrace(); |
| } |
| catch (ParseError e) { |
| // TODO Auto-generated catch block |
| e.printStackTrace(); |
| } catch (InterruptedException e) { |
| // TODO Auto-generated catch block |
| e.printStackTrace(); |
| } |
| |
| System.out.println("end"); |
| |
| } |
| } |
| </code></pre></div> |
| <h2 id="using-kestrelspout">Using KestrelSpout</h2> |
| |
| <p>This topology reads sentences off a Kestrel queue using KestrelSpout, splits each sentence into its constituent words (Bolt: SplitSentence), and then emits, for each word, the number of times it has seen that word before (Bolt: WordCount). How data is processed is described in detail in <a href="Guaranteeing-message-processing.html">Guaranteeing message processing</a>.</p> |
| <div class="highlight"><pre><code class="language-text" data-lang="text">TopologyBuilder builder = new TopologyBuilder(); |
| builder.setSpout("sentences", new KestrelSpout("localhost",22133,"sentence_queue",new StringScheme())); |
| builder.setBolt("split", new SplitSentence(), 10) |
| .shuffleGrouping("sentences"); |
| builder.setBolt("count", new WordCount(), 20) |
| .fieldsGrouping("split", new Fields("word")); |
| </code></pre></div> |
| <h2 id="execution">Execution</h2> |
| |
| <p>First, start your local Kestrel server in production or development mode.</p> |
| |
| <p>Then wait about 5 seconds to avoid a ConnectException.</p> |
| |
| <p>Now execute the program that adds items to the queue and launch the Storm topology. The order in which you launch the two programs does not matter.</p> |
| |
| <p>If you run the topology with TOPOLOGY_DEBUG, you should see tuples being emitted in the topology.</p> |
| |
| |
| </div> |
| </div> |
| </div> |
| <footer> |
| <div class="container-fluid"> |
| <div class="row"> |
| <div class="col-md-3"> |
| <div class="footer-widget"> |
| <h5>Meetups</h5> |
| <ul class="latest-news"> |
| |
| <li><a href="http://www.meetup.com/Apache-Storm-Apache-Kafka/">Apache Storm &amp; Apache Kafka</a> <span class="small">(Sunnyvale, CA)</span></li> |
| |
| <li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Apache Storm &amp; Kafka Users</a> <span class="small">(Seattle, WA)</span></li> |
| |
| <li><a href="http://www.meetup.com/New-York-City-Storm-User-Group/">NYC Storm User Group</a> <span class="small">(New York, NY)</span></li> |
| |
| <li><a href="http://www.meetup.com/Bay-Area-Stream-Processing">Bay Area Stream Processing</a> <span class="small">(Emeryville, CA)</span></li> |
| |
| <li><a href="http://www.meetup.com/Boston-Storm-Users/">Boston Realtime Data</a> <span class="small">(Boston, MA)</span></li> |
| |
| <li><a href="http://www.meetup.com/storm-london">London Storm User Group</a> <span class="small">(London, UK)</span></li> |
| |
| <!-- <li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Seatle, WA</a> <span class="small">(27 Jun 2015)</span></li> --> |
| </ul> |
| </div> |
| </div> |
| <div class="col-md-3"> |
| <div class="footer-widget"> |
| <h5>About Storm</h5> |
| <p>Storm integrates with any queueing system and any database system. Storm's spout abstraction makes it easy to integrate a new queuing system. Likewise, integrating Storm with database systems is easy.</p> |
| </div> |
| </div> |
| <div class="col-md-3"> |
| <div class="footer-widget"> |
| <h5>First Look</h5> |
| <ul class="footer-list"> |
| <li><a href="/documentation/Rationale.html">Rationale</a></li> |
| <li><a href="/tutorial.html">Tutorial</a></li> |
| <li><a href="/documentation/Setting-up-development-environment.html">Setting up development environment</a></li> |
| <li><a href="/documentation/Creating-a-new-Storm-project.html">Creating a new Storm project</a></li> |
| </ul> |
| </div> |
| </div> |
| <div class="col-md-3"> |
| <div class="footer-widget"> |
| <h5>Documentation</h5> |
| <ul class="footer-list"> |
| <li><a href="/doc-index.html">Index</a></li> |
| <li><a href="/documentation.html">Manual</a></li> |
| <li><a href="https://storm.apache.org/javadoc/apidocs/index.html">Javadoc</a></li> |
| <li><a href="/documentation/FAQ.html">FAQ</a></li> |
| </ul> |
| </div> |
| </div> |
| </div> |
| <hr/> |
| <div class="row"> |
| <div class="col-md-12"> |
| <p align="center">Copyright © 2015 <a href="http://www.apache.org">Apache Software Foundation</a>. All Rights Reserved. |
| <br>Apache Storm, Apache, the Apache feather logo, and the Apache Storm project logos are trademarks of The Apache Software Foundation. |
| <br>All other marks mentioned may be trademarks or registered trademarks of their respective owners.</p> |
| </div> |
| </div> |
| </div> |
| </footer> |
| <!--Footer End--> |
| <!-- Scroll to top --> |
| <span class="totop"><a href="#"><i class="fa fa-angle-up"></i></a></span> |
| |
| </body> |
| |
| </html> |
| |