| <!DOCTYPE html> |
| <html> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge"> |
| <meta name="viewport" content="width=device-width, initial-scale=1"> |
| |
| <link rel="shortcut icon" href="/favicon.ico" type="image/x-icon"> |
| <link rel="icon" href="/favicon.ico" type="image/x-icon"> |
| |
| <title>Windowing Support in Core Storm</title> |
| |
| <!-- Bootstrap core CSS --> |
| <link href="/assets/css/bootstrap.min.css" rel="stylesheet"> |
| <!-- Bootstrap theme --> |
| <link href="/assets/css/bootstrap-theme.min.css" rel="stylesheet"> |
| |
| <!-- Custom styles for this template --> |
| <link rel="stylesheet" href="http://fortawesome.github.io/Font-Awesome/assets/font-awesome/css/font-awesome.css"> |
| <link href="/css/style.css" rel="stylesheet"> |
| <link href="/assets/css/owl.theme.css" rel="stylesheet"> |
| <link href="/assets/css/owl.carousel.css" rel="stylesheet"> |
| <script type="text/javascript" src="/assets/js/jquery.min.js"></script> |
| <script type="text/javascript" src="/assets/js/bootstrap.min.js"></script> |
| <script type="text/javascript" src="/assets/js/owl.carousel.min.js"></script> |
| <script type="text/javascript" src="/assets/js/storm.js"></script> |
| |
| <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries --> |
| <!--[if lt IE 9]> |
| <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script> |
| <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script> |
| <![endif]--> |
| </head> |
| |
| |
| <body> |
| <header> |
| <div class="container-fluid"> |
| <div class="row"> |
| <div class="col-md-5"> |
| <a href="/index.html"><img src="/images/logo.png" class="logo" /></a> |
| </div> |
| <div class="col-md-5"> |
| |
| <h1>Version: 2.0.0</h1> |
| |
| </div> |
| <div class="col-md-2"> |
| <a href="/downloads.html" class="btn-std btn-block btn-download">Download</a> |
| </div> |
| </div> |
| </div> |
| </header> |
| <!--Header End--> |
| |
| |
| |
| <div class="container-fluid"> |
| <h1 class="page-title">Windowing Support in Core Storm</h1> |
| <div class="row"> |
| <div class="col-md-12"> |
| <!-- Documentation --> |
| |
| <p class="post-meta"></p> |
| |
| <div class="documentation-content"><p>Storm core supports processing a group of tuples that fall within a window. Windows are specified with the |
| following two parameters:</p> |
| |
| <ol> |
| <li>Window length - the length or duration of the window</li> |
| <li>Sliding interval - the interval at which the window slides</li> |
| </ol> |
| |
| <h2 id="sliding-window">Sliding Window</h2> |
| |
| <p>Tuples are grouped into windows, and the window slides at every sliding interval. A tuple can belong to more than one window.</p> |
| |
| <p>For example, consider a time-based sliding window with a length of 10 seconds and a sliding interval of 5 seconds.</p> |
| <div class="highlight"><pre><code class="language-" data-lang="">........| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |... |
| -5      0       5            10          15   -> time |
| |<------- w1 -->| |
|         |<---------- w2 ----->| |
|                 |<-------------- w3 ---->| |
| </code></pre></div> |
| <p>The window is evaluated every 5 seconds, and some of the tuples in the first window also belong to the second one.</p> |
| |
| <p>Note: The window first slides at t = 5 seconds and contains the events received during the first five seconds.</p> |
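<p>The sliding behaviour above can be sketched in plain Java without any Storm dependency. The class <code>SlidingWindowSketch</code>, its <code>windowAt</code> method and the sample timestamps are hypothetical and only for illustration. The sketch assumes that a window ending at time <code>end</code> covers the half-open interval <code>(end - length, end]</code>, with all times in seconds.</p>

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a simplified model of a time-based sliding window, not Storm's API.
class SlidingWindowSketch {
    /** Returns the event timestamps that fall in (end - length, end]. */
    static List<Long> windowAt(List<Long> eventTimes, long end, long length) {
        List<Long> inWindow = new ArrayList<>();
        for (long t : eventTimes) {
            if (t > end - length && t <= end) {
                inWindow.add(t);
            }
        }
        return inWindow;
    }

    public static void main(String[] args) {
        // Hypothetical event timestamps in seconds.
        List<Long> events = List.of(1L, 3L, 6L, 7L, 8L, 9L, 11L, 12L, 14L);
        long length = 10;
        long slide = 5;
        // Evaluate the window once per sliding interval.
        for (long end = slide; end <= 15; end += slide) {
            System.out.println("window ending at t=" + end + ": " + windowAt(events, end, length));
        }
    }
}
```

<p>With a length of 10 and a sliding interval of 5, the timestamps 1 and 3 appear both in the window ending at 5 and in the one ending at 10, which is the overlap described above.</p>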
| |
| <h2 id="tumbling-window">Tumbling Window</h2> |
| |
| <p>Tuples are grouped into a single window based on time or count. Each tuple belongs to exactly one window.</p> |
| |
| <p>For example, consider a time-based tumbling window with a length of 5 seconds.</p> |
| <div class="highlight"><pre><code class="language-" data-lang="">| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |... |
| 0       5             10         15    -> time |
|    w1         w2            w3 |
| </code></pre></div> |
| <p>The window is evaluated every five seconds and none of the windows overlap.</p> |
| |
| <p>Storm supports specifying the window length and sliding intervals as a count of the number of tuples or as a time duration.</p> |
| |
| <p>The bolt interface <code>IWindowedBolt</code> is implemented by bolts that need windowing support.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">IWindowedBolt</span> <span class="kd">extends</span> <span class="n">IComponent</span> <span class="o">{</span> |
| <span class="kt">void</span> <span class="nf">prepare</span><span class="o">(</span><span class="n">Map</span> <span class="n">stormConf</span><span class="o">,</span> <span class="n">TopologyContext</span> <span class="n">context</span><span class="o">,</span> <span class="n">OutputCollector</span> <span class="n">collector</span><span class="o">);</span> |
| <span class="cm">/** |
| * Process tuples falling within the window and optionally emit |
| * new tuples based on the tuples in the input window. |
| */</span> |
| <span class="kt">void</span> <span class="nf">execute</span><span class="o">(</span><span class="n">TupleWindow</span> <span class="n">inputWindow</span><span class="o">);</span> |
| <span class="kt">void</span> <span class="nf">cleanup</span><span class="o">();</span> |
| <span class="o">}</span> |
| </code></pre></div> |
| <p>Every time the window activates, the <code>execute</code> method is invoked. The <code>TupleWindow</code> parameter gives access to the current tuples |
| in the window, the tuples that have expired, and the new tuples added since the last window was computed. The latter two are useful |
| for efficient windowing computations.</p> |
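<p>As a rough illustration of why the new and expired tuples help, the following plain-Java sketch keeps a running sum and updates it from the delta between two window activations instead of re-reading every tuple. The class <code>IncrementalSumSketch</code> and its <code>onWindow</code> method are hypothetical; plain lists of numbers stand in for the new and expired tuples that a <code>TupleWindow</code> would provide.</p>

```java
import java.util.List;

// Illustrative only: plain lists stand in for the new and expired tuples of a window.
class IncrementalSumSketch {
    private long sum = 0;

    /** Updates the running sum using only what changed since the last activation. */
    long onWindow(List<Long> newValues, List<Long> expiredValues) {
        for (long v : newValues) {
            sum += v;   // values that entered the window
        }
        for (long v : expiredValues) {
            sum -= v;   // values that fell out of the window
        }
        return sum;
    }

    public static void main(String[] args) {
        IncrementalSumSketch sketch = new IncrementalSumSketch();
        // First activation: the window holds 1, 2, 3.
        System.out.println(sketch.onWindow(List.of(1L, 2L, 3L), List.of()));
        // Second activation: 4 arrived and 1 expired, so the window holds 2, 3, 4.
        System.out.println(sketch.onWindow(List.of(4L), List.of(1L)));
    }
}
```

<p>The cost of each activation is proportional to the number of changed tuples rather than to the window length, which matters most when the sliding interval is much smaller than the window.</p>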
| |
| <p>Bolts that need windowing support typically extend <code>BaseWindowedBolt</code>, which has the APIs for specifying the |
| window length and sliding intervals.</p> |
| |
| <p>For example:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">SlidingWindowBolt</span> <span class="kd">extends</span> <span class="n">BaseWindowedBolt</span> <span class="o">{</span> |
| <span class="kd">private</span> <span class="n">OutputCollector</span> <span class="n">collector</span><span class="o">;</span> |
| |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="kt">void</span> <span class="nf">prepare</span><span class="o">(</span><span class="n">Map</span> <span class="n">stormConf</span><span class="o">,</span> <span class="n">TopologyContext</span> <span class="n">context</span><span class="o">,</span> <span class="n">OutputCollector</span> <span class="n">collector</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">this</span><span class="o">.</span><span class="na">collector</span> <span class="o">=</span> <span class="n">collector</span><span class="o">;</span> |
| <span class="o">}</span> |
| |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="kt">void</span> <span class="nf">execute</span><span class="o">(</span><span class="n">TupleWindow</span> <span class="n">inputWindow</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">for</span><span class="o">(</span><span class="n">Tuple</span> <span class="nl">tuple:</span> <span class="n">inputWindow</span><span class="o">.</span><span class="na">get</span><span class="o">())</span> <span class="o">{</span> |
| <span class="c1">// do the windowing computation</span> |
| <span class="o">...</span> |
| <span class="o">}</span> |
| <span class="c1">// emit the results</span> |
| <span class="n">collector</span><span class="o">.</span><span class="na">emit</span><span class="o">(</span><span class="k">new</span> <span class="n">Values</span><span class="o">(</span><span class="n">computedValue</span><span class="o">));</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| |
| <span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="o">{</span> |
| <span class="n">TopologyBuilder</span> <span class="n">builder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TopologyBuilder</span><span class="o">();</span> |
| <span class="n">builder</span><span class="o">.</span><span class="na">setSpout</span><span class="o">(</span><span class="s">"spout"</span><span class="o">,</span> <span class="k">new</span> <span class="n">RandomSentenceSpout</span><span class="o">(),</span> <span class="mi">1</span><span class="o">);</span> |
| <span class="n">builder</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">"slidingwindowbolt"</span><span class="o">,</span> |
| <span class="k">new</span> <span class="nf">SlidingWindowBolt</span><span class="o">().</span><span class="na">withWindow</span><span class="o">(</span><span class="k">new</span> <span class="n">Count</span><span class="o">(</span><span class="mi">30</span><span class="o">),</span> <span class="k">new</span> <span class="n">Count</span><span class="o">(</span><span class="mi">10</span><span class="o">)),</span> |
| <span class="mi">1</span><span class="o">).</span><span class="na">shuffleGrouping</span><span class="o">(</span><span class="s">"spout"</span><span class="o">);</span> |
| <span class="n">Config</span> <span class="n">conf</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Config</span><span class="o">();</span> |
| <span class="n">conf</span><span class="o">.</span><span class="na">setDebug</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span> |
| <span class="n">conf</span><span class="o">.</span><span class="na">setNumWorkers</span><span class="o">(</span><span class="mi">1</span><span class="o">);</span> |
| |
| <span class="n">StormSubmitter</span><span class="o">.</span><span class="na">submitTopologyWithProgressBar</span><span class="o">(</span><span class="n">args</span><span class="o">[</span><span class="mi">0</span><span class="o">],</span> <span class="n">conf</span><span class="o">,</span> <span class="n">builder</span><span class="o">.</span><span class="na">createTopology</span><span class="o">());</span> |
| |
| <span class="o">}</span> |
| </code></pre></div> |
| <p>The following window configurations are supported.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// Tuple count based sliding window that slides after `slidingInterval` number of tuples.</span> |
| <span class="n">withWindow</span><span class="o">(</span><span class="n">Count</span> <span class="n">windowLength</span><span class="o">,</span> <span class="n">Count</span> <span class="n">slidingInterval</span><span class="o">)</span> |
| |
| <span class="c1">// Tuple count based window that slides with every incoming tuple.</span> |
| <span class="nf">withWindow</span><span class="o">(</span><span class="n">Count</span> <span class="n">windowLength</span><span class="o">)</span> |
| |
| <span class="c1">// Tuple count based sliding window that slides after `slidingInterval` time duration.</span> |
| <span class="nf">withWindow</span><span class="o">(</span><span class="n">Count</span> <span class="n">windowLength</span><span class="o">,</span> <span class="n">Duration</span> <span class="n">slidingInterval</span><span class="o">)</span> |
| |
| <span class="c1">// Time duration based sliding window that slides after `slidingInterval` time duration.</span> |
| <span class="nf">withWindow</span><span class="o">(</span><span class="n">Duration</span> <span class="n">windowLength</span><span class="o">,</span> <span class="n">Duration</span> <span class="n">slidingInterval</span><span class="o">)</span> |
| |
| <span class="c1">// Time duration based window that slides with every incoming tuple.</span> |
| <span class="nf">withWindow</span><span class="o">(</span><span class="n">Duration</span> <span class="n">windowLength</span><span class="o">)</span> |
| |
| <span class="c1">// Time duration based sliding window configuration that slides after `slidingInterval` number of tuples.</span> |
| <span class="nf">withWindow</span><span class="o">(</span><span class="n">Duration</span> <span class="n">windowLength</span><span class="o">,</span> <span class="n">Count</span> <span class="n">slidingInterval</span><span class="o">)</span> |
| |
| <span class="c1">// Count based tumbling window that tumbles after the specified count of tuples.</span> |
| <span class="nf">withTumblingWindow</span><span class="o">(</span><span class="n">BaseWindowedBolt</span><span class="o">.</span><span class="na">Count</span> <span class="n">count</span><span class="o">)</span> |
| |
| <span class="c1">// Time duration based tumbling window that tumbles after the specified time duration.</span> |
| <span class="nf">withTumblingWindow</span><span class="o">(</span><span class="n">BaseWindowedBolt</span><span class="o">.</span><span class="na">Duration</span> <span class="n">duration</span><span class="o">)</span> |
| </code></pre></div> |
| <h2 id="tuple-timestamp-and-out-of-order-tuples">Tuple timestamp and out of order tuples</h2> |
| |
| <p>By default, the timestamp tracked in the window is the time at which the tuple is processed by the bolt, and the window calculations |
| are based on this processing timestamp. Storm also supports tracking windows based on a source-generated timestamp.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="cm">/** |
| * Specify a field in the tuple that represents the timestamp as a long value. If this |
| * field is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown. |
| * |
| * @param fieldName the name of the field that contains the timestamp |
| */</span> |
| <span class="kd">public</span> <span class="n">BaseWindowedBolt</span> <span class="nf">withTimestampField</span><span class="o">(</span><span class="n">String</span> <span class="n">fieldName</span><span class="o">)</span> |
| </code></pre></div> |
| <p>The value for the above <code>fieldName</code> will be looked up from the incoming tuple and considered for windowing calculations. |
| If the field is not present in the tuple an exception will be thrown. Alternatively a <a href="../storm-client/src/jvm/org/apache/storm/windowing/TimestampExtractor.java">TimestampExtractor</a> can be used to |
| derive a timestamp value from a tuple (e.g. extract timestamp from a nested field within the tuple).</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="cm">/** |
| * Specify the timestamp extractor implementation. |
| * |
| * @param timestampExtractor the {@link TimestampExtractor} implementation |
| */</span> |
| <span class="kd">public</span> <span class="n">BaseWindowedBolt</span> <span class="nf">withTimestampExtractor</span><span class="o">(</span><span class="n">TimestampExtractor</span> <span class="n">timestampExtractor</span><span class="o">)</span> |
| </code></pre></div> |
| <p>Along with the timestamp field name or extractor, a time lag parameter can also be specified. It indicates the maximum amount by which tuple timestamps may be out of order.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="cm">/** |
| * Specify the maximum time lag of the tuple timestamp in milliseconds. It means that the tuple timestamps |
| * cannot be out of order by more than this amount. |
| * |
| * @param duration the max lag duration |
| */</span> |
| <span class="kd">public</span> <span class="n">BaseWindowedBolt</span> <span class="nf">withLag</span><span class="o">(</span><span class="n">Duration</span> <span class="n">duration</span><span class="o">)</span> |
| </code></pre></div> |
| <p>For example, if the lag is 5 seconds and a tuple <code>t1</code> arrives with timestamp <code>06:00:05</code>, no tuple is expected to arrive with a timestamp earlier than <code>06:00:00</code>. If a tuple |
| arrives with timestamp <code>05:59:59</code> after <code>t1</code> and the window has moved past <code>t1</code>, it is treated as a late tuple. By default, late tuples are not processed; |
| they are only logged in the worker log files at INFO level.</p> |
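<p>The lag rule in this example can be modelled with a few lines of plain Java. This is a simplified sketch, not Storm's implementation: the class <code>LateTupleSketch</code> and its methods are hypothetical, it tracks a single stream, and it assumes a tuple counts as late when its timestamp is strictly before the cutoff (latest timestamp seen minus the lag). It uses <code>java.time.Duration</code>, not Storm's own <code>Duration</code> class.</p>

```java
import java.time.Duration;
import java.time.LocalTime;

// Illustrative only: a simplified single-stream model of the lag rule.
class LateTupleSketch {
    /** The cutoff trails the latest timestamp seen by the allowed lag. */
    static LocalTime watermark(LocalTime latestSeen, Duration lag) {
        return latestSeen.minus(lag);
    }

    /** Assumption: a tuple is late if its timestamp is strictly before the cutoff. */
    static boolean isLate(LocalTime tupleTs, LocalTime watermark) {
        return tupleTs.isBefore(watermark);
    }

    public static void main(String[] args) {
        // t1 arrives with timestamp 06:00:05 and the lag is 5 seconds.
        LocalTime cutoff = watermark(LocalTime.parse("06:00:05"), Duration.ofSeconds(5));
        System.out.println("05:59:59 late? " + isLate(LocalTime.parse("05:59:59"), cutoff));
        System.out.println("06:00:02 late? " + isLate(LocalTime.parse("06:00:02"), cutoff));
    }
}
```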
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="cm">/** |
| * Specify a stream id on which late tuples are going to be emitted. They are going to be accessible via the |
| * {@link org.apache.storm.topology.WindowedBoltExecutor#LATE_TUPLE_FIELD} field. |
| * It must be defined on a per-component basis, and in conjunction with the |
| * {@link BaseWindowedBolt#withTimestampField}, otherwise {@link IllegalArgumentException} will be thrown. |
| * |
| * @param streamId the name of the stream used to emit late tuples on |
| */</span> |
| <span class="kd">public</span> <span class="n">BaseWindowedBolt</span> <span class="nf">withLateTupleStream</span><span class="o">(</span><span class="n">String</span> <span class="n">streamId</span><span class="o">)</span> |
| |
| </code></pre></div> |
| <p>This behaviour can be changed by specifying the above <code>streamId</code>. In this case, late tuples are emitted on the specified stream and are accessible |
| via the field <code>WindowedBoltExecutor.LATE_TUPLE_FIELD</code>.</p> |
| |
| <h3 id="watermarks">Watermarks</h3> |
| |
| <p>To process tuples with a timestamp field, Storm internally computes watermarks based on the incoming tuple timestamps. The watermark is |
| the minimum of the latest tuple timestamps (minus the lag) across all the input streams. At a high level, this is similar to the watermark concept |
| used by Flink and Google's MillWheel for tracking event-based timestamps.</p> |
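<p>The definition above amounts to a small computation, sketched here in plain Java. The class <code>WatermarkSketch</code>, the stream names <code>s1</code> and <code>s2</code>, and the sample values are hypothetical; the sketch only mirrors the stated rule (minimum of the latest per-stream timestamps, minus the lag) and is not Storm's internal code.</p>

```java
import java.util.Map;

// Illustrative only: mirrors the rule "min of latest per-stream timestamps, minus lag".
class WatermarkSketch {
    /** All values are in milliseconds. The map holds the latest timestamp seen per input stream. */
    static long watermark(Map<String, Long> latestTsPerStream, long lagMs) {
        if (latestTsPerStream.isEmpty()) {
            throw new IllegalArgumentException("no input stream has produced a timestamp yet");
        }
        long min = Long.MAX_VALUE;
        for (long ts : latestTsPerStream.values()) {
            min = Math.min(min, ts);
        }
        return min - lagMs;
    }

    public static void main(String[] args) {
        // Stream s1 has reached 36s and stream s2 has reached 40s; the lag is 5s.
        Map<String, Long> latest = Map.of("s1", 36_000L, "s2", 40_000L);
        System.out.println(watermark(latest, 5_000L));
    }
}
```

<p>Taking the minimum means the slowest input stream holds the watermark back, so a window is not evaluated while a lagging stream could still deliver tuples for it.</p>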
| |
| <p>Watermark timestamps are emitted periodically (every second by default), and each one is treated as the clock tick for the window calculation when |
| tuple-based timestamps are in use. The interval at which watermarks are emitted can be changed with the API below.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="cm">/** |
| * Specify the watermark event generation interval. For tuple based timestamps, watermark events |
| * are used to track the progress of time |
| * |
| * @param interval the interval at which watermark events are generated |
| */</span> |
| <span class="kd">public</span> <span class="n">BaseWindowedBolt</span> <span class="nf">withWatermarkInterval</span><span class="o">(</span><span class="n">Duration</span> <span class="n">interval</span><span class="o">)</span> |
| </code></pre></div> |
| <p>When a watermark is received, all windows up to that timestamp will be evaluated.</p> |
| |
| <p>For example, consider tuple-timestamp-based processing with the following window parameters:</p> |
| |
| <p><code>Window length = 20s, sliding interval = 10s, watermark emit frequency = 1s, max lag = 5s</code></p> |
| <div class="highlight"><pre><code class="language-" data-lang="">|-----|-----|-----|-----|-----|-----|-----| |
| 0 10 20 30 40 50 60 70 |
| </code></pre></div> |
| <p>Current ts = <code>09:00:00</code></p> |
| |
| <p>Tuples <code>e1(06:00:03), e2(06:00:05), e3(06:00:07), e4(06:00:18), e5(06:00:26), e6(06:00:36)</code> are received between <code>09:00:00</code> and <code>09:00:01</code>.</p> |
| |
| <p>At time t = <code>09:00:01</code>, watermark w1 = <code>06:00:31</code> is emitted, since no tuples earlier than <code>06:00:31</code> can arrive.</p> |
| |
| <p>Three windows will be evaluated. The end timestamp of the first window (<code>06:00:10</code>) is computed by taking the earliest event timestamp (<code>06:00:03</code>) |
| and rounding it up to the next multiple of the sliding interval (10s).</p> |
| |
| <ol> |
| <li><code>05:59:50 - 06:00:10</code> with tuples e1, e2, e3</li> |
| <li><code>06:00:00 - 06:00:20</code> with tuples e1, e2, e3, e4</li> |
| <li><code>06:00:10 - 06:00:30</code> with tuples e4, e5</li> |
| </ol> |
| |
| <p>e6 is not evaluated since the watermark timestamp <code>06:00:31</code> is earlier than the tuple ts <code>06:00:36</code>.</p> |
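<p>The window boundaries for this first watermark can be reproduced with a short plain-Java sketch. The class <code>WindowEndsSketch</code> and its methods are hypothetical, timestamps are expressed as seconds after 06:00:00 and assumed to be non-negative, and the sketch does not model how windows are skipped across large gaps between tuple timestamps.</p>

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: computes which window end timestamps are due for a given watermark.
class WindowEndsSketch {
    /** Rounds ts up to the next multiple of slide (ts itself if already aligned). */
    static long firstWindowEnd(long earliestTs, long slide) {
        long rem = earliestTs % slide;
        return rem == 0 ? earliestTs : earliestTs + (slide - rem);
    }

    /** Lists the window end timestamps that are at or before the watermark. */
    static List<Long> windowEnds(long earliestTs, long slide, long watermark) {
        List<Long> ends = new ArrayList<>();
        for (long end = firstWindowEnd(earliestTs, slide); end <= watermark; end += slide) {
            ends.add(end);
        }
        return ends;
    }

    public static void main(String[] args) {
        // Earliest tuple e1 = 3s, sliding interval = 10s, watermark w1 = 31s.
        System.out.println(windowEnds(3, 10, 31));
    }
}
```

<p>For these inputs the window ends are 10, 20 and 30, which correspond to the three windows ending at 06:00:10, 06:00:20 and 06:00:30. A window ending at 40 is not due yet because it lies beyond the watermark at 31.</p>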
| |
| <p>Tuples <code>e7(08:00:25), e8(08:00:26), e9(08:00:27), e10(08:00:39)</code> are received between <code>09:00:01</code> and <code>09:00:02</code>.</p> |
| |
| <p>At time t = <code>09:00:02</code>, another watermark w2 = <code>08:00:34</code> is emitted, since no tuples earlier than <code>08:00:34</code> can arrive now.</p> |
| |
| <p>Three windows will be evaluated:</p> |
| |
| <ol> |
| <li><code>06:00:20 - 06:00:40</code> with tuples e5, e6 (from earlier batch)</li> |
| <li><code>06:00:30 - 06:00:50</code> with tuple e6 (from earlier batch)</li> |
| <li><code>08:00:10 - 08:00:30</code> with tuples e7, e8, e9</li> |
| </ol> |
| |
| <p>e10 is not evaluated since the tuple ts <code>08:00:39</code> is beyond the watermark time <code>08:00:34</code>.</p> |
| |
| <p>The window calculation considers the time gaps and computes the windows based on the tuple timestamp.</p> |
| |
| <h2 id="guarantees">Guarantees</h2> |
| |
| <p>The windowing functionality in Storm core currently provides an at-least-once guarantee. The values emitted from the bolt's |
| <code>execute(TupleWindow inputWindow)</code> method are automatically anchored to all the tuples in the inputWindow. The downstream |
| bolts are expected to ack the received tuple (i.e. the tuple emitted from the windowed bolt) to complete the tuple tree. |
| If they do not, the tuples will be replayed and the windowing computation will be re-evaluated.</p> |
| |
| <p>The tuples in the window are automatically acked when they expire, i.e. when they fall out of the window after |
| <code>windowLength + slidingInterval</code>. Note that the configuration <code>topology.message.timeout.secs</code> should be sufficiently larger |
| than <code>windowLength + slidingInterval</code> for time-based windows; otherwise the tuples will time out and be replayed, which can result |
| in duplicate evaluations. For count-based windows, the configuration should be adjusted such that <code>windowLength + slidingInterval</code> |
| tuples can be received within the timeout period.</p> |
| |
| <h2 id="example-topology">Example topology</h2> |
| |
| <p>An example topology, <code>SlidingWindowTopology</code>, shows how to use the APIs to compute a sliding window sum and a tumbling window |
| average.</p> |
| |
| <h2 id="stateful-windowing">Stateful windowing</h2> |
| |
| <p>The default windowing implementation in Storm stores the tuples in memory until they are processed and expired from the |
| window. This limits the use cases to windows that |
| fit entirely in memory. Also, the source tuples cannot be acked until the window expires, which requires large message timeouts |
| (<code>topology.message.timeout.secs</code> should be larger than the window length + sliding interval). This also adds extra load |
| due to the complex acking and anchoring requirements.</p> |
| |
| <p>To address the above limitations and to support larger window sizes, Storm provides stateful windowing support via <code>IStatefulWindowedBolt</code>. |
| User bolts should typically extend <code>BaseStatefulWindowedBolt</code> for the windowing operations, with the framework automatically |
| managing the state of the window in the background.</p> |
| |
| <p>If the sources provide a monotonically increasing identifier as a part of the message, the framework can use it |
| to periodically checkpoint the last expired and last evaluated message ids, to avoid duplicate window evaluations in case of |
| failures or restarts. During recovery, the tuples with message ids lower than the last expired id are discarded, and tuples with |
| message ids between the last expired and last evaluated message ids are fed into the system without activating any previously |
| activated windows. |
| The tuples beyond the last evaluated message id are processed as usual. This can be enabled by setting |
| the <code>messageIdField</code> as shown below:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">topologyBuilder</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">"mybolt"</span><span class="o">,</span> |
| <span class="k">new</span> <span class="nf">MyStatefulWindowedBolt</span><span class="o">()</span> |
| <span class="o">.</span><span class="na">withWindow</span><span class="o">(...)</span> <span class="c1">// windowing configurations</span> |
| <span class="o">.</span><span class="na">withMessageIdField</span><span class="o">(</span><span class="s">"msgid"</span><span class="o">),</span> <span class="c1">// a monotonically increasing 'long' field in the tuple</span> |
| <span class="n">parallelism</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">shuffleGrouping</span><span class="o">(</span><span class="s">"spout"</span><span class="o">);</span> |
| </code></pre></div> |
| <p>However, this option is feasible only if the sources can provide a monotonically increasing identifier in the tuple and the same is maintained |
| while re-emitting the messages in case of failures. With this option the tuples are still buffered in memory until processed |
| and expired from the window.</p> |
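<p>The recovery rule described above can be summarised as a three-way decision on the message id. The following plain-Java sketch is hypothetical (the class <code>RecoverySketch</code>, the <code>Action</code> enum and the <code>classify</code> method are not Storm APIs), and it assumes that ids equal to the checkpointed values fall on the "already handled" side of each boundary.</p>

```java
// Illustrative only: classifies a replayed tuple by its message id during recovery.
class RecoverySketch {
    enum Action {
        DISCARD,                 // already expired before the failure
        REPLAY_WITHOUT_TRIGGER,  // refill the window, but do not re-activate old windows
        PROCESS                  // not seen by any evaluated window yet
    }

    /** Assumption: boundaries are inclusive on the lower branch (id <= checkpointed value). */
    static Action classify(long msgId, long lastExpired, long lastEvaluated) {
        if (msgId <= lastExpired) {
            return Action.DISCARD;
        }
        if (msgId <= lastEvaluated) {
            return Action.REPLAY_WITHOUT_TRIGGER;
        }
        return Action.PROCESS;
    }

    public static void main(String[] args) {
        long lastExpired = 10;
        long lastEvaluated = 20;
        for (long id : new long[] {5, 15, 25}) {
            System.out.println(id + " -> " + classify(id, lastExpired, lastEvaluated));
        }
    }
}
```

<p>The decision only works if replayed messages carry the same ids as before the failure, which is why the identifier must be stable across re-emits.</p>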
| |
| <p>For more details take a look at the sample topology in storm-starter <a href="../examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java">StatefulWindowingTopology</a> which will help you get started.</p> |
| |
| <h3 id="window-checkpointing">Window checkpointing</h3> |
| |
| <p>With window checkpointing, the monotonically increasing id is no longer required since the framework transparently saves the state of the window periodically into the configured state backend. |
| The state that is saved includes the tuples in the window, any system state that is required to recover the state of processing |
| and also the user state.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">topologyBuilder</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">"mybolt"</span><span class="o">,</span> |
| <span class="k">new</span> <span class="nf">MyStatefulPersistentWindowedBolt</span><span class="o">()</span> |
| <span class="o">.</span><span class="na">withWindow</span><span class="o">(...)</span> <span class="c1">// windowing configurations</span> |
| <span class="o">.</span><span class="na">withPersistence</span><span class="o">()</span> <span class="c1">// persist the window state</span> |
| <span class="o">.</span><span class="na">withMaxEventsInMemory</span><span class="o">(</span><span class="mi">25000</span><span class="o">),</span> <span class="c1">// max number of events to be cached in memory</span> |
| <span class="n">parallelism</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">shuffleGrouping</span><span class="o">(</span><span class="s">"spout"</span><span class="o">);</span> |
| |
| </code></pre></div> |
| <p>The <code>withPersistence</code> call instructs the framework to transparently save the tuples in the window, along with |
| any associated system and user state, to the state backend. <code>withMaxEventsInMemory</code> is an optional |
| setting that specifies the maximum number of tuples that may be kept in memory. Tuples are transparently loaded from |
| the state backend as required, and the ones most likely to be used again are retained in memory.</p> |
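| |
| <p>The idea of a bounded in-memory cache in front of a state backend can be pictured with a small, self-contained sketch. This is not Storm's implementation: the class <code>BoundedEventCache</code> is hypothetical, a plain <code>HashMap</code> stands in for the state backend, and least-recently-used eviction is only an assumption about how "most likely to be used again" might be approximated.</p> |

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; not part of the Storm API.
class BoundedEventCache<K, V> {
    // Stand-in for the configured state backend (for example Redis or HBase).
    private final Map<K, V> backend = new HashMap<>();
    // In-memory cache bounded by the limit passed to the constructor.
    private final LinkedHashMap<K, V> memory;

    BoundedEventCache(final int maxEventsInMemory) {
        // accessOrder = true orders entries from least to most recently used,
        // so the "eldest" entry is the least recently used one.
        this.memory = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxEventsInMemory;
            }
        };
    }

    // Every event is persisted; the in-memory copy may evict an older entry.
    void put(K key, V value) {
        backend.put(key, value);
        memory.put(key, value);
    }

    // Serve from memory when possible, otherwise reload from the backend.
    V get(K key) {
        V value = memory.get(key);
        if (value == null && backend.containsKey(key)) {
            value = backend.get(key);
            memory.put(key, value);
        }
        return value;
    }

    int inMemory() {
        return memory.size();
    }

    boolean isCached(K key) {
        return memory.containsKey(key);
    }
}
```

| <p>In this sketch, a cache created with a limit of 2 still returns all three events after three <code>put</code> calls, but only two of them are held in memory at any time; the third is reloaded from the backing map on demand.</p> |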
| |
| <p>The state backend can be configured by setting the topology state provider configuration:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// use redis for state persistence</span> |
| <span class="n">conf</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">Config</span><span class="o">.</span><span class="na">TOPOLOGY_STATE_PROVIDER</span><span class="o">,</span> <span class="s">"org.apache.storm.redis.state.RedisKeyValueStateProvider"</span><span class="o">);</span> |
| |
| </code></pre></div> |
| <p>Currently Storm supports Redis and HBase as state backends and uses the underlying state-checkpointing |
| framework to save the window state. For more details on state checkpointing, see <a href="State-checkpointing.html">State-checkpointing</a>.</p> |
| |
| <p>Here is an example of a persistent windowed bolt that uses window checkpointing to save its state. <code>initState</code> |
| is invoked with the last saved state (user state) at initialization time. The <code>execute</code> method is invoked based on the configured |
| windowing parameters, and the tuples in the active window can be accessed via an <code>iterator</code> as shown below.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyStatefulPersistentWindowedBolt</span> <span class="kd">extends</span> <span class="n">BaseStatefulWindowedBolt</span><span class="o">&lt;</span><span class="n">KeyValueState</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">V</span><span class="o">&gt;&gt;</span> <span class="o">{</span> |
|     <span class="c1">// K and V stand for the concrete key and value types of the user state</span> |
|     <span class="kd">private</span> <span class="n">KeyValueState</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">V</span><span class="o">&gt;</span> <span class="n">state</span><span class="o">;</span> |
| |
|     <span class="nd">@Override</span> |
|     <span class="kd">public</span> <span class="kt">void</span> <span class="nf">initState</span><span class="o">(</span><span class="n">KeyValueState</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">V</span><span class="o">&gt;</span> <span class="n">state</span><span class="o">)</span> <span class="o">{</span> |
|         <span class="k">this</span><span class="o">.</span><span class="na">state</span> <span class="o">=</span> <span class="n">state</span><span class="o">;</span> |
|         <span class="c1">// ...</span> |
|         <span class="c1">// restore the state from the last saved state.</span> |
|         <span class="c1">// ...</span> |
|     <span class="o">}</span> |
| |
|     <span class="nd">@Override</span> |
|     <span class="kd">public</span> <span class="kt">void</span> <span class="nf">execute</span><span class="o">(</span><span class="n">TupleWindow</span> <span class="n">window</span><span class="o">)</span> <span class="o">{</span> |
|         <span class="c1">// iterate over tuples in the current window</span> |
|         <span class="n">Iterator</span><span class="o">&lt;</span><span class="n">Tuple</span><span class="o">&gt;</span> <span class="n">it</span> <span class="o">=</span> <span class="n">window</span><span class="o">.</span><span class="na">getIter</span><span class="o">();</span> |
|         <span class="k">while</span> <span class="o">(</span><span class="n">it</span><span class="o">.</span><span class="na">hasNext</span><span class="o">())</span> <span class="o">{</span> |
|             <span class="n">Tuple</span> <span class="n">tuple</span> <span class="o">=</span> <span class="n">it</span><span class="o">.</span><span class="na">next</span><span class="o">();</span> <span class="c1">// advance the iterator so the loop terminates</span> |
|             <span class="c1">// compute some result based on the tuples in window</span> |
|         <span class="o">}</span> |
| |
|         <span class="c1">// possibly update any state to be maintained across windows</span> |
|         <span class="n">state</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">STATE_KEY</span><span class="o">,</span> <span class="n">updatedValue</span><span class="o">);</span> |
| |
|         <span class="c1">// emit the results downstream</span> |
|         <span class="n">collector</span><span class="o">.</span><span class="na">emit</span><span class="o">(</span><span class="k">new</span> <span class="n">Values</span><span class="o">(</span><span class="n">result</span><span class="o">));</span> |
|     <span class="o">}</span> |
| <span class="o">}</span> |
| </code></pre></div> |
| <p><strong>Note:</strong> For persistent windowed bolts, use <code>TupleWindow.getIter</code> to retrieve an iterator over the |
| events in the window. If the window holds a very large number of tuples, invoking <code>TupleWindow.get</code> |
| tries to load all of them into memory and may fail with an <code>OutOfMemoryError</code>.</p> |
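| |
| <p>The difference between iterating and materializing can be shown without any Storm classes. The sketch below is a hypothetical helper, not part of the Storm API: <code>StreamingAverage.average</code> consumes a plain <code>Iterator&lt;Long&gt;</code> one element at a time and keeps only a running sum and count, which is the access pattern an iterator over the window allows. The lazily generated range in <code>main</code> is an assumed stand-in for a large window.</p> |

```java
import java.util.Iterator;
import java.util.stream.LongStream;

// Hypothetical helper for illustration; not part of the Storm API.
class StreamingAverage {
    // Consumes the iterator element by element. Only the running sum and
    // count are held in memory, never the full sequence of values.
    static double average(Iterator<Long> values) {
        long sum = 0;
        long count = 0;
        while (values.hasNext()) {
            sum += values.next();
            count++;
        }
        return count == 0 ? 0.0 : (double) sum / count;
    }

    public static void main(String[] args) {
        // A lazily generated sequence stands in for a window with many events;
        // the values are produced on demand rather than stored in a list.
        Iterator<Long> window = LongStream.rangeClosed(1, 1_000_000).iterator();
        System.out.println(average(window));
    }
}
```

| <p>Collecting the same values into a <code>List</code> first would require memory proportional to the window size, which is the cost the note above warns about.</p> |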
| |
| <p><strong>Note:</strong> For persistent windowed bolts, <code>TupleWindow.getNew</code> and <code>TupleWindow.getExpired</code> are currently not supported |
| and throw an <code>UnsupportedOperationException</code>.</p> |
| |
| <p>For more details take a look at the sample topology in storm-starter <a href="../examples/storm-starter/src/jvm/org/apache/storm/starter/PersistentWindowingTopology.java">PersistentWindowingTopology</a> |
| which will help you get started.</p> |
| </div> |
| |
| |
| </div> |
| </div> |
| </div> |
| <footer> |
| <div class="container-fluid"> |
| <div class="row"> |
| <div class="col-md-12"> |
| <p align="center">Copyright © 2022 <a href="https://www.apache.org">Apache Software Foundation</a>. All Rights Reserved. |
| <br>Apache Storm, Apache, the Apache feather logo, and the Apache Storm project logos are trademarks of The Apache Software Foundation. |
| <br>All other marks mentioned may be trademarks or registered trademarks of their respective owners.</p> |
| </div> |
| </div> |
| </div> |
| </footer> |
| <!--Footer End--> |
| |
| </body> |
| |
| </html> |
| |