<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
<link rel="icon" href="/favicon.ico" type="image/x-icon">
<title>Windowing Support in Core Storm</title>
<!-- Bootstrap core CSS -->
<link href="/assets/css/bootstrap.min.css" rel="stylesheet">
<!-- Bootstrap theme -->
<link href="/assets/css/bootstrap-theme.min.css" rel="stylesheet">
<!-- Custom styles for this template -->
<link rel="stylesheet" href="http://fortawesome.github.io/Font-Awesome/assets/font-awesome/css/font-awesome.css">
<link href="/css/style.css" rel="stylesheet">
<link href="/assets/css/owl.theme.css" rel="stylesheet">
<link href="/assets/css/owl.carousel.css" rel="stylesheet">
<script type="text/javascript" src="/assets/js/jquery.min.js"></script>
<script type="text/javascript" src="/assets/js/bootstrap.min.js"></script>
<script type="text/javascript" src="/assets/js/owl.carousel.min.js"></script>
<script type="text/javascript" src="/assets/js/storm.js"></script>
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<header>
<div class="container-fluid">
<div class="row">
<div class="col-md-5">
<a href="/index.html"><img src="/images/logo.png" class="logo" /></a>
</div>
<div class="col-md-5">
<h1>Version: 2.1.0</h1>
</div>
<div class="col-md-2">
<a href="/downloads.html" class="btn-std btn-block btn-download">Download</a>
</div>
</div>
</div>
</header>
<!--Header End-->
<div class="container-fluid">
<h1 class="page-title">Windowing Support in Core Storm</h1>
<div class="row">
<div class="col-md-12">
<!-- Documentation -->
<p class="post-meta"></p>
<div class="documentation-content"><p>Storm core supports processing a group of tuples that fall within a window. Windows are specified with the
following two parameters:</p>
<ol>
<li>Window length - the length or duration of the window</li>
<li>Sliding interval - the interval at which the window slides</li>
</ol>
<h2 id="sliding-window">Sliding Window</h2>
<p>Tuples are grouped in windows, and the window slides every sliding interval. A tuple can belong to more than one window.</p>
<p>For example, consider a time duration based sliding window with a length of 10 seconds and a sliding interval of 5 seconds.</p>
<div class="highlight"><pre><code class="language-" data-lang="">........| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
-5      0       5            10          15   -&gt; time
|&lt;------- w1 --&gt;|
        |&lt;---------- w2 -----&gt;|
                |&lt;-------------- w3 ----&gt;|
</code></pre></div>
<p>The window is evaluated every 5 seconds, and some of the tuples in the first window overlap with the second one.</p>
<p>Note: The window first slides at t = 5 seconds and contains the events received during the first five seconds.</p>
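<p>The overlap can be made concrete with a small sketch in plain Java (no Storm classes). It assumes, purely for illustration, that window boundaries fall on multiples of the sliding interval and that a window ending at <code>end</code> covers timestamps in <code>(end - length, end]</code>. The class and method names are hypothetical.</p>

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Storm: lists the windows a timestamp falls into.
class SlidingWindowSketch {
    /** End times of every window (end - length, end] that contains timestamp t. */
    static List<Long> windowEndsFor(long t, long length, long slide) {
        List<Long> ends = new ArrayList<>();
        // First window boundary at or after t (ceiling to a multiple of slide).
        long firstEnd = Math.floorDiv(t + slide - 1, slide) * slide;
        // Keep sliding forward while the window still starts before t.
        for (long end = firstEnd; end - length < t; end += slide) {
            ends.add(end);
        }
        return ends;
    }

    public static void main(String[] args) {
        // Window length 10s, sliding interval 5s, as in the diagram above.
        System.out.println(windowEndsFor(2, 10, 5));  // [5, 10]
        System.out.println(windowEndsFor(7, 10, 5));  // [10, 15]
        System.out.println(windowEndsFor(12, 10, 5)); // [15, 20]
    }
}
```

<p>With these parameters every timestamp lands in exactly two windows, which is why <code>e1</code> and <code>e2</code> appear in both <code>w1</code> and <code>w2</code>.</p>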
<h2 id="tumbling-window">Tumbling Window</h2>
<p>Tuples are grouped in a single window based on time or count. Each tuple belongs to exactly one window.</p>
<p>For example, consider a time duration based tumbling window with a length of 5 seconds.</p>
<div class="highlight"><pre><code class="language-" data-lang="">| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
0       5             10         15    -&gt; time
   w1         w2            w3
</code></pre></div>
<p>The window is evaluated every five seconds and none of the windows overlap.</p>
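<p>As a rough illustration in plain Java (no Storm classes), the grouping in the diagram can be reproduced by rounding each timestamp up to the next multiple of the window length. The sketch assumes a window ending at <code>end</code> covers <code>(end - length, end]</code>. The class name and the sample timestamps are made up for the example.</p>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Storm: buckets timestamps into tumbling windows.
class TumblingWindowSketch {
    /** Maps each window end time to the timestamps in (end - length, end]. */
    static Map<Long, List<Long>> tumble(long[] timestamps, long length) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long t : timestamps) {
            // Ceiling of t to a multiple of length: exactly one window per timestamp.
            long end = Math.floorDiv(t + length - 1, length) * length;
            windows.computeIfAbsent(end, k -> new ArrayList<>()).add(t);
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] ts = {1, 3, 6, 7, 8, 9, 11, 12, 14}; // stand-ins for e1..e9
        System.out.println(tumble(ts, 5));
        // {5=[1, 3], 10=[6, 7, 8, 9], 15=[11, 12, 14]}
    }
}
```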
<p>Storm supports specifying the window length and sliding intervals as a count of the number of tuples or as a time duration.</p>
<p>The bolt interface <code>IWindowedBolt</code> is implemented by bolts that need windowing support.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">IWindowedBolt</span> <span class="kd">extends</span> <span class="n">IComponent</span> <span class="o">{</span>
    <span class="kt">void</span> <span class="nf">prepare</span><span class="o">(</span><span class="n">Map</span> <span class="n">stormConf</span><span class="o">,</span> <span class="n">TopologyContext</span> <span class="n">context</span><span class="o">,</span> <span class="n">OutputCollector</span> <span class="n">collector</span><span class="o">);</span>
    <span class="cm">/**
     * Process tuples falling within the window and optionally emit
     * new tuples based on the tuples in the input window.
     */</span>
    <span class="kt">void</span> <span class="nf">execute</span><span class="o">(</span><span class="n">TupleWindow</span> <span class="n">inputWindow</span><span class="o">);</span>
    <span class="kt">void</span> <span class="nf">cleanup</span><span class="o">();</span>
<span class="o">}</span>
</code></pre></div>
<p>Every time the window activates, the <code>execute</code> method is invoked. The <code>TupleWindow</code> parameter gives access to the current tuples
in the window, the tuples that expired, and the new tuples added since the last window was computed. The expired and new tuples are useful
for efficient, incremental windowing computations.</p>
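<p>To show why the new and expired tuples help, here is a minimal plain-Java sketch that keeps a running sum across window activations instead of re-adding every tuple each time. The class and method are hypothetical. In a real bolt the two lists would presumably be derived from <code>inputWindow.getNew()</code> and <code>inputWindow.getExpired()</code>, while this sketch uses plain integers so that it runs on its own.</p>

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of Storm: incremental aggregate over window deltas.
class IncrementalWindowSum {
    /** New sum = previous sum + values that entered the window - values that left it. */
    static long updateSum(long previousSum, List<Integer> added, List<Integer> expired) {
        long sum = previousSum;
        for (int v : added) {
            sum += v;
        }
        for (int v : expired) {
            sum -= v;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Count based window of length 3 sliding every 2 values over 1, 2, 3, ...
        long sum = 0;
        sum = updateSum(sum, Arrays.asList(1, 2), Collections.<Integer>emptyList());
        System.out.println(sum); // 3  (window holds 1, 2)
        sum = updateSum(sum, Arrays.asList(3, 4), Arrays.asList(1));
        System.out.println(sum); // 9  (window holds 2, 3, 4)
        sum = updateSum(sum, Arrays.asList(5, 6), Arrays.asList(2, 3));
        System.out.println(sum); // 15 (window holds 4, 5, 6)
    }
}
```

<p>The work per activation is proportional to the number of tuples that changed, not to the window length.</p>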
<p>Bolts that need windowing support typically extend <code>BaseWindowedBolt</code>, which has the APIs for specifying the
window length and sliding intervals.</p>
<p>For example:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">SlidingWindowBolt</span> <span class="kd">extends</span> <span class="n">BaseWindowedBolt</span> <span class="o">{</span>
    <span class="kd">private</span> <span class="n">OutputCollector</span> <span class="n">collector</span><span class="o">;</span>

    <span class="nd">@Override</span>
    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">prepare</span><span class="o">(</span><span class="n">Map</span> <span class="n">stormConf</span><span class="o">,</span> <span class="n">TopologyContext</span> <span class="n">context</span><span class="o">,</span> <span class="n">OutputCollector</span> <span class="n">collector</span><span class="o">)</span> <span class="o">{</span>
        <span class="k">this</span><span class="o">.</span><span class="na">collector</span> <span class="o">=</span> <span class="n">collector</span><span class="o">;</span>
    <span class="o">}</span>

    <span class="nd">@Override</span>
    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">execute</span><span class="o">(</span><span class="n">TupleWindow</span> <span class="n">inputWindow</span><span class="o">)</span> <span class="o">{</span>
        <span class="k">for</span><span class="o">(</span><span class="n">Tuple</span> <span class="nl">tuple:</span> <span class="n">inputWindow</span><span class="o">.</span><span class="na">get</span><span class="o">())</span> <span class="o">{</span>
            <span class="c1">// do the windowing computation</span>
            <span class="o">...</span>
        <span class="o">}</span>
        <span class="c1">// emit the results</span>
        <span class="n">collector</span><span class="o">.</span><span class="na">emit</span><span class="o">(</span><span class="k">new</span> <span class="n">Values</span><span class="o">(</span><span class="n">computedValue</span><span class="o">));</span>
    <span class="o">}</span>
<span class="o">}</span>

<span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="o">{</span>
    <span class="n">TopologyBuilder</span> <span class="n">builder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TopologyBuilder</span><span class="o">();</span>
    <span class="n">builder</span><span class="o">.</span><span class="na">setSpout</span><span class="o">(</span><span class="s">"spout"</span><span class="o">,</span> <span class="k">new</span> <span class="n">RandomSentenceSpout</span><span class="o">(),</span> <span class="mi">1</span><span class="o">);</span>
    <span class="n">builder</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">"slidingwindowbolt"</span><span class="o">,</span>
                    <span class="k">new</span> <span class="nf">SlidingWindowBolt</span><span class="o">().</span><span class="na">withWindow</span><span class="o">(</span><span class="k">new</span> <span class="n">Count</span><span class="o">(</span><span class="mi">30</span><span class="o">),</span> <span class="k">new</span> <span class="n">Count</span><span class="o">(</span><span class="mi">10</span><span class="o">)),</span>
                    <span class="mi">1</span><span class="o">).</span><span class="na">shuffleGrouping</span><span class="o">(</span><span class="s">"spout"</span><span class="o">);</span>
    <span class="n">Config</span> <span class="n">conf</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Config</span><span class="o">();</span>
    <span class="n">conf</span><span class="o">.</span><span class="na">setDebug</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span>
    <span class="n">conf</span><span class="o">.</span><span class="na">setNumWorkers</span><span class="o">(</span><span class="mi">1</span><span class="o">);</span>
    <span class="n">StormSubmitter</span><span class="o">.</span><span class="na">submitTopologyWithProgressBar</span><span class="o">(</span><span class="n">args</span><span class="o">[</span><span class="mi">0</span><span class="o">],</span> <span class="n">conf</span><span class="o">,</span> <span class="n">builder</span><span class="o">.</span><span class="na">createTopology</span><span class="o">());</span>
<span class="o">}</span>
</code></pre></div>
<p>The following window configurations are supported.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">withWindow</span><span class="o">(</span><span class="n">Count</span> <span class="n">windowLength</span><span class="o">,</span> <span class="n">Count</span> <span class="n">slidingInterval</span><span class="o">)</span>
<span class="c1">// Tuple count based sliding window that slides after `slidingInterval` number of tuples.</span>

<span class="nf">withWindow</span><span class="o">(</span><span class="n">Count</span> <span class="n">windowLength</span><span class="o">)</span>
<span class="c1">// Tuple count based window that slides with every incoming tuple.</span>

<span class="nf">withWindow</span><span class="o">(</span><span class="n">Count</span> <span class="n">windowLength</span><span class="o">,</span> <span class="n">Duration</span> <span class="n">slidingInterval</span><span class="o">)</span>
<span class="c1">// Tuple count based sliding window that slides after `slidingInterval` time duration.</span>

<span class="nf">withWindow</span><span class="o">(</span><span class="n">Duration</span> <span class="n">windowLength</span><span class="o">,</span> <span class="n">Duration</span> <span class="n">slidingInterval</span><span class="o">)</span>
<span class="c1">// Time duration based sliding window that slides after `slidingInterval` time duration.</span>

<span class="nf">withWindow</span><span class="o">(</span><span class="n">Duration</span> <span class="n">windowLength</span><span class="o">)</span>
<span class="c1">// Time duration based window that slides with every incoming tuple.</span>

<span class="nf">withWindow</span><span class="o">(</span><span class="n">Duration</span> <span class="n">windowLength</span><span class="o">,</span> <span class="n">Count</span> <span class="n">slidingInterval</span><span class="o">)</span>
<span class="c1">// Time duration based sliding window configuration that slides after `slidingInterval` number of tuples.</span>

<span class="nf">withTumblingWindow</span><span class="o">(</span><span class="n">BaseWindowedBolt</span><span class="o">.</span><span class="na">Count</span> <span class="n">count</span><span class="o">)</span>
<span class="c1">// Count based tumbling window that tumbles after the specified count of tuples.</span>

<span class="nf">withTumblingWindow</span><span class="o">(</span><span class="n">BaseWindowedBolt</span><span class="o">.</span><span class="na">Duration</span> <span class="n">duration</span><span class="o">)</span>
<span class="c1">// Time duration based tumbling window that tumbles after the specified time duration.</span>
</code></pre></div>
<h2 id="tuple-timestamp-and-out-of-order-tuples">Tuple timestamp and out of order tuples</h2>
<p>By default, the timestamp tracked in the window is the time when the tuple is processed by the bolt, and the window calculations
are performed based on this processing timestamp. Storm also supports tracking windows based on a source-generated timestamp.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="cm">/**
* Specify a field in the tuple that represents the timestamp as a long value. If this
* field is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown.
*
* @param fieldName the name of the field that contains the timestamp
*/</span>
<span class="kd">public</span> <span class="n">BaseWindowedBolt</span> <span class="nf">withTimestampField</span><span class="o">(</span><span class="n">String</span> <span class="n">fieldName</span><span class="o">)</span>
</code></pre></div>
<p>The value of the above <code>fieldName</code> is looked up in the incoming tuple and used for windowing calculations.
If the field is not present in the tuple, an exception is thrown. Alternatively, a <a href="../storm-client/src/jvm/org/apache/storm/windowing/TimestampExtractor.java">TimestampExtractor</a> can be used to
derive a timestamp value from a tuple (e.g. to extract the timestamp from a nested field within the tuple).</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="cm">/**
* Specify the timestamp extractor implementation.
*
* @param timestampExtractor the {@link TimestampExtractor} implementation
*/</span>
<span class="kd">public</span> <span class="n">BaseWindowedBolt</span> <span class="nf">withTimestampExtractor</span><span class="o">(</span><span class="n">TimestampExtractor</span> <span class="n">timestampExtractor</span><span class="o">)</span>
</code></pre></div>
<p>Along with the timestamp field name or extractor, a time lag parameter can also be specified. It indicates the maximum amount of time by which tuple timestamps may be out of order.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="cm">/**
* Specify the maximum time lag of the tuple timestamp in milliseconds. It means that the tuple timestamps
* cannot be out of order by more than this amount.
*
* @param duration the max lag duration
*/</span>
<span class="kd">public</span> <span class="n">BaseWindowedBolt</span> <span class="nf">withLag</span><span class="o">(</span><span class="n">Duration</span> <span class="n">duration</span><span class="o">)</span>
</code></pre></div>
<p>For example, if the lag is 5 seconds and a tuple <code>t1</code> arrives with timestamp <code>06:00:05</code>, no tuples may arrive with a timestamp earlier than <code>06:00:00</code>. If a tuple
arrives with timestamp <code>05:59:59</code> after <code>t1</code> and the window has moved past <code>t1</code>, it is treated as a late tuple. By default, late tuples are not processed;
they are only logged in the worker log files at INFO level.</p>
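<p>The lag rule can be sketched in plain Java as follows. This is a simplification and not Storm's implementation: it treats the boundary as moving forward with every accepted tuple, whereas Storm only advances it when a watermark is emitted. The class name and its methods are hypothetical.</p>

```java
import java.time.LocalTime;

// Hypothetical helper, not part of Storm: flags tuples that are older than the allowed lag.
class LateTupleSketch {
    private final long lagMs;
    private long maxSeenTs = Long.MIN_VALUE; // highest timestamp accepted so far
    private boolean seenAny = false;

    LateTupleSketch(long lagMs) {
        this.lagMs = lagMs;
    }

    /** Converts "HH:mm:ss" to milliseconds since midnight. */
    static long ms(String hhmmss) {
        return LocalTime.parse(hhmmss).toSecondOfDay() * 1000L;
    }

    /** Returns false for a late tuple, i.e. one older than (max seen timestamp - lag). */
    boolean accept(long ts) {
        if (seenAny && ts < maxSeenTs - lagMs) {
            return false;
        }
        seenAny = true;
        maxSeenTs = Math.max(maxSeenTs, ts);
        return true;
    }

    public static void main(String[] args) {
        LateTupleSketch tracker = new LateTupleSketch(5000); // lag of 5 seconds
        System.out.println(tracker.accept(ms("06:00:05"))); // true  (t1)
        System.out.println(tracker.accept(ms("06:00:00"))); // true  (exactly at the limit)
        System.out.println(tracker.accept(ms("05:59:59"))); // false (late)
    }
}
```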
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="cm">/**
* Specify a stream id on which late tuples are going to be emitted. They are going to be accessible via the
* {@link org.apache.storm.topology.WindowedBoltExecutor#LATE_TUPLE_FIELD} field.
* It must be defined on a per-component basis, and in conjunction with the
* {@link BaseWindowedBolt#withTimestampField}, otherwise {@link IllegalArgumentException} will be thrown.
*
* @param streamId the name of the stream used to emit late tuples on
*/</span>
<span class="kd">public</span> <span class="n">BaseWindowedBolt</span> <span class="nf">withLateTupleStream</span><span class="o">(</span><span class="n">String</span> <span class="n">streamId</span><span class="o">)</span>
</code></pre></div>
<p>This behaviour can be changed by specifying a <code>streamId</code> with the above method. In this case, late tuples are emitted on the specified stream and are accessible
via the field <code>WindowedBoltExecutor.LATE_TUPLE_FIELD</code>.</p>
<h3 id="watermarks">Watermarks</h3>
<p>For processing tuples with a timestamp field, Storm internally computes watermarks based on the incoming tuple timestamps. The watermark is
the minimum of the latest tuple timestamps (minus the lag) across all the input streams. At a high level, this is similar to the watermark concept
used by Flink and Google&#39;s MillWheel for tracking event based timestamps.</p>
<p>Periodically (every second by default), the watermark timestamps are emitted, and this is considered the clock tick for the window calculation if
tuple based timestamps are in use. The interval at which watermarks are emitted can be changed with the API below.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="cm">/**
* Specify the watermark event generation interval. For tuple based timestamps, watermark events
* are used to track the progress of time
*
* @param interval the interval at which watermark events are generated
*/</span>
<span class="kd">public</span> <span class="n">BaseWindowedBolt</span> <span class="nf">withWatermarkInterval</span><span class="o">(</span><span class="n">Duration</span> <span class="n">interval</span><span class="o">)</span>
</code></pre></div>
<p>When a watermark is received, all windows up to that timestamp will be evaluated.</p>
<p>For example, consider tuple timestamp based processing with the following window parameters:</p>
<p><code>Window length = 20s, sliding interval = 10s, watermark emit frequency = 1s, max lag = 5s</code></p>
<div class="highlight"><pre><code class="language-" data-lang="">|-----|-----|-----|-----|-----|-----|-----|
0     10    20    30    40    50    60    70
</code></pre></div>
<p>Current ts = <code>09:00:00</code></p>
<p>Tuples <code>e1(06:00:03), e2(06:00:05), e3(06:00:07), e4(06:00:18), e5(06:00:26), e6(06:00:36)</code> are received between <code>09:00:00</code> and <code>09:00:01</code>.</p>
<p>At time t = <code>09:00:01</code>, watermark w1 = <code>06:00:31</code> is emitted since no tuples earlier than <code>06:00:31</code> can arrive.</p>
<p>Three windows will be evaluated. The first window end ts (<code>06:00:10</code>) is computed by taking the earliest event timestamp (<code>06:00:03</code>)
and computing the ceiling based on the sliding interval (10s).</p>
<ol>
<li><code>05:59:50 - 06:00:10</code> with tuples e1, e2, e3</li>
<li><code>06:00:00 - 06:00:20</code> with tuples e1, e2, e3, e4</li>
<li><code>06:00:10 - 06:00:30</code> with tuples e4, e5</li>
</ol>
<p>e6 is not evaluated since the watermark timestamp <code>06:00:31</code> is earlier than the tuple ts <code>06:00:36</code>.</p>
<p>Tuples <code>e7(08:00:25), e8(08:00:26), e9(08:00:27), e10(08:00:39)</code> are received between <code>09:00:01</code> and <code>09:00:02</code>.</p>
<p>At time t = <code>09:00:02</code>, another watermark w2 = <code>08:00:34</code> is emitted since no tuples earlier than <code>08:00:34</code> can arrive now.</p>
<p>Three windows will be evaluated:</p>
<ol>
<li><code>06:00:20 - 06:00:40</code> with tuples e5, e6 (from the earlier batch)</li>
<li><code>06:00:30 - 06:00:50</code> with tuple e6 (from the earlier batch)</li>
<li><code>08:00:10 - 08:00:30</code> with tuples e7, e8, e9</li>
</ol>
<p>e10 is not evaluated since the tuple ts <code>08:00:39</code> is beyond the watermark time <code>08:00:34</code>.</p>
<p>The window calculation considers the time gaps and computes the windows based on the tuple timestamp.</p>
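<p>The arithmetic of the first batch in this example can be checked with a short plain-Java sketch. It is not Storm's implementation: it assumes a single input stream, expresses timestamps as seconds after <code>06:00:00</code>, and treats a window ending at <code>end</code> as covering <code>(end - length, end]</code>. All names are hypothetical.</p>

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Storm: reproduces the watermark arithmetic of the example.
class WatermarkSketch {
    /** Watermark for a single stream: latest timestamp minus the lag (needs a non-empty array). */
    static long watermark(long[] timestamps, long lag) {
        long max = Long.MIN_VALUE;
        for (long t : timestamps) {
            max = Math.max(max, t);
        }
        return max - lag;
    }

    /** Window end times from the ceiling of the earliest timestamp up to the watermark. */
    static List<Long> windowEnds(long[] timestamps, long slide, long watermark) {
        long min = Long.MAX_VALUE;
        for (long t : timestamps) {
            min = Math.min(min, t);
        }
        List<Long> ends = new ArrayList<>();
        long firstEnd = Math.floorDiv(min + slide - 1, slide) * slide;
        for (long end = firstEnd; end <= watermark; end += slide) {
            ends.add(end);
        }
        return ends;
    }

    /** Timestamps that fall in the window (end - length, end]. */
    static List<Long> tuplesIn(long[] timestamps, long end, long length) {
        List<Long> result = new ArrayList<>();
        for (long t : timestamps) {
            if (t > end - length && t <= end) {
                result.add(t);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[] batch = {3, 5, 7, 18, 26, 36};   // e1..e6, seconds after 06:00:00
        long wm = watermark(batch, 5);          // max lag = 5s
        System.out.println(wm);                 // 31
        for (long end : windowEnds(batch, 10, wm)) {
            System.out.println(end + " -> " + tuplesIn(batch, end, 20));
        }
        // 10 -> [3, 5, 7]
        // 20 -> [3, 5, 7, 18]
        // 30 -> [18, 26]
    }
}
```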
<h2 id="guarantees">Guarantees</h2>
<p>The windowing functionality in Storm core currently provides an at-least-once guarantee. The values emitted from the bolt&#39;s
<code>execute(TupleWindow inputWindow)</code> method are automatically anchored to all the tuples in the <code>inputWindow</code>. The downstream
bolts are expected to ack the received tuple (i.e. the tuple emitted from the windowed bolt) to complete the tuple tree.
If they do not, the tuples will be replayed and the windowing computation will be re-evaluated.</p>
<p>The tuples in the window are automatically acked when they expire, i.e. when they fall out of the window after
<code>windowLength + slidingInterval</code>. Note that the configuration <code>topology.message.timeout.secs</code> should be sufficiently larger
than <code>windowLength + slidingInterval</code> for time based windows; otherwise the tuples will time out and get replayed, which can result
in duplicate evaluations. For count based windows, the configuration should be adjusted such that <code>windowLength + slidingInterval</code>
tuples can be received within the timeout period.</p>
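<p>A back-of-the-envelope way to pick the timeout is sketched below in plain Java. The helper names, the safety margin and the assumed arrival rate are illustrative assumptions, not Storm settings. The resulting value would then be applied to <code>topology.message.timeout.secs</code>, for instance through <code>Config.setMessageTimeoutSecs</code>.</p>

```java
// Hypothetical helper, not part of Storm: estimates a lower bound for the message timeout.
class MessageTimeoutSketch {
    /** Time based windows: window length + sliding interval, plus a safety margin. */
    static int timeBased(int windowLengthSecs, int slidingIntervalSecs, int marginSecs) {
        return windowLengthSecs + slidingIntervalSecs + marginSecs;
    }

    /** Count based windows: time needed to receive (length + interval) tuples at an assumed rate. */
    static int countBased(int windowLength, int slidingInterval, double tuplesPerSec, int marginSecs) {
        int secondsToFill = (int) Math.ceil((windowLength + slidingInterval) / tuplesPerSec);
        return secondsToFill + marginSecs;
    }

    public static void main(String[] args) {
        System.out.println(timeBased(20, 10, 15));      // 45
        System.out.println(countBased(30, 10, 4.0, 5)); // 15
    }
}
```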
<h2 id="example-topology">Example topology</h2>
<p>An example topology <code>SlidingWindowTopology</code> shows how to use the APIs to compute a sliding window sum and a tumbling window
average.</p>
<h2 id="stateful-windowing">Stateful windowing</h2>
<p>The default windowing implementation in Storm stores the tuples in memory until they are processed and expired from the
window. This limits the use cases to windows that
fit entirely in memory. Also, the source tuples cannot be acked until the window expires, which requires large message timeouts
(<code>topology.message.timeout.secs</code> should be larger than the window length + sliding interval). This also puts extra load on the system
due to the complex acking and anchoring requirements.</p>
<p>To address the above limitations and to support larger window sizes, Storm provides stateful windowing support via <code>IStatefulWindowedBolt</code>.
User bolts should typically extend <code>BaseStatefulWindowedBolt</code> for the windowing operations, with the framework automatically
managing the state of the window in the background.</p>
<p>If the sources provide a monotonically increasing identifier as a part of the message, the framework can use this
to periodically checkpoint the last expired and evaluated message ids, to avoid duplicate window evaluations in case of
failures or restarts. During recovery, the tuples with message ids lower than the last expired id are discarded, and tuples with
message ids between the last expired and last evaluated message ids are fed into the system without activating any previously
activated windows.
The tuples beyond the last evaluated message id are processed as usual. This can be enabled by setting
the <code>messageIdField</code> as shown below:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">topologyBuilder</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">"mybolt"</span><span class="o">,</span>
                   <span class="k">new</span> <span class="nf">MyStatefulWindowedBolt</span><span class="o">()</span>
                   <span class="o">.</span><span class="na">withWindow</span><span class="o">(...)</span> <span class="c1">// windowing configurations</span>
                   <span class="o">.</span><span class="na">withMessageIdField</span><span class="o">(</span><span class="s">"msgid"</span><span class="o">),</span> <span class="c1">// a monotonically increasing 'long' field in the tuple</span>
                   <span class="n">parallelism</span><span class="o">)</span>
               <span class="o">.</span><span class="na">shuffleGrouping</span><span class="o">(</span><span class="s">"spout"</span><span class="o">);</span>
</code></pre></div>
<p>However, this option is feasible only if the sources can provide a monotonically increasing identifier in the tuple and the same identifier is preserved
when the messages are re-emitted after failures. With this option, the tuples are still buffered in memory until processed
and expired from the window.</p>
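<p>The recovery rule for message ids described above can be summarised in a few lines of plain Java. This is only a sketch of the decision, not the framework's code. Whether the two boundary ids themselves count as already expired or evaluated is an assumption made here, and the class, enum and method names are hypothetical.</p>

```java
// Hypothetical helper, not part of Storm: decides what to do with a replayed tuple.
class RecoverySketch {
    enum Action { DISCARD, ADD_WITHOUT_TRIGGER, PROCESS }

    /**
     * Classifies a tuple seen during recovery from the checkpointed ids.
     * Assumes the ids are monotonically increasing and boundaries are inclusive.
     */
    static Action classify(long msgId, long lastExpiredId, long lastEvaluatedId) {
        if (msgId <= lastExpiredId) {
            return Action.DISCARD;             // already fell out of every window
        }
        if (msgId <= lastEvaluatedId) {
            return Action.ADD_WITHOUT_TRIGGER; // refill the window, do not re-activate it
        }
        return Action.PROCESS;                 // not seen by any evaluated window yet
    }

    public static void main(String[] args) {
        // Checkpoint says: last expired id = 10, last evaluated id = 20.
        System.out.println(classify(5, 10, 20));  // DISCARD
        System.out.println(classify(15, 10, 20)); // ADD_WITHOUT_TRIGGER
        System.out.println(classify(25, 10, 20)); // PROCESS
    }
}
```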
<p>For more details, take a look at the sample topology <a href="../examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java">StatefulWindowingTopology</a> in storm-starter, which will help you get started.</p>
<h3 id="window-checkpointing">Window checkpointing</h3>
<p>With window checkpointing, the monotonically increasing id is no longer required since the framework transparently saves the state of the window periodically into the configured state backend.
The state that is saved includes the tuples in the window, any system state that is required to recover the state of processing
and also the user state.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">topologyBuilder</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">"mybolt"</span><span class="o">,</span>
                   <span class="k">new</span> <span class="nf">MyStatefulPersistentWindowedBolt</span><span class="o">()</span>
                   <span class="o">.</span><span class="na">withWindow</span><span class="o">(...)</span> <span class="c1">// windowing configurations</span>
                   <span class="o">.</span><span class="na">withPersistence</span><span class="o">()</span> <span class="c1">// persist the window state</span>
                   <span class="o">.</span><span class="na">withMaxEventsInMemory</span><span class="o">(</span><span class="mi">25000</span><span class="o">),</span> <span class="c1">// max number of events to be cached in memory</span>
                   <span class="n">parallelism</span><span class="o">)</span>
               <span class="o">.</span><span class="na">shuffleGrouping</span><span class="o">(</span><span class="s">"spout"</span><span class="o">);</span>
</code></pre></div>
<p>The <code>withPersistence</code> option instructs the framework to transparently save the tuples in the window along with
any associated system and user state to the state backend. <code>withMaxEventsInMemory</code> is an optional
setting that specifies the maximum number of tuples that may be kept in memory. The tuples are transparently loaded from
the state backend as required, and the ones that are most likely to be used again are retained in memory.</p>
<p>The state backend can be configured by setting the topology state provider config:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// use redis for state persistence</span>
<span class="n">conf</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">Config</span><span class="o">.</span><span class="na">TOPOLOGY_STATE_PROVIDER</span><span class="o">,</span> <span class="s">"org.apache.storm.redis.state.RedisKeyValueStateProvider"</span><span class="o">);</span>
</code></pre></div>
<p>Currently Storm supports Redis and HBase as state backends and uses the underlying state-checkpointing
framework for saving the window state. For more details on state checkpointing, see <a href="State-checkpointing.html">State-checkpointing</a>.</p>
<p>Here is an example of a persistent windowed bolt that uses window checkpointing to save its state. The <code>initState</code> method
is invoked with the last saved state (user state) at initialization time. The <code>execute</code> method is invoked based on the configured
windowing parameters, and the tuples in the active window can be accessed via an <code>iterator</code> as shown below.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyStatefulPersistentWindowedBolt</span> <span class="kd">extends</span> <span class="n">BaseStatefulWindowedBolt</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">V</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="kd">private</span> <span class="n">KeyValueState</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">V</span><span class="o">&gt;</span> <span class="n">state</span><span class="o">;</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">initState</span><span class="o">(</span><span class="n">KeyValueState</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">V</span><span class="o">&gt;</span> <span class="n">state</span><span class="o">)</span> <span class="o">{</span>
<span class="k">this</span><span class="o">.</span><span class="na">state</span> <span class="o">=</span> <span class="n">state</span><span class="o">;</span>
<span class="c1">// ...</span>
<span class="c1">// restore the state from the last saved state.</span>
<span class="c1">// ...</span>
<span class="o">}</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">execute</span><span class="o">(</span><span class="n">TupleWindow</span> <span class="n">window</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">// iterate over tuples in the current window</span>
<span class="n">Iterator</span><span class="o">&lt;</span><span class="n">Tuple</span><span class="o">&gt;</span> <span class="n">it</span> <span class="o">=</span> <span class="n">window</span><span class="o">.</span><span class="na">getIter</span><span class="o">();</span>
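<span class="k">while</span> <span class="o">(</span><span class="n">it</span><span class="o">.</span><span class="na">hasNext</span><span class="o">())</span> <span class="o">{</span>
<span class="c1">// fetch the next tuple in the window</span>
<span class="n">Tuple</span> <span class="n">tuple</span> <span class="o">=</span> <span class="n">it</span><span class="o">.</span><span class="na">next</span><span class="o">();</span>
<span class="c1">// compute some result based on the tuples in window</span>
<span class="o">}</span>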
<span class="c1">// possibly update any state to be maintained across windows</span>
<span class="n">state</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">STATE_KEY</span><span class="o">,</span> <span class="n">updatedValue</span><span class="o">);</span>
<span class="c1">// emit the results downstream</span>
<span class="n">collector</span><span class="o">.</span><span class="na">emit</span><span class="o">(</span><span class="k">new</span> <span class="n">Values</span><span class="o">(</span><span class="n">result</span><span class="o">));</span>
<span class="o">}</span>
<span class="o">}</span>
</code></pre></div>
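<p>The interplay between <code>initState</code> and <code>execute</code> can be sketched without any Storm dependency. In the hedged example below, a plain <code>Map</code> stands in for <code>KeyValueState</code> and integers stand in for tuples; the class <code>RunningTotalSketch</code> and the key <code>TOTAL_KEY</code> are hypothetical names, and the methods only mirror the shape of the bolt methods rather than implementing Storm's interfaces.</p>

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Illustrative only: a Map stands in for Storm's KeyValueState and
// Integers stand in for tuples. All names here are hypothetical.
class RunningTotalSketch {
    private static final String TOTAL_KEY = "total";
    private Map<String, Long> state;
    private long total;

    // Mirrors initState: pick up where the last saved state left off.
    void initState(Map<String, Long> savedState) {
        this.state = savedState;
        this.total = savedState.getOrDefault(TOTAL_KEY, 0L);
    }

    // Mirrors execute: fold the current window into the running total
    // and write it back so that it survives a restart.
    long execute(Iterator<Integer> window) {
        while (window.hasNext()) {
            total += window.next();
        }
        state.put(TOTAL_KEY, total);
        return total;
    }
}
```

<p>A second instance initialized with the same map continues from the saved total instead of starting at zero, which is the behavior the bolt relies on after a restart.</p>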
<p><strong>Note:</strong> In persistent windowed bolts, use <code>TupleWindow.getIter</code> to retrieve an iterator over the
tuples in the window. If a window holds a very large number of tuples, invoking <code>TupleWindow.get</code>
tries to load all of them into memory and may fail with an <code>OutOfMemoryError</code>.</p>
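<p>The benefit of iterating rather than materializing the whole window can be seen in a minimal sketch. Here integers stand in for tuples and <code>WindowAverage</code> is a hypothetical helper, not part of Storm; the point is only that an aggregate can be computed while holding one element at a time.</p>

```java
import java.util.Arrays;
import java.util.Iterator;

// Illustrative only: plain Integers stand in for Storm tuples.
class WindowAverage {
    // Consumes the iterator one element at a time, so the memory used by
    // this method does not grow with the number of elements in the window.
    static double average(Iterator<Integer> it) {
        long sum = 0;
        long count = 0;
        while (it.hasNext()) {
            sum += it.next();
            count++;
        }
        // An empty window yields 0.0 rather than dividing by zero.
        return count == 0 ? 0.0 : (double) sum / count;
    }
}
```

<p>For instance, <code>WindowAverage.average(Arrays.asList(1, 2, 3, 4).iterator())</code> returns <code>2.5</code>.</p>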
<p><strong>Note:</strong> In persistent windowed bolts, <code>TupleWindow.getNew</code> and <code>TupleWindow.getExpired</code> are currently not supported
and will throw an <code>UnsupportedOperationException</code>.</p>
<p>For more details, see the sample topology in storm-starter, <a href="../examples/storm-starter/src/jvm/org/apache/storm/starter/PersistentWindowingTopology.java">PersistentWindowingTopology</a>,
which will help you get started.</p>
</div>
</div>
</div>
</div>
<footer>
<div class="container-fluid">
<div class="row">
<div class="col-md-3">
<div class="footer-widget">
<h5>Meetups</h5>
<ul class="latest-news">
<li><a href="http://www.meetup.com/Apache-Storm-Apache-Kafka/">Apache Storm &amp; Apache Kafka</a> <span class="small">(Sunnyvale, CA)</span></li>
<li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Apache Storm &amp; Kafka Users</a> <span class="small">(Seattle, WA)</span></li>
<li><a href="http://www.meetup.com/New-York-City-Storm-User-Group/">NYC Storm User Group</a> <span class="small">(New York, NY)</span></li>
<li><a href="http://www.meetup.com/Bay-Area-Stream-Processing">Bay Area Stream Processing</a> <span class="small">(Emeryville, CA)</span></li>
<li><a href="http://www.meetup.com/Boston-Storm-Users/">Boston Realtime Data</a> <span class="small">(Boston, MA)</span></li>
<li><a href="http://www.meetup.com/storm-london">London Storm User Group</a> <span class="small">(London, UK)</span></li>
</ul>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>About Apache Storm</h5>
<p>Apache Storm integrates with any queueing system and any database system. Apache Storm's spout abstraction makes it easy to integrate a new queuing system. Likewise, integrating Apache Storm with database systems is easy.</p>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>First Look</h5>
<ul class="footer-list">
<li><a href="/releases/current/Rationale.html">Rationale</a></li>
<li><a href="/releases/current/Tutorial.html">Tutorial</a></li>
<li><a href="/releases/current/Setting-up-development-environment.html">Setting up development environment</a></li>
<li><a href="/releases/current/Creating-a-new-Storm-project.html">Creating a new Apache Storm project</a></li>
</ul>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>Documentation</h5>
<ul class="footer-list">
<li><a href="/releases/current/index.html">Index</a></li>
<li><a href="/releases/current/javadocs/index.html">Javadoc</a></li>
<li><a href="/releases/current/FAQ.html">FAQ</a></li>
</ul>
</div>
</div>
</div>
<hr/>
<div class="row">
<div class="col-md-12">
<p align="center">Copyright © 2019 <a href="http://www.apache.org">Apache Software Foundation</a>. All Rights Reserved.
<br>Apache Storm, Apache, the Apache feather logo, and the Apache Storm project logos are trademarks of The Apache Software Foundation.
<br>All other marks mentioned may be trademarks or registered trademarks of their respective owners.</p>
</div>
</div>
</div>
</footer>
<!--Footer End-->
<!-- Scroll to top -->
<span class="totop"><a href="#"><i class="fa fa-angle-up"></i></a></span>
</body>
</html>