<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
<link rel="icon" href="/favicon.ico" type="image/x-icon">
<title>Stream API Overview</title>
<!-- Bootstrap core CSS -->
<link href="/assets/css/bootstrap.min.css" rel="stylesheet">
<!-- Bootstrap theme -->
<link href="/assets/css/bootstrap-theme.min.css" rel="stylesheet">
<!-- Custom styles for this template -->
<link rel="stylesheet" href="http://fortawesome.github.io/Font-Awesome/assets/font-awesome/css/font-awesome.css">
<link href="/css/style.css" rel="stylesheet">
<link href="/assets/css/owl.theme.css" rel="stylesheet">
<link href="/assets/css/owl.carousel.css" rel="stylesheet">
<script type="text/javascript" src="/assets/js/jquery.min.js"></script>
<script type="text/javascript" src="/assets/js/bootstrap.min.js"></script>
<script type="text/javascript" src="/assets/js/owl.carousel.min.js"></script>
<script type="text/javascript" src="/assets/js/storm.js"></script>
<!-- Just for debugging purposes. Don't actually copy these 2 lines! -->
<!--[if lt IE 9]><script src="../../assets/js/ie8-responsive-file-warning.js"></script><![endif]-->
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<header>
<div class="container-fluid">
<div class="row">
<div class="col-md-5">
<a href="/index.html"><img src="/images/logo.png" class="logo" /></a>
</div>
<div class="col-md-5">
<h1>Version: 2.1.0</h1>
</div>
<div class="col-md-2">
<a href="/downloads.html" class="btn-std btn-block btn-download">Download</a>
</div>
</div>
</div>
</header>
<!--Header End-->
<!--Navigation Begin-->
<div class="navbar" role="banner">
<div class="container-fluid">
<div class="navbar-header">
<button class="navbar-toggle" type="button" data-toggle="collapse" data-target=".bs-navbar-collapse">
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
</div>
<nav class="collapse navbar-collapse bs-navbar-collapse" role="navigation">
<ul class="nav navbar-nav">
<li><a href="/index.html" id="home">Home</a></li>
<li><a href="/getting-help.html" id="getting-help">Getting Help</a></li>
<li><a href="/about/integrates.html" id="project-info">Project Information</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" id="documentation">Documentation <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="/releases/2.3.0/index.html">2.3.0</a></li>
<li><a href="/releases/2.2.0/index.html">2.2.0</a></li>
<li><a href="/releases/2.1.0/index.html">2.1.0</a></li>
<li><a href="/releases/2.0.0/index.html">2.0.0</a></li>
<li><a href="/releases/1.2.3/index.html">1.2.3</a></li>
</ul>
</li>
<li><a href="/talksAndVideos.html">Talks and Slideshows</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" id="contribute">Community <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="/contribute/Contributing-to-Storm.html">Contributing</a></li>
<li><a href="/contribute/People.html">People</a></li>
<li><a href="/contribute/BYLAWS.html">ByLaws</a></li>
</ul>
</li>
<li><a href="/2021/09/27/storm230-released.html" id="news">News</a></li>
</ul>
</nav>
</div>
</div>
<div class="container-fluid">
<h1 class="page-title">Stream API Overview</h1>
<div class="row">
<div class="col-md-12">
<!-- Documentation -->
<p class="post-meta"></p>
<div class="documentation-content"><ul>
<li><a href="#concepts">Concepts</a>
<ul>
<li><a href="#streambuilder">Stream Builder</a></li>
<li><a href="#valuemapper">Value mapper</a></li>
</ul></li>
<li><a href="#streamapis">Stream APIs</a>
<ul>
<li><a href="#basictransformations">Basic transformations</a>
<ul>
<li><a href="#filter">filter</a></li>
<li><a href="#map">map</a></li>
<li><a href="#flatmap">flatMap</a></li>
</ul></li>
<li><a href="#windowing">Windowing</a></li>
<li><a href="#keyvaluepairs">Transformation to key-value pairs</a>
<ul>
<li><a href="#mapflatmaptopair">mapToPair</a></li>
<li><a href="#mapflatmaptopair">flatMapToPair</a></li>
</ul></li>
<li><a href="#aggregations">Aggregations</a>
<ul>
<li><a href="#aggregatereduce">aggregate</a></li>
<li><a href="#aggregatereduce">reduce</a></li>
<li><a href="#aggregatereducebykey">aggregateByKey</a></li>
<li><a href="#aggregatereducebykey">reduceByKey</a></li>
<li><a href="#groupbykey">groupByKey</a></li>
<li><a href="#countbykey">countByKey</a></li>
</ul></li>
<li><a href="#repartition">Repartition</a></li>
<li><a href="#outputoperations">Output operations</a>
<ul>
<li><a href="#print">print</a></li>
<li><a href="#peek">peek</a></li>
<li><a href="#foreach">forEach</a></li>
<li><a href="#to">to</a></li>
</ul></li>
<li><a href="#branching">Branch</a></li>
<li><a href="#joins">Joins</a></li>
<li><a href="#cogroupbykey">CoGroupByKey</a></li>
<li><a href="#state">State</a>
<ul>
<li><a href="#updatestatebykey">updateStateByKey</a></li>
<li><a href="#statequery">stateQuery</a></li>
</ul></li>
</ul></li>
<li><a href="#guarantees">Guarantees</a></li>
<li><a href="#example">Example</a></li>
</ul>
<p>Historically, Storm provided Spout and Bolt APIs for expressing streaming computations. Although these APIs are fairly simple to use,
they offer no reusable constructs for expressing common streaming operations such as filtering, transformations, windowing, joins,
aggregations and so on.</p>
<p>The Stream APIs build on top of Storm&#39;s spouts and bolts to provide a typed API for expressing streaming computations, and support functional-style operations such as map-reduce.</p>
<h1 id="concepts"><a name="concepts"></a> Concepts</h1>
<p>Conceptually, a <code>Stream</code> can be thought of as a stream of messages flowing through a pipeline. A <code>Stream</code> may be generated by reading messages from a source such as a spout, or by transforming other streams. For example:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// imports</span>
<span class="kn">import</span> <span class="nn">org.apache.storm.streams.Stream</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.storm.streams.StreamBuilder</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">java.util.Arrays</span><span class="o">;</span>
<span class="o">...</span>
<span class="n">StreamBuilder</span> <span class="n">builder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">StreamBuilder</span><span class="o">();</span>
<span class="c1">// a stream of sentences obtained from a source spout</span>
<span class="n">Stream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">sentences</span> <span class="o">=</span> <span class="n">builder</span><span class="o">.</span><span class="na">newStream</span><span class="o">(</span><span class="k">new</span> <span class="n">RandomSentenceSpout</span><span class="o">()).</span><span class="na">map</span><span class="o">(</span><span class="n">tuple</span> <span class="o">-&gt;</span> <span class="n">tuple</span><span class="o">.</span><span class="na">getString</span><span class="o">(</span><span class="mi">0</span><span class="o">));</span>
<span class="c1">// a stream of words obtained by transforming (splitting) the stream of sentences</span>
<span class="n">Stream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="n">sentences</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="n">s</span> <span class="o">-&gt;</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="n">s</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">" "</span><span class="o">)));</span>
<span class="c1">// output operation that prints the words to console</span>
<span class="n">words</span><span class="o">.</span><span class="na">forEach</span><span class="o">(</span><span class="n">w</span> <span class="o">-&gt;</span> <span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="n">w</span><span class="o">));</span>
</code></pre></div>
<p>Most stream operations accept parameters that describe user-specified behavior, typically via lambda expressions such as <code>s -&gt; Arrays.asList(s.split(&quot; &quot;))</code> in the example above.</p>
<p>A <code>Stream</code> supports two kinds of operations:</p>
<ol>
<li><strong>Transformations</strong> that produce another stream from the current stream (like the <code>flatMap</code> operation in the example above).</li>
<li><strong>Output operations</strong> that produce a result (like the <code>forEach</code> operation in the example above).</li>
</ol>
<h2 id="stream-builder"><a name="streambuilder"></a> Stream Builder</h2>
<p><code>StreamBuilder</code> provides the builder APIs to create a new stream. Typically, a spout forms the source of a stream.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">StreamBuilder</span> <span class="n">builder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">StreamBuilder</span><span class="o">();</span>
<span class="n">Stream</span><span class="o">&lt;</span><span class="n">Tuple</span><span class="o">&gt;</span> <span class="n">sentences</span> <span class="o">=</span> <span class="n">builder</span><span class="o">.</span><span class="na">newStream</span><span class="o">(</span><span class="k">new</span> <span class="n">TestSentenceSpout</span><span class="o">());</span>
</code></pre></div>
<p>The <code>StreamBuilder</code> tracks the overall pipeline of operations expressed via the Stream. One can then create the Storm topology
via <code>build()</code> and submit it like a normal Storm topology via <code>StormSubmitter</code>.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">StormSubmitter</span><span class="o">.</span><span class="na">submitTopologyWithProgressBar</span><span class="o">(</span><span class="s">"test"</span><span class="o">,</span> <span class="k">new</span> <span class="n">Config</span><span class="o">(),</span> <span class="n">streamBuilder</span><span class="o">.</span><span class="na">build</span><span class="o">());</span>
</code></pre></div>
<h2 id="value-mapper"><a name="valuemapper"></a> Value mapper</h2>
<p>Value mappers can be used to extract specific fields from the tuples emitted by a spout to produce a typed stream of values. Value mappers are passed as arguments to <code>StreamBuilder.newStream</code>.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">StreamBuilder</span> <span class="n">builder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">StreamBuilder</span><span class="o">();</span>
<span class="c1">// extract the first field from the tuple to get a Stream&lt;String&gt; of sentences</span>
<span class="n">Stream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">sentences</span> <span class="o">=</span> <span class="n">builder</span><span class="o">.</span><span class="na">newStream</span><span class="o">(</span><span class="k">new</span> <span class="n">TestWordSpout</span><span class="o">(),</span> <span class="k">new</span> <span class="n">ValueMapper</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;(</span><span class="mi">0</span><span class="o">));</span>
</code></pre></div>
<p>Storm provides strongly typed tuples via the <code>Pair</code> and <code>Tuple</code> classes (<code>Tuple3</code> up to <code>Tuple10</code>). One can use a <code>TupleValueMapper</code> to produce a stream of typed tuples as shown below.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// extract first three fields of the tuple emitted by the spout to produce a stream of typed tuples.</span>
<span class="n">Stream</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="n">builder</span><span class="o">.</span><span class="na">newStream</span><span class="o">(</span><span class="k">new</span> <span class="n">TestSpout</span><span class="o">(),</span> <span class="n">TupleValueMappers</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span> <span class="mi">1</span><span class="o">,</span> <span class="mi">2</span><span class="o">));</span>
</code></pre></div>
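<p>Conceptually, a value mapper is just a function from a tuple to a typed value. The minimal sketch below illustrates this without any Storm dependency, under two assumptions: a tuple is modelled as a plain <code>List&lt;Object&gt;</code> (Storm&#39;s <code>Tuple</code> is richer), and the <code>SimpleValueMapper</code> and <code>ValueMapperSketch</code> classes are hypothetical names, not part of Storm&#39;s API.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class ValueMapperSketch {

    // Hypothetical stand-in for a value mapper: extracts the field at a fixed
    // position from a tuple (modelled here as a List&lt;Object&gt;) and casts it to T.
    static final class SimpleValueMapper&lt;T&gt; implements Function&lt;List&lt;Object&gt;, T&gt; {
        private final int index;

        SimpleValueMapper(int index) {
            this.index = index;
        }

        @Override
        @SuppressWarnings("unchecked")
        public T apply(List&lt;Object&gt; tuple) {
            return (T) tuple.get(index);
        }
    }

    public static void main(String[] args) {
        // A tuple as a spout might emit it: (sentence, wordCount, timestamp)
        List&lt;Object&gt; tuple = Arrays.asList("the quick brown fox", 4, 1500000000L);

        // Extract the first field to obtain a typed String value
        Function&lt;List&lt;Object&gt;, String&gt; sentenceMapper = new SimpleValueMapper&lt;&gt;(0);
        System.out.println(sentenceMapper.apply(tuple)); // the quick brown fox
    }
}
</code></pre></div>
<p>The unchecked cast mirrors the fact that positional extraction cannot be verified at compile time; a mismatched field type only surfaces as a <code>ClassCastException</code> when the value is used.</p>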
<h1 id="stream-apis"><a name="streamapis"></a> Stream APIs</h1>
<p>Storm&#39;s streaming APIs (defined in <a href="../storm-client/src/jvm/org/apache/storm/streams/Stream.java">Stream</a> and <a href="../storm-client/src/jvm/org/apache/storm/streams/PairStream.java">PairStream</a>) currently support a wide range of operations, such as transformations, filters, windowing, aggregations, branching, joins, stateful, output and debugging operations.</p>
<h2 id="basic-transformations"><a name="basictransformations"></a> Basic transformations</h2>
<h3 id="filter"><a name="filter"></a> filter</h3>
<p><code>filter</code> returns a stream consisting of the elements of the stream that match the given <code>Predicate</code> (that is, those for which the predicate returns true).</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Stream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">logs</span> <span class="o">=</span> <span class="o">...</span>
<span class="n">Stream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">errors</span> <span class="o">=</span> <span class="n">logs</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="n">line</span> <span class="o">-&gt;</span> <span class="n">line</span><span class="o">.</span><span class="na">contains</span><span class="o">(</span><span class="s">"ERROR"</span><span class="o">));</span>
</code></pre></div>
<p>In the example above, log lines containing &#39;ERROR&#39; are filtered into an error stream, which can then be processed further.</p>
<h3 id="map"><a name="map"></a> map</h3>
<p><code>map</code> returns a stream consisting of the result of applying the given mapping function to the values of the stream.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Stream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="o">...</span>
<span class="n">Stream</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">wordLengths</span> <span class="o">=</span> <span class="n">words</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="nl">String:</span><span class="o">:</span><span class="n">length</span><span class="o">);</span>
</code></pre></div>
<p>The example generates a stream of word lengths from a stream of words by applying <code>String.length</code> to each value. Note that the type of the resulting stream of a map operation can differ from that of the original stream.</p>
<h3 id="flatmap"><a name="flatmap"></a> flatMap</h3>
<p><code>flatMap</code> returns a stream consisting of the results of replacing each value of the stream with the contents produced by applying the provided mapping function to that value. This is similar to map, but each value can be mapped to zero or more values.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Stream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">sentences</span> <span class="o">=</span> <span class="o">...</span>
<span class="n">Stream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="n">sentences</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="n">s</span> <span class="o">-&gt;</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="n">s</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">" "</span><span class="o">)));</span>
</code></pre></div>
<p>In the example above, the lambda splits each value (a sentence) into a list of words, and <code>flatMap</code> flattens these lists into a single stream of words.</p>
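<p>The &quot;zero or more values&quot; behavior of <code>flatMap</code> can be illustrated without Storm by analogy with the JDK&#39;s <code>java.util.stream.Stream.flatMap</code> on a bounded list; this is a sketch of the semantics, not Storm&#39;s API. The splitting rule (trim, then split on runs of whitespace, so that a blank sentence yields no words) is an assumption of the sketch. It differs deliberately from <code>s.split(&quot; &quot;)</code> above, which can produce empty strings when words are separated by more than one space.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

final class FlatMapSketch {

    // Maps one sentence to zero or more words; a blank sentence yields no words.
    static List&lt;String&gt; toWords(String sentence) {
        String trimmed = sentence.trim();
        if (trimmed.isEmpty()) {
            return Collections.emptyList();
        }
        // Split on runs of whitespace so repeated spaces do not produce empty "words"
        return Arrays.asList(trimmed.split("\\s+"));
    }

    // Flattens the per-sentence word lists into one list (java.util.stream, not Storm)
    static List&lt;String&gt; words(List&lt;String&gt; sentences) {
        return sentences.stream()
                .flatMap(s -&gt; toWords(s).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List&lt;String&gt; sentences = Arrays.asList("the cow jumped", "   ", "over  the moon");
        System.out.println(words(sentences)); // [the, cow, jumped, over, the, moon]
    }
}
</code></pre></div>
<p>The blank middle sentence contributes nothing to the output, which is exactly the case a plain <code>map</code> cannot express.</p>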
<h2 id="windowing"><a name="windowing"></a> Windowing</h2>
<p>A <code>window</code> operation produces a windowed stream consisting of the elements that fall within the window specified by the window parameter. All the windowing options supported by the underlying windowed bolts are supported via the Stream APIs.</p>
<p><code>Stream&lt;T&gt; windowedStream = stream.window(Window&lt;?, ?&gt; windowConfig);</code></p>
<p>The <code>windowConfig</code> parameter specifies the window configuration, such as sliding or tumbling windows based on time duration or event count.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// time based sliding window</span>
<span class="n">stream</span><span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">SlidingWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">minutes</span><span class="o">(</span><span class="mi">10</span><span class="o">),</span> <span class="n">Duration</span><span class="o">.</span><span class="na">minutes</span><span class="o">(</span><span class="mi">1</span><span class="o">)));</span>
<span class="c1">// count based sliding window</span>
<span class="n">stream</span><span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">SlidingWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Count</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">10</span><span class="o">),</span> <span class="n">Count</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">2</span><span class="o">)));</span>
<span class="c1">// tumbling window</span>
<span class="n">stream</span><span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">TumblingWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">seconds</span><span class="o">(</span><span class="mi">10</span><span class="o">)));</span>
<span class="c1">// specifying timestamp field for event time based processing and a late tuple stream.</span>
<span class="n">stream</span><span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">TumblingWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">seconds</span><span class="o">(</span><span class="mi">10</span><span class="o">))</span>
<span class="o">.</span><span class="na">withTimestampField</span><span class="o">(</span><span class="s">"ts"</span><span class="o">)</span>
<span class="o">.</span><span class="na">withLateTupleStream</span><span class="o">(</span><span class="s">"late_events"</span><span class="o">));</span>
</code></pre></div>
<p>A windowing operation splits the continuous stream of values into finite subsets, and is necessary for performing operations such as joins and aggregations.</p>
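<p>The following plain-Java sketch illustrates count-based windowing on a finite list: a window is emitted every <code>slidingInterval</code> values and holds up to the last <code>windowLength</code> values, and a tumbling window is the special case where the two are equal. It is an illustration under these assumptions, not Storm&#39;s windowing implementation (which also handles time-based windows, timestamps and late tuples); the class and method names are hypothetical, and emitting shorter windows before <code>windowLength</code> values have arrived is a choice of this sketch.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CountWindowSketch {

    // Emits a window every `slidingInterval` values; each window holds at most the
    // last `windowLength` values seen so far (early windows may be shorter).
    // Values after the last full interval are not emitted by this sketch.
    static &lt;T&gt; List&lt;List&lt;T&gt;&gt; slidingWindows(List&lt;T&gt; values, int windowLength, int slidingInterval) {
        if (windowLength &lt;= 0 || slidingInterval &lt;= 0) {
            throw new IllegalArgumentException("window length and sliding interval must be positive");
        }
        List&lt;List&lt;T&gt;&gt; windows = new ArrayList&lt;&gt;();
        for (int seen = slidingInterval; seen &lt;= values.size(); seen += slidingInterval) {
            int from = Math.max(0, seen - windowLength);
            windows.add(new ArrayList&lt;&gt;(values.subList(from, seen)));
        }
        return windows;
    }

    // A tumbling window slides by its own length, so consecutive windows never overlap.
    static &lt;T&gt; List&lt;List&lt;T&gt;&gt; tumblingWindows(List&lt;T&gt; values, int windowLength) {
        return slidingWindows(values, windowLength, windowLength);
    }

    public static void main(String[] args) {
        List&lt;Integer&gt; values = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(slidingWindows(values, 4, 2)); // [[1, 2], [1, 2, 3, 4], [3, 4, 5, 6]]
        System.out.println(tumblingWindows(values, 3));   // [[1, 2, 3], [4, 5, 6]]
    }
}
</code></pre></div>
<p>Note how sliding windows overlap (values 3 and 4 appear in two windows) while tumbling windows partition the input, which is why aggregates over sliding windows can count the same value more than once.</p>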
<h2 id="transformation-to-key-value-pairs"><a name="keyvaluepairs"></a> Transformation to key-value pairs</h2>
<h3 id="maptopair-and-flatmaptopair"><a name="mapflatmaptopair"></a> mapToPair and flatMapToPair</h3>
<p>These operations transform a Stream of values into a stream of key-value pairs.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Stream</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">integers</span> <span class="o">=</span> <span class="err"></span> <span class="c1">// 1, 2, 3, 4, ... </span>
<span class="n">PairStream</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">squares</span> <span class="o">=</span> <span class="n">integers</span><span class="o">.</span><span class="na">mapToPair</span><span class="o">(</span><span class="n">x</span> <span class="o">-&gt;</span> <span class="n">Pair</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">x</span><span class="o">,</span> <span class="n">x</span><span class="o">*</span><span class="n">x</span><span class="o">));</span> <span class="c1">// (1, 1), (2, 4), (3, 9), (4, 16), ...</span>
</code></pre></div>
<p>A key-value pair stream is required for operations such as <code>groupByKey</code>, <code>aggregateByKey</code> and joins.</p>
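<p>Only <code>mapToPair</code> is shown above. As its name suggests, <code>flatMapToPair</code> relates to <code>mapToPair</code> the way <code>flatMap</code> relates to <code>map</code>: each value may produce zero or more pairs. The Storm-free sketch below illustrates that idea, assuming <code>java.util.AbstractMap.SimpleEntry</code> as a stand-in for Storm&#39;s <code>Pair</code>; the class and method names are hypothetical.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

final class FlatMapToPairSketch {

    // Maps one sentence to zero or more (word, 1) pairs; SimpleEntry stands in for Pair.
    static List&lt;Map.Entry&lt;String, Integer&gt;&gt; toPairs(String sentence) {
        List&lt;Map.Entry&lt;String, Integer&gt;&gt; pairs = new ArrayList&lt;&gt;();
        for (String word : sentence.trim().split("\\s+")) {
            if (!word.isEmpty()) { // an empty sentence splits into [""], which is skipped
                pairs.add(new SimpleEntry&lt;&gt;(word, 1));
            }
        }
        return pairs;
    }

    // Flattens the pairs produced for every sentence into a single list of pairs.
    static List&lt;Map.Entry&lt;String, Integer&gt;&gt; flatMapToPair(List&lt;String&gt; sentences) {
        List&lt;Map.Entry&lt;String, Integer&gt;&gt; result = new ArrayList&lt;&gt;();
        for (String sentence : sentences) {
            result.addAll(toPairs(sentence));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(flatMapToPair(Arrays.asList("to be", "", "or not")));
        // [to=1, be=1, or=1, not=1]
    }
}
</code></pre></div>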
<h2 id="aggregations"><a name="aggregations"></a> Aggregations</h2>
<p>Aggregation operations aggregate the values (or key-value pairs) in a stream. Typically, aggregations are performed on a windowed stream, where the aggregate results are emitted on each window activation.</p>
<h3 id="aggregate-and-reduce"><a name="aggregatereduce"></a> aggregate and reduce</h3>
<p><code>aggregate</code> and <code>reduce</code> compute a global aggregation, i.e. the values across all partitions are forwarded to a single task for computing the aggregate.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Stream</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">numbers</span> <span class="o">=</span> <span class="err"></span>
<span class="c1">// aggregate the numbers and produce a stream of last 10 sec sums.</span>
<span class="n">Stream</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">sums</span> <span class="o">=</span> <span class="n">numbers</span><span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">TumblingWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">seconds</span><span class="o">(</span><span class="mi">10</span><span class="o">)).</span><span class="na">aggregate</span><span class="o">(</span><span class="k">new</span> <span class="n">Sum</span><span class="o">());</span>
<span class="c1">// the last 10 sec sums computed using reduce</span>
<span class="n">Stream</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">sums</span> <span class="o">=</span> <span class="n">numbers</span><span class="o">.</span><span class="na">window</span><span class="o">(...).</span><span class="na">reduce</span><span class="o">((</span><span class="n">x</span><span class="o">,</span> <span class="n">y</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">x</span> <span class="o">+</span> <span class="n">y</span><span class="o">);</span>
</code></pre></div>
<p><code>aggregate</code> and <code>reduce</code> differ in the way the aggregate results are computed.</p>
<p>A <code>reduce</code> operation repeatedly applies the given reducer, combining two values into a single value until only one value is left. Because the reducer&#39;s inputs and output all have the same type as the stream values, this may not be feasible or easy for all kinds of aggregations (e.g. an average, which needs both a running sum and a count).</p>
<p>An <code>aggregate</code> operation does a mutable reduction. A mutable reduction accumulates results into an accumulator as it processes the values.</p>
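<p>The average case mentioned above shows why a mutable accumulator helps: the accumulator (a running sum and a count) has a different type from both the input values and the result. The plain-Java sketch below illustrates such a mutable reduction; <code>AverageAccumulator</code> and <code>AverageSketch</code> are hypothetical names, not Storm classes.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">import java.util.Arrays;
import java.util.List;

final class AverageSketch {

    // Hypothetical mutable accumulator holding a running sum and count.
    static final class AverageAccumulator {
        private long sum;
        private long count;

        // Folds one value into the accumulator in place (a mutable reduction step).
        void add(long value) {
            sum += value;
            count++;
        }

        // Extracts the result; an empty accumulator yields NaN instead of dividing by zero.
        double result() {
            return count == 0 ? Double.NaN : (double) sum / count;
        }
    }

    static double average(List&lt;Long&gt; values) {
        AverageAccumulator accumulator = new AverageAccumulator();
        for (long value : values) {
            accumulator.add(value);
        }
        return accumulator.result();
    }

    public static void main(String[] args) {
        // A reducer of type (Long, Long) -&gt; Long cannot express this directly,
        // because the intermediate state (sum, count) is not a single Long.
        System.out.println(average(Arrays.asList(2L, 4L, 9L))); // 5.0
    }
}
</code></pre></div>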
<p>The aggregation operations (aggregate and reduce) automatically perform a local aggregation whenever possible before the network shuffle, to minimize the number of messages transmitted over the network. For example, to compute a sum, a per-partition partial sum is computed and only the partial sums are transferred over the network to the target bolt, where they are merged to produce the final sum. A <code>CombinerAggregator</code> interface is used as the argument of <code>aggregate</code> to enable this.</p>
<p>For example, the <code>Sum</code> aggregator (passed as the argument of <code>aggregate</code> in the example above) can be implemented as a <code>CombinerAggregator</code> as follows:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">Sum</span> <span class="kd">implements</span> <span class="n">CombinerAggregator</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="c1">// The initial value of the sum</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">Long</span> <span class="nf">init</span><span class="o">()</span> <span class="o">{</span>
<span class="k">return</span> <span class="mi">0L</span><span class="o">;</span>
<span class="o">}</span>
<span class="c1">// Updates the sum by adding the value (this could be a partial sum)</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">Long</span> <span class="nf">apply</span><span class="o">(</span><span class="n">Long</span> <span class="n">aggregate</span><span class="o">,</span> <span class="n">Long</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">aggregate</span> <span class="o">+</span> <span class="n">value</span><span class="o">;</span>
<span class="o">}</span>
<span class="c1">// merges the partial sums</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">Long</span> <span class="nf">merge</span><span class="o">(</span><span class="n">Long</span> <span class="n">accum1</span><span class="o">,</span> <span class="n">Long</span> <span class="n">accum2</span><span class="o">)</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">accum1</span> <span class="o">+</span> <span class="n">accum2</span><span class="o">;</span>
<span class="o">}</span>
<span class="c1">// extracts the result from the accumulator (here the accumulator and the result are the same)</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">Long</span> <span class="nf">result</span><span class="o">(</span><span class="n">Long</span> <span class="n">accum</span><span class="o">)</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">accum</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">}</span>
</code></pre></div>
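<p>To see how the four methods fit together, the following Storm-free sketch simulates the local-then-global aggregation described above: each partition folds its values with <code>init</code> and <code>apply</code>, the partial results are combined with <code>merge</code>, and <code>result</code> is called once at the end. The interface is declared locally so the sketch is self-contained and only mirrors the methods of the <code>Sum</code> example; modelling partitions as lists and the <code>aggregate</code> helper are assumptions for illustration, not Storm&#39;s internal implementation.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">import java.util.Arrays;
import java.util.List;

final class CombinerSketch {

    // Local stand-in mirroring the methods used in the Sum example above.
    interface CombinerAggregator&lt;T, A, R&gt; {
        A init();
        A apply(A aggregate, T value);
        A merge(A accum1, A accum2);
        R result(A accum);
    }

    static final class Sum implements CombinerAggregator&lt;Long, Long, Long&gt; {
        @Override public Long init() { return 0L; }
        @Override public Long apply(Long aggregate, Long value) { return aggregate + value; }
        @Override public Long merge(Long accum1, Long accum2) { return accum1 + accum2; }
        @Override public Long result(Long accum) { return accum; }
    }

    // Phase 1: each partition computes a partial aggregate locally.
    // Phase 2: only the partial aggregates are merged into the final result.
    // Assumes init() is an identity for merge(), as it is for Sum.
    static &lt;T, A, R&gt; R aggregate(List&lt;List&lt;T&gt;&gt; partitions, CombinerAggregator&lt;T, A, R&gt; agg) {
        A total = agg.init();
        for (List&lt;T&gt; partition : partitions) {
            A partial = agg.init();
            for (T value : partition) {
                partial = agg.apply(partial, value);
            }
            total = agg.merge(total, partial); // one partial result per partition, not one per value
        }
        return agg.result(total);
    }

    public static void main(String[] args) {
        List&lt;List&lt;Long&gt;&gt; partitions = Arrays.asList(
                Arrays.asList(1L, 2L, 3L),  // partial sum 6
                Arrays.asList(10L, 20L));   // partial sum 30
        System.out.println(aggregate(partitions, new Sum())); // 36
    }
}
</code></pre></div>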
<h3 id="aggregatebykey-and-reducebykey"><a name="aggregatereducebykey"></a> aggregateByKey and reduceByKey</h3>
<p>These are similar to the <code>aggregate</code> and <code>reduce</code> operations, but perform the aggregation per key.</p>
<p><code>aggregateByKey</code> aggregates the values for each key of the stream using the given Aggregator.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Stream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="o">...</span> <span class="c1">// a windowed stream of words</span>
<span class="n">Stream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">words</span><span class="o">.</span><span class="na">mapToPair</span><span class="o">(</span><span class="n">w</span> <span class="o">-&gt;</span> <span class="n">Pair</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">w</span><span class="o">,</span><span class="mi">1</span><span class="o">)</span> <span class="c1">// convert to a stream of (word, 1) pairs</span>
<span class="o">.</span><span class="na">aggregateByKey</span><span class="o">(</span><span class="k">new</span> <span class="n">Count</span><span class="o">&lt;&gt;());</span> <span class="c1">// compute counts per word</span>
</code></pre></div>
<p><code>reduceByKey</code> performs a reduction on the values for each key of this stream by repeatedly applying the reducer.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Stream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="o">...</span> <span class="c1">// a windowed stream of words</span>
<span class="n">Stream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">words</span><span class="o">.</span><span class="na">mapToPair</span><span class="o">(</span><span class="n">w</span> <span class="o">-&gt;</span> <span class="n">Pair</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">w</span><span class="o">,</span><span class="mi">1</span><span class="o">)</span> <span class="c1">// convert to a stream of (word, 1) pairs</span>
<span class="o">.</span><span class="na">reduceByKey</span><span class="o">((</span><span class="n">x</span><span class="o">,</span> <span class="n">y</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">x</span> <span class="o">+</span> <span class="n">y</span><span class="o">);</span> <span class="c1">// compute counts per word</span>
</code></pre></div>
<p>As with the global aggregate/reduce, a per-partition local aggregate (per key) is computed first, and the partial results are sent to the target bolts, where they are merged to produce the final aggregate.</p>
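<p>The per-key variant can be sketched the same way: each partition reduces its pairs into a map of partial results per key, and those maps are then merged key by key with the same reducer. This plain-Java sketch uses <code>Map.merge</code> with a <code>BinaryOperator</code> as a stand-in for Storm&#39;s reducer and models each partition as a list of <code>Map.Entry</code> pairs; it illustrates the idea and is not Storm&#39;s implementation. The class and helper names are hypothetical.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

final class ReduceByKeySketch {

    // Phase 1: reduce the pairs of one partition into partial results per key.
    static &lt;K, V&gt; Map&lt;K, V&gt; localReduce(List&lt;Map.Entry&lt;K, V&gt;&gt; partition, BinaryOperator&lt;V&gt; reducer) {
        Map&lt;K, V&gt; partial = new HashMap&lt;&gt;();
        for (Map.Entry&lt;K, V&gt; pair : partition) {
            partial.merge(pair.getKey(), pair.getValue(), reducer);
        }
        return partial;
    }

    // Phase 2: merge the per-partition partial results key by key.
    static &lt;K, V&gt; Map&lt;K, V&gt; reduceByKey(List&lt;List&lt;Map.Entry&lt;K, V&gt;&gt;&gt; partitions, BinaryOperator&lt;V&gt; reducer) {
        Map&lt;K, V&gt; result = new HashMap&lt;&gt;();
        for (List&lt;Map.Entry&lt;K, V&gt;&gt; partition : partitions) {
            localReduce(partition, reducer).forEach((key, value) -&gt; result.merge(key, value, reducer));
        }
        return result;
    }

    // Hypothetical helper that builds a (word, count) pair.
    static Map.Entry&lt;String, Long&gt; pair(String word, long count) {
        return new SimpleEntry&lt;&gt;(word, count);
    }

    public static void main(String[] args) {
        List&lt;List&lt;Map.Entry&lt;String, Long&gt;&gt;&gt; partitions = Arrays.asList(
                Arrays.asList(pair("a", 1), pair("b", 1), pair("a", 1)), // partial: a=2, b=1
                Arrays.asList(pair("a", 1), pair("c", 1)));              // partial: a=1, c=1
        Map&lt;String, Long&gt; counts = reduceByKey(partitions, Long::sum);
        System.out.println(new TreeMap&lt;&gt;(counts)); // {a=3, b=1, c=1} (sorted for readability)
    }
}
</code></pre></div>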
<h3 id="groupbykey"><a name="groupbykey"></a> groupByKey</h3>
<p><code>groupByKey</code> on a stream of key-value pairs returns a new stream where the values are grouped by the keys.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// a stream of (user, score) pairs e.g. ("alice", 10), ("bob", 15), ("bob", 20), ("alice", 11), ("alice", 13)</span>
<span class="n">PairStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">scores</span> <span class="o">=</span> <span class="o">...</span>
<span class="c1">// list of scores per user in the last window, e.g. ("alice", [10, 11, 13]), ("bob", [15, 20])</span>
<span class="n">PairStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Iterable</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">userScores</span> <span class="o">=</span> <span class="n">scores</span><span class="o">.</span><span class="na">window</span><span class="o">(...).</span><span class="na">groupByKey</span><span class="o">();</span>
</code></pre></div>
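<p>A minimal plain-Java model of what <code>groupByKey</code> produces for one window, using the scores from the example above. It is not Storm code: <code>GroupByKeySketch</code> is a hypothetical name, a list of <code>Map.Entry</code> values stands in for the windowed <code>PairStream</code>, and the arrival order it preserves is a property of this sketch (a <code>LinkedHashMap</code> plus <code>ArrayList</code>), not something documented for Storm.</p>

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of groupByKey over the values of one window:
// every value is appended to the list for its key.
final class GroupByKeySketch {

    static <K, V> Map<K, List<V>> groupByKey(List<Map.Entry<K, V>> window) {
        Map<K, List<V>> grouped = new LinkedHashMap<>();
        for (Map.Entry<K, V> e : window) {
            // create the list on first sight of a key, then append the value
            grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> window = List.of(
                Map.entry("alice", 10), Map.entry("bob", 15), Map.entry("bob", 20),
                Map.entry("alice", 11), Map.entry("alice", 13));
        System.out.println(groupByKey(window)); // {alice=[10, 11, 13], bob=[15, 20]}
    }
}
```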
<h3 id="countbykey"><a name="countbykey"></a> countByKey</h3>
<p><code>countByKey</code> counts the number of values for each key in the stream.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Stream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="o">...</span> <span class="c1">// a windowed stream of words</span>
<span class="n">PairStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">words</span><span class="o">.</span><span class="na">mapToPair</span><span class="o">(</span><span class="n">w</span> <span class="o">-&gt;</span> <span class="n">Pair</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">w</span><span class="o">,</span> <span class="mi">1</span><span class="o">))</span> <span class="c1">// convert to a stream of (word, 1) pairs</span>
<span class="o">.</span><span class="na">countByKey</span><span class="o">();</span> <span class="c1">// compute counts per word</span>
</code></pre></div>
<p>Internally <code>countByKey</code> uses <code>aggregateByKey</code> to compute the count.</p>
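<p>The idea of counting through a general per-key aggregation can be sketched in plain Java. The three functions below (an initial value, an accumulator and a combiner) are an assumption chosen for illustration and are not meant to reproduce the exact signature of Storm's <code>aggregateByKey</code>; <code>CountByKeySketch</code> is a hypothetical name.</p>

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;

final class CountByKeySketch {

    // Generic per-key aggregation: fold each value into a per-partition
    // aggregate, then combine the partial aggregates across partitions.
    static <K, V, A> Map<K, A> aggregateByKey(List<List<Map.Entry<K, V>>> partitions,
                                              Supplier<A> initialValue,
                                              BiFunction<A, V, A> accumulator,
                                              BiFunction<A, A, A> combiner) {
        Map<K, A> result = new HashMap<>();
        for (List<Map.Entry<K, V>> partition : partitions) {
            Map<K, A> partial = new HashMap<>();
            for (Map.Entry<K, V> e : partition) {
                A current = partial.getOrDefault(e.getKey(), initialValue.get());
                partial.put(e.getKey(), accumulator.apply(current, e.getValue()));
            }
            // merge this partition's partial aggregates into the final result
            partial.forEach((k, a) -> result.merge(k, a, combiner));
        }
        return result;
    }

    // countByKey as a special case: start at 0, add 1 per value, sum the partial counts.
    static <K, V> Map<K, Long> countByKey(List<List<Map.Entry<K, V>>> partitions) {
        return CountByKeySketch.<K, V, Long>aggregateByKey(
                partitions, () -> 0L, (count, ignored) -> count + 1, Long::sum);
    }

    public static void main(String[] args) {
        List<List<Map.Entry<String, Integer>>> partitions = List.of(
                List.of(Map.entry("storm", 1), Map.entry("api", 1), Map.entry("storm", 1)),
                List.of(Map.entry("storm", 1)));
        System.out.println(countByKey(partitions).get("storm")); // 3
    }
}
```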
<h2 id="repartition"><a name="repartition"></a> Repartition</h2>
<p>A <code>repartition</code> operation re-partitions the current stream and returns a new stream with the specified number of partitions. Further operations on the resulting stream execute at that level of parallelism, so repartitioning can be used to increase or reduce the parallelism of the operations in the stream.</p>
<p>The initial number of partitions can also be specified when creating the stream (via <code>StreamBuilder.newStream</code>).</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// Stream 's1' will have 2 partitions and operations on s1 will execute at this level of parallelism</span>
<span class="n">Stream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">s1</span> <span class="o">=</span> <span class="n">builder</span><span class="o">.</span><span class="na">newStream</span><span class="o">(</span><span class="k">new</span> <span class="n">TestWordSpout</span><span class="o">(),</span> <span class="k">new</span> <span class="n">ValueMapper</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;(</span><span class="mi">0</span><span class="o">),</span> <span class="mi">2</span><span class="o">);</span>
<span class="c1">// Stream 's2' and further operations will have three partitions</span>
<span class="n">Stream</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">s2</span> <span class="o">=</span> <span class="n">s1</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">function1</span><span class="o">).</span><span class="na">repartition</span><span class="o">(</span><span class="mi">3</span><span class="o">);</span>
<span class="c1">// perform a map operation on s2 and print the result</span>
<span class="n">s2</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">function2</span><span class="o">).</span><span class="na">print</span><span class="o">();</span>
</code></pre></div>
<p>Note: a <code>repartition</code> operation implies network transfer. In the above example, the first map operation (function1) would be executed at a parallelism of 2 (on the two partitions of s1), whereas the second map operation (function2) would be executed at a parallelism of 3 (on the three partitions of s2). This also means that the first and second map operations have to be executed on two separate bolts, with a network transfer between them.</p>
<h2 id="output-operations"><a name="outputoperations"></a> Output operations</h2>
<p>Output operations push out the transformed values in the stream to the console, external sinks like databases, files or even Storm bolts.</p>
<h3 id="print"><a name="print"></a> print</h3>
<p><code>print</code> prints the values in the stream to the console. For example,</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// transforms words to uppercase and prints to the console</span>
<span class="n">words</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="nl">String:</span><span class="o">:</span><span class="n">toUpperCase</span><span class="o">).</span><span class="na">print</span><span class="o">();</span>
</code></pre></div>
<h3 id="peek"><a name="peek"></a> peek</h3>
<p><code>peek</code> returns a stream consisting of the elements of the stream, additionally performing the provided action on each element as they are consumed from the resulting stream. This can be used to ‘inspect’ the values flowing at any stage in a stream.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">builder</span><span class="o">.</span><span class="na">newStream</span><span class="o">(...).</span><span class="na">flatMap</span><span class="o">(</span><span class="n">s</span> <span class="o">-&gt;</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="n">s</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">" "</span><span class="o">)))</span>
<span class="c1">// print the results of the flatMap operation as the values flow across the stream.</span>
<span class="o">.</span><span class="na">peek</span><span class="o">(</span><span class="n">s</span> <span class="o">-&gt;</span> <span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="n">s</span><span class="o">))</span>
<span class="o">.</span><span class="na">mapToPair</span><span class="o">(</span><span class="n">w</span> <span class="o">-&gt;</span> <span class="n">Pair</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">w</span><span class="o">,</span> <span class="mi">1</span><span class="o">));</span>
</code></pre></div>
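<p>The description of <code>peek</code> above mirrors that of <code>java.util.stream.Stream.peek</code> in the JDK, so the JDK version is a convenient way to try the behavior locally. This sketch uses only the JDK (no Storm classes); <code>PeekSketch</code> is a hypothetical name, and the peek action records each word in a list instead of printing it.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// JDK analogue of the peek example above: the peek action sees every word
// produced by flatMap without changing the values passed downstream.
final class PeekSketch {

    static List<String> splitAndUppercase(List<String> sentences, List<String> inspected) {
        return sentences.stream()
                .flatMap(s -> Arrays.stream(s.split(" ")))
                .peek(inspected::add)          // inspect each word as it flows past
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        List<String> out = splitAndUppercase(List.of("the cow", "jumped"), seen);
        System.out.println(seen); // [the, cow, jumped]
        System.out.println(out);  // [THE, COW, JUMPED]
    }
}
```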
<h3 id="foreach"><a name="foreach"></a> forEach</h3>
<p><code>forEach</code> is the most generic output operation. It can be used to execute arbitrary code for each value in the stream, such as storing the results in an external database or file.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">stream</span><span class="o">.</span><span class="na">forEach</span><span class="o">(</span><span class="n">value</span> <span class="o">-&gt;</span> <span class="o">{</span>
<span class="c1">// log it</span>
<span class="n">LOG</span><span class="o">.</span><span class="na">debug</span><span class="o">(</span><span class="n">value</span><span class="o">);</span>
<span class="c1">// store the value into a db and so on...</span>
<span class="n">statement</span><span class="o">.</span><span class="na">executeUpdate</span><span class="o">(..);</span>
<span class="o">}</span>
<span class="o">);</span>
</code></pre></div>
<h3 id="to"><a name="to"></a> to</h3>
<p><code>to</code> allows existing bolts to be plugged in as sinks.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// The redisBolt is a standard storm bolt</span>
<span class="n">IRichBolt</span> <span class="n">redisBolt</span> <span class="o">=</span> <span class="k">new</span> <span class="n">RedisStoreBolt</span><span class="o">(</span><span class="n">poolConfig</span><span class="o">,</span> <span class="n">storeMapper</span><span class="o">);</span>
<span class="o">...</span>
<span class="c1">// generate the word counts and store them in redis using the redis bolt</span>
<span class="n">builder</span><span class="o">.</span><span class="na">newStream</span><span class="o">(</span><span class="k">new</span> <span class="n">TestWordSpout</span><span class="o">(),</span> <span class="k">new</span> <span class="n">ValueMapper</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;(</span><span class="mi">0</span><span class="o">))</span>
<span class="o">.</span><span class="na">mapToPair</span><span class="o">(</span><span class="n">w</span> <span class="o">-&gt;</span> <span class="n">Pair</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">w</span><span class="o">,</span> <span class="mi">1</span><span class="o">))</span>
<span class="o">.</span><span class="na">countByKey</span><span class="o">()</span>
<span class="c1">// the (word, count) pairs are forwarded to the redisBolt, which stores them in redis</span>
<span class="o">.</span><span class="na">to</span><span class="o">(</span><span class="n">redisBolt</span><span class="o">);</span>
</code></pre></div>
<p>Note that in this case the processing guarantees are only as strong as those provided by the bolt.</p>
<h2 id="branch"><a name="branching"></a> Branch</h2>
<p>A <code>branch</code> operation can be used to express if-then-else logic on streams.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Stream</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;[]</span> <span class="n">streams</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">branch</span><span class="o">(</span><span class="n">Predicate</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;...</span> <span class="n">predicates</span><span class="o">)</span>
</code></pre></div>
<p>The predicates are applied in the given order to each value of the stream, and the value is forwarded to the result stream with the same index as the first predicate that matches. If none of the predicates matches a value, that value is dropped.</p>
<p>For example,</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Stream</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;[]</span> <span class="n">streams</span> <span class="o">=</span> <span class="n">builder</span><span class="o">.</span><span class="na">newStream</span><span class="o">(</span><span class="k">new</span> <span class="n">RandomIntegerSpout</span><span class="o">(),</span> <span class="k">new</span> <span class="n">ValueMapper</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;(</span><span class="mi">0</span><span class="o">))</span>
<span class="o">.</span><span class="na">branch</span><span class="o">(</span><span class="n">x</span> <span class="o">-&gt;</span> <span class="o">(</span><span class="n">x</span> <span class="o">%</span> <span class="mi">2</span><span class="o">)</span> <span class="o">==</span> <span class="mi">0</span><span class="o">,</span>
<span class="n">x</span> <span class="o">-&gt;</span> <span class="o">(</span><span class="n">x</span> <span class="o">%</span> <span class="mi">2</span><span class="o">)</span> <span class="o">==</span> <span class="mi">1</span><span class="o">);</span>
<span class="n">Stream</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">evenNumbers</span> <span class="o">=</span> <span class="n">streams</span><span class="o">[</span><span class="mi">0</span><span class="o">];</span>
<span class="n">Stream</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">oddNumbers</span> <span class="o">=</span> <span class="n">streams</span><span class="o">[</span><span class="mi">1</span><span class="o">];</span>
</code></pre></div>
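<p>The routing rule of <code>branch</code> (the first matching predicate wins, unmatched values are dropped) can be modeled in plain Java as follows. <code>BranchSketch</code> is hypothetical and returns lists instead of streams. One detail worth noting: in Java, <code>-3 % 2</code> is <code>-1</code>, so with the two predicates from the example a negative odd number matches neither and would be dropped if the source ever emitted one.</p>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of branch: each value goes to the output list whose index
// matches the first predicate it satisfies; values matching none are dropped.
final class BranchSketch {

    @SafeVarargs
    static <T> List<List<T>> branch(List<T> values, Predicate<T>... predicates) {
        List<List<T>> outputs = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            outputs.add(new ArrayList<>());
        }
        for (T value : values) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(value)) {
                    outputs.get(i).add(value);
                    break; // only the first matching predicate counts
                }
            }
            // no match: the value is not added anywhere, i.e. it is dropped
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<List<Integer>> streams = branch(List.of(4, 7, -3, 10),
                x -> (x % 2) == 0,
                x -> (x % 2) == 1);
        System.out.println(streams.get(0)); // [4, 10]
        System.out.println(streams.get(1)); // [7]
    }
}
```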
<h2 id="joins"><a name="joins"></a> Joins</h2>
<p>A <code>join</code> operation joins the values of one stream with the values having the same key from another stream. </p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">PairStream</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">squares</span> <span class="o">=</span> <span class="err"></span> <span class="c1">// (1, 1), (2, 4), (3, 9) ...</span>
<span class="n">PairStream</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">cubes</span> <span class="o">=</span> <span class="err"></span> <span class="c1">// (1, 1), (2, 8), (3, 27) ...</span>
<span class="c1">// join the sqaures and cubes stream to produce (1, [1, 1]), (2, [4, 8]), (3, [9, 27]) ...</span>
<span class="n">PairStream</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Pair</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">joined</span> <span class="o">=</span> <span class="n">squares</span><span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">TumblingWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">seconds</span><span class="o">(</span><span class="mi">5</span><span class="o">))).</span><span class="na">join</span><span class="o">(</span><span class="n">cubes</span><span class="o">);</span>
</code></pre></div>
<p>Joins are typically invoked on a windowed stream, joining the key-value pairs that arrived on each stream in the current window. The parallelism of the stream on which the join is invoked is carried forward to the joined stream. An optional <code>ValueJoiner</code> can be passed as an argument to join to specify how to join the two values for each matching key (the default behavior is to return a <code>Pair</code> of the values from both streams).</p>
<p>Left, right and full outer joins are supported. </p>
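<p>The per-window join semantics described above can be modeled with plain Java collections. This is a sketch under stated assumptions rather than Storm's implementation: <code>WindowJoinSketch</code> is hypothetical, the <code>joiner</code> parameter plays the role of the optional <code>ValueJoiner</code>, <code>Arrays.asList</code> stands in for the default <code>Pair</code>, and representing the missing side of a left outer join as <code>null</code> is an assumption of this model. Right and full outer joins follow by symmetry and are omitted.</p>

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

final class WindowJoinSketch {

    // Joins the pairs that arrived on two streams in one window.
    // With leftOuter = true, left values without a match are kept and joined with null.
    static <K, V1, V2, R> List<Map.Entry<K, R>> join(List<Map.Entry<K, V1>> left,
                                                     List<Map.Entry<K, V2>> right,
                                                     BiFunction<V1, V2, R> joiner,
                                                     boolean leftOuter) {
        List<Map.Entry<K, R>> out = new ArrayList<>();
        for (Map.Entry<K, V1> l : left) {
            boolean matched = false;
            for (Map.Entry<K, V2> r : right) {
                if (l.getKey().equals(r.getKey())) {
                    out.add(new AbstractMap.SimpleImmutableEntry<>(l.getKey(), joiner.apply(l.getValue(), r.getValue())));
                    matched = true;
                }
            }
            if (!matched && leftOuter) {
                out.add(new AbstractMap.SimpleImmutableEntry<>(l.getKey(), joiner.apply(l.getValue(), null)));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, Long>> squares = List.of(Map.entry(1L, 1L), Map.entry(2L, 4L), Map.entry(3L, 9L));
        List<Map.Entry<Long, Long>> cubes = List.of(Map.entry(1L, 1L), Map.entry(2L, 8L));
        // Arrays.asList plays the role of the default Pair of both values
        List<Map.Entry<Long, List<Long>>> inner = join(squares, cubes, (sq, cu) -> Arrays.asList(sq, cu), false);
        List<Map.Entry<Long, List<Long>>> leftOuter = join(squares, cubes, (sq, cu) -> Arrays.asList(sq, cu), true);
        System.out.println(inner);     // [1=[1, 1], 2=[4, 8]]
        System.out.println(leftOuter); // [1=[1, 1], 2=[4, 8], 3=[9, null]]
    }
}
```

<p>The nested loops keep the sketch short; a real implementation would index one side by key instead of scanning it for every value.</p>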
<h2 id="cogroupbykey"><a name="cogroupbykey"></a> CoGroupByKey</h2>
<p><code>coGroupByKey</code> groups the values of this stream with the values having the same key from the other stream.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// a stream of (key, value) pairs e.g. (k1, v1), (k2, v2), (k2, v3)</span>
<span class="n">PairStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">stream1</span> <span class="o">=</span> <span class="o">...</span>
<span class="c1">// another stream of (key, value) pairs e.g. (k1, x1), (k1, x2), (k3, x3)</span>
<span class="n">PairStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">stream2</span> <span class="o">=</span> <span class="o">...</span>
<span class="c1">// the co-grouped values per key in the last window, e.g. (k1, ([v1], [x1, x2])), (k2, ([v2, v3], [])), (k3, ([], [x3]))</span>
<span class="n">PairStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Pair</span><span class="o">&lt;</span><span class="n">Iterable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Iterable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;&gt;&gt;</span> <span class="n">coGroupedStream</span> <span class="o">=</span> <span class="n">stream1</span><span class="o">.</span><span class="na">window</span><span class="o">(...).</span><span class="na">coGroupByKey</span><span class="o">(</span><span class="n">stream2</span><span class="o">);</span>
</code></pre></div>
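<p>A plain-Java model of <code>coGroupByKey</code> for a single window, reproducing the example values above. <code>CoGroupSketch</code> is hypothetical, and a <code>Map.Entry</code> of two lists stands in for Storm's <code>Pair</code> of two <code>Iterable</code>s; a key present on only one stream gets an empty list on the other side, as in the example.</p>

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of coGroupByKey over one window: for every key seen on
// either stream, collect that key's values from each stream into two lists.
final class CoGroupSketch {

    static <K, V1, V2> Map<K, Map.Entry<List<V1>, List<V2>>> coGroupByKey(
            List<Map.Entry<K, V1>> first, List<Map.Entry<K, V2>> second) {
        Map<K, List<V1>> left = new LinkedHashMap<>();
        Map<K, List<V2>> right = new LinkedHashMap<>();
        for (Map.Entry<K, V1> e : first) {
            left.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
            right.putIfAbsent(e.getKey(), new ArrayList<>()); // empty side by default
        }
        for (Map.Entry<K, V2> e : second) {
            right.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
            left.putIfAbsent(e.getKey(), new ArrayList<>());
        }
        Map<K, Map.Entry<List<V1>, List<V2>>> result = new LinkedHashMap<>();
        for (K key : left.keySet()) {
            result.put(key, new AbstractMap.SimpleImmutableEntry<>(left.get(key), right.get(key)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> stream1 = List.of(Map.entry("k1", "v1"), Map.entry("k2", "v2"), Map.entry("k2", "v3"));
        List<Map.Entry<String, String>> stream2 = List.of(Map.entry("k1", "x1"), Map.entry("k1", "x2"), Map.entry("k3", "x3"));
        coGroupByKey(stream1, stream2).forEach((k, v) ->
                System.out.println(k + " -> (" + v.getKey() + ", " + v.getValue() + ")"));
        // k1 -> ([v1], [x1, x2])
        // k2 -> ([v2, v3], [])
        // k3 -> ([], [x3])
    }
}
```

<p>Unlike a join, co-grouping emits one result per key rather than one result per matching pair of values.</p>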
<h2 id="state"><a name="state"></a> State</h2>
<p>Storm provides APIs for applications to save and update the state of their computation and also to query that state.</p>
<h3 id="updatestatebykey"><a name="updatestatebykey"></a> updateStateByKey</h3>
<p><code>updateStateByKey</code> updates the state by applying a given state update function to the previous state and the new value for the key. <code>updateStateByKey</code> can be invoked with either an initial value for the state and a state update function or by directly providing a <code>StateUpdater</code> implementation.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">PairStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="o">...</span>
<span class="c1">// Update the word counts in the state; here the first argument 0L is the initial value for the state and </span>
<span class="c1">// the second argument is a function that adds the count to the current value in the state.</span>
<span class="n">StreamState</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">streamState</span> <span class="o">=</span> <span class="n">wordCounts</span><span class="o">.</span><span class="na">updateStateByKey</span><span class="o">(</span><span class="mi">0L</span><span class="o">,</span> <span class="o">(</span><span class="n">state</span><span class="o">,</span> <span class="n">count</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">state</span> <span class="o">+</span> <span class="n">count</span><span class="o">);</span>
<span class="n">streamState</span><span class="o">.</span><span class="na">toPairStream</span><span class="o">().</span><span class="na">print</span><span class="o">();</span>
</code></pre></div>
<p>The state value can be of any type. In the above example, it is of type <code>Long</code> and stores the word count.</p>
<p>Internally, Storm uses stateful bolts to store the state. The Storm config <code>topology.state.provider</code> can be used to choose the state provider implementation. For example, set it to <code>org.apache.storm.redis.state.RedisKeyValueStateProvider</code> for a Redis-based state store.</p>
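<p>The update rule of <code>updateStateByKey</code> can be sketched as a map from key to state, where each new value is folded into the previous state with the update function. This is a simplified in-memory model, not Storm's implementation: <code>UpdateStateSketch</code> is hypothetical, and its state lives in a <code>HashMap</code> rather than in a stateful bolt backed by the configured state provider, so it is neither persisted nor recovered after a failure.</p>

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// In-memory model of updateStateByKey: a per-key state that starts at an
// initial value and is updated by a state update function for every new value.
final class UpdateStateSketch<K, V, S> {
    private final Map<K, S> state = new HashMap<>();
    private final S initialValue;
    private final BiFunction<S, V, S> updateFn;

    UpdateStateSketch(S initialValue, BiFunction<S, V, S> updateFn) {
        this.initialValue = initialValue;
        this.updateFn = updateFn;
    }

    // Applies updateFn(previousState, value) and stores the result for the key.
    S update(K key, V value) {
        S next = updateFn.apply(state.getOrDefault(key, initialValue), value);
        state.put(key, next);
        return next;
    }

    // Current state for a key, or null if the key has never been updated.
    S get(K key) {
        return state.get(key);
    }

    public static void main(String[] args) {
        // Mirrors the example above: initial value 0L, update (state, count) -> state + count
        UpdateStateSketch<String, Long, Long> wordCounts =
                new UpdateStateSketch<>(0L, (state, count) -> state + count);
        wordCounts.update("storm", 2L);
        wordCounts.update("storm", 3L);
        System.out.println(wordCounts.get("storm")); // 5
    }
}
```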
<h3 id="statequery"><a name="statequery"></a> stateQuery</h3>
<p><code>stateQuery</code> can be used to query the state (updated by <code>updateStateByKey</code>). The <code>StreamState</code> returned by the <code>updateStateByKey</code> operation must be used to query the stream state. The values in the stream are used as the keys to query the state.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">
<span class="c1">// The stream of words emitted by the QuerySpout is used as the keys to query the state.</span>
<span class="n">builder</span><span class="o">.</span><span class="na">newStream</span><span class="o">(</span><span class="k">new</span> <span class="n">QuerySpout</span><span class="o">(),</span> <span class="k">new</span> <span class="n">ValueMapper</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;(</span><span class="mi">0</span><span class="o">))</span>
<span class="c1">// Queries the state and emits the matching (key, value) as results. </span>
<span class="c1">// The stream state returned by updateStateByKey is passed as the argument to stateQuery.</span>
<span class="o">.</span><span class="na">stateQuery</span><span class="o">(</span><span class="n">streamState</span><span class="o">).</span><span class="na">print</span><span class="o">();</span>
</code></pre></div>
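<p>Conceptually, <code>stateQuery</code> treats each value of the query stream as a key and looks it up in the state. A minimal plain-Java model follows; <code>StateQuerySketch</code> is hypothetical, the state is an ordinary <code>Map</code>, and skipping keys that have no state value follows the comment in the example above ("emits the matching (key, value)") rather than any documented rule for unmatched keys.</p>

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java model of stateQuery: each value of the query stream is used as a
// key into the state; only keys that are present produce a (key, value) result.
final class StateQuerySketch {

    static <K, S> List<Map.Entry<K, S>> stateQuery(List<K> queryKeys, Map<K, S> state) {
        List<Map.Entry<K, S>> results = new ArrayList<>();
        for (K key : queryKeys) {
            S value = state.get(key);
            if (value != null) {
                results.add(new AbstractMap.SimpleImmutableEntry<>(key, value));
            }
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, Long> state = Map.of("storm", 5L, "api", 1L);
        System.out.println(stateQuery(List.of("storm", "missing"), state)); // [storm=5]
    }
}
```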
<h1 id="guarantees"><a name="guarantees"></a> Guarantees</h1>
<p>Currently, topologies built using the Stream API provide an <strong>at-least-once</strong> guarantee.</p>
<p>Note that only the <code>updateStateByKey</code> operation currently executes on an underlying StatefulBolt. The other stateful operations (join, windowing, aggregation, etc.) execute on an IRichBolt and store their state in memory; they rely on Storm's acking and replay mechanisms to rebuild the state.</p>
<p>In the future, the underlying framework of the Stream API is expected to be enhanced to provide <strong>exactly-once</strong> guarantees.</p>
<h1 id="example"><a name="example"></a> Example</h1>
<p>Here&#39;s a word count topology expressed using the Stream API:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">StreamBuilder</span> <span class="n">builder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">StreamBuilder</span><span class="o">();</span>
<span class="n">builder</span>
<span class="c1">// A stream of random sentences with two partitions</span>
<span class="o">.</span><span class="na">newStream</span><span class="o">(</span><span class="k">new</span> <span class="n">RandomSentenceSpout</span><span class="o">(),</span> <span class="k">new</span> <span class="n">ValueMapper</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;(</span><span class="mi">0</span><span class="o">),</span> <span class="mi">2</span><span class="o">)</span>
<span class="c1">// a two seconds tumbling window</span>
<span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">TumblingWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">seconds</span><span class="o">(</span><span class="mi">2</span><span class="o">)))</span>
<span class="c1">// split the sentences to words</span>
<span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="n">s</span> <span class="o">-&gt;</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="n">s</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">" "</span><span class="o">)))</span>
<span class="c1">// create a stream of (word, 1) pairs</span>
<span class="o">.</span><span class="na">mapToPair</span><span class="o">(</span><span class="n">w</span> <span class="o">-&gt;</span> <span class="n">Pair</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">w</span><span class="o">,</span> <span class="mi">1</span><span class="o">))</span>
<span class="c1">// compute the word counts in the last two second window</span>
<span class="o">.</span><span class="na">countByKey</span><span class="o">()</span>
<span class="c1">// print the results to stdout</span>
<span class="o">.</span><span class="na">print</span><span class="o">();</span>
</code></pre></div>
<p>The <code>RandomSentenceSpout</code> is a regular Storm spout that continuously emits random sentences. The stream of sentences is split into two-second windows, and the word count within each window is computed and printed.</p>
<p>The stream can then be submitted just like a regular topology as shown below.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Config</span> <span class="n">config</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Config</span><span class="o">();</span>
<span class="n">config</span><span class="o">.</span><span class="na">setNumWorkers</span><span class="o">(</span><span class="mi">1</span><span class="o">);</span>
<span class="n">StormSubmitter</span><span class="o">.</span><span class="na">submitTopologyWithProgressBar</span><span class="o">(</span><span class="s">"topology-name"</span><span class="o">,</span> <span class="n">config</span><span class="o">,</span> <span class="n">builder</span><span class="o">.</span><span class="na">build</span><span class="o">());</span>
</code></pre></div>
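<p>To see what the word count topology computes without running a cluster, here is a plain-Java sketch of per-window counting. It assumes each sentence carries an explicit timestamp in milliseconds and assigns it to the two-second tumbling window containing that timestamp; <code>TumblingWordCountSketch</code> and the sample sentences are hypothetical, and Storm's actual window triggering is not modeled.</p>

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the word count example: each timestamped sentence is
// assigned to a two-second tumbling window, and words are counted per window.
final class TumblingWordCountSketch {
    static final long WINDOW_MILLIS = 2_000;

    // windowStart -> (word -> count); TreeMap keeps windows and words sorted.
    static Map<Long, Map<String, Long>> countPerWindow(List<Map.Entry<Long, String>> timestampedSentences) {
        Map<Long, Map<String, Long>> counts = new TreeMap<>();
        for (Map.Entry<Long, String> e : timestampedSentences) {
            // A tumbling window covers [start, start + WINDOW_MILLIS); windows never overlap.
            long windowStart = (e.getKey() / WINDOW_MILLIS) * WINDOW_MILLIS;
            Map<String, Long> windowCounts = counts.computeIfAbsent(windowStart, w -> new TreeMap<>());
            for (String word : e.getValue().split(" ")) {
                windowCounts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, String>> input = List.of(
                Map.entry(100L, "the cow jumped"),
                Map.entry(1_500L, "the moon"),
                Map.entry(2_300L, "the cow"));
        System.out.println(countPerWindow(input));
        // {0={cow=1, jumped=1, moon=1, the=2}, 2000={cow=1, the=1}}
    }
}
```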
<p>More examples are available under <a href="../examples/storm-starter/src/jvm/org/apache/storm/starter/streams">storm-starter</a> which will help you get started.</p>
</div>
</div>
</div>
</div>
<footer>
<div class="container-fluid">
<div class="row">
<div class="col-md-3">
<div class="footer-widget">
<h5>Meetups</h5>
<ul class="latest-news">
<li><a href="http://www.meetup.com/Apache-Storm-Apache-Kafka/">Apache Storm & Apache Kafka</a> <span class="small">(Sunnyvale, CA)</span></li>
<li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Apache Storm & Kafka Users</a> <span class="small">(Seattle, WA)</span></li>
<li><a href="http://www.meetup.com/New-York-City-Storm-User-Group/">NYC Storm User Group</a> <span class="small">(New York, NY)</span></li>
<li><a href="http://www.meetup.com/Bay-Area-Stream-Processing">Bay Area Stream Processing</a> <span class="small">(Emeryville, CA)</span></li>
<li><a href="http://www.meetup.com/Boston-Storm-Users/">Boston Realtime Data</a> <span class="small">(Boston, MA)</span></li>
<li><a href="http://www.meetup.com/storm-london">London Storm User Group</a> <span class="small">(London, UK)</span></li>
</ul>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>About Apache Storm</h5>
<p>Apache Storm integrates with any queueing system and any database system. Apache Storm's spout abstraction makes it easy to integrate a new queuing system. Likewise, integrating Apache Storm with database systems is easy.</p>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>First Look</h5>
<ul class="footer-list">
<li><a href="/releases/current/Rationale.html">Rationale</a></li>
<li><a href="/releases/current/Tutorial.html">Tutorial</a></li>
<li><a href="/releases/current/Setting-up-development-environment.html">Setting up development environment</a></li>
<li><a href="/releases/current/Creating-a-new-Storm-project.html">Creating a new Apache Storm project</a></li>
</ul>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>Documentation</h5>
<ul class="footer-list">
<li><a href="/releases/current/index.html">Index</a></li>
<li><a href="/releases/current/javadocs/index.html">Javadoc</a></li>
<li><a href="/releases/current/FAQ.html">FAQ</a></li>
</ul>
</div>
</div>
</div>
<hr/>
<div class="row">
<div class="col-md-12">
<p align="center">Copyright © 2019 <a href="http://www.apache.org">Apache Software Foundation</a>. All Rights Reserved.
<br>Apache Storm, Apache, the Apache feather logo, and the Apache Storm project logos are trademarks of The Apache Software Foundation.
<br>All other marks mentioned may be trademarks or registered trademarks of their respective owners.</p>
</div>
</div>
</div>
</footer>
<!--Footer End-->
<!-- Scroll to top -->
<span class="totop"><a href="#"><i class="fa fa-angle-up"></i></a></span>
</body>
</html>