blob: 637a439e3b7b217f41663822c78b2e17f63574e1 [file] [log] [blame]
<!DOCTYPE html>
<html lang="en" dir="ltr">
<head>
<link rel="stylesheet" href="/bootstrap/css/bootstrap.min.css">
<script src="/bootstrap/js/bootstrap.bundle.min.js"></script>
<link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css">
<script src="/js/anchor.min.js"></script>
<script src="/js/flink.js"></script>
<link rel="canonical" href="https://flink.apache.org/2015/12/04/introducing-stream-windows-in-apache-flink/">
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="The data analysis space is witnessing an evolution from batch to stream processing for many use cases. Although batch can be handled as a special case of stream processing, analyzing never-ending streaming data often requires a shift in the mindset and comes with its own terminology (for example, “windowing” and “at-least-once”/”exactly-once” processing). This shift and the new terminology can be quite confusing for people being new to the space of stream processing.">
<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="Introducing Stream Windows in Apache Flink" />
<meta property="og:description" content="The data analysis space is witnessing an evolution from batch to stream processing for many use cases. Although batch can be handled as a special case of stream processing, analyzing never-ending streaming data often requires a shift in the mindset and comes with its own terminology (for example, “windowing” and “at-least-once”/”exactly-once” processing). This shift and the new terminology can be quite confusing for people being new to the space of stream processing." />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://flink.apache.org/2015/12/04/introducing-stream-windows-in-apache-flink/" /><meta property="article:section" content="posts" />
<meta property="article:published_time" content="2015-12-04T10:00:00+00:00" />
<meta property="article:modified_time" content="2015-12-04T10:00:00+00:00" />
<title>Introducing Stream Windows in Apache Flink | Apache Flink</title>
<link rel="manifest" href="/manifest.json">
<link rel="icon" href="/favicon.png" type="image/x-icon">
<link rel="stylesheet" href="/book.min.22eceb4d17baa9cdc0f57345edd6f215a40474022dfee39b63befb5fb3c596b5.css" integrity="sha256-IuzrTRe6qc3A9XNF7dbyFaQEdAIt/uObY777X7PFlrU=">
<script defer src="/en.search.min.2698f0d1b683dae4d6cb071668b310a55ebcf1c48d11410a015a51d90105b53e.js" integrity="sha256-Jpjw0baD2uTWywcWaLMQpV688cSNEUEKAVpR2QEFtT4="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->
<meta name="generator" content="Hugo 0.124.1">
<script>
// Matomo page-view tracking for the Apache analytics instance.
// Commands are queued on window._paq; an already-existing queue is reused.
var _paq = window._paq = window._paq || [];
// Queue the tracker configuration commands (names are defined by the Matomo tracker API).
_paq.push(['disableCookies']);
_paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
// Explicit https base URL (instead of protocol-relative "//...") so the tracker
// is never requested over plain http and still resolves in local file previews.
var u="https://analytics.apache.org/";
_paq.push(['setTrackerUrl', u+'matomo.php']);
_paq.push(['setSiteId', '1']);
// Create the tracker script element and insert it before the first script
// element of the document; async=true keeps it from blocking page parsing.
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
</head>
<body dir="ltr">
<header>
<nav class="navbar navbar-expand-xl">
<div class="container-fluid">
<a class="navbar-brand" href="/">
<img src="/img/logo/png/100/flink_squirrel_100_color.png" alt="Apache Flink" height="47" width="47" class="d-inline-block align-text-middle">
<span>Apache Flink</span>
</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<i class="fa fa-bars navbar-toggler-icon"></i>
</button>
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav">
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">About</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/what-is-flink/flink-architecture/">Architecture</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-applications/">Applications</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-operations/">Operations</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/use-cases/">Use Cases</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/powered-by/">Powered By</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/roadmap/">Roadmap</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/community/">Community & Project Info</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/security/">Security</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/special-thanks/">Special Thanks</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Getting Started</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/">With Flink<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/">With Flink Kubernetes Operator<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/introduction/">With Flink CDC<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/">With Flink ML<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html">With Flink Stateful Functions<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/">Training Course<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Documentation</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/">Flink 1.19 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-master/">Flink Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/">Kubernetes Operator 1.8 (latest)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main">Kubernetes Operator Main (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable">CDC 3.0 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-master">CDC Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">ML 2.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-master">ML Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/">Stateful Functions 3.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-master">Stateful Functions Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">How to Contribute</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/how-to-contribute/overview/">Overview</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-code/">Contribute Code</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/reviewing-prs/">Review Pull Requests</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/code-style-and-quality-preamble/">Code Style and Quality Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-documentation/">Contribute Documentation</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/documentation-style-guide/">Documentation Style Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/improve-website/">Contribute to the Website</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/getting-help/">Getting Help</a>
</li>
</ul>
</li>
<li class="nav-item">
<a class="nav-link" href="/posts/">Flink Blog</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/downloads/">Downloads</a>
</li>
</ul>
<div class="book-search">
<div class="book-search-spinner hidden">
<i class="fa fa-refresh fa-spin"></i>
</div>
<form class="search-bar d-flex" onsubmit="return false;">
<input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/">
<i class="fa fa-search search"></i>
<i class="fa fa-circle-o-notch fa-spin spinner"></i>
</form>
<div class="book-search-spinner hidden"></div>
<ul id="book-search-results"></ul>
</div>
</div>
</div>
</nav>
<div class="navbar-clearfix"></div>
</header>
<main class="flex">
<section class="container book-page">
<article class="markdown">
<h1>
<a href="/2015/12/04/introducing-stream-windows-in-apache-flink/">Introducing Stream Windows in Apache Flink</a>
</h1>
December 4, 2015 -
<p>The data analysis space is witnessing an evolution from batch to stream processing for many use cases. Although batch can be handled as a special case of stream processing, analyzing never-ending streaming data often requires a shift in the mindset and comes with its own terminology (for example, “windowing” and “at-least-once”/“exactly-once” processing). This shift and the new terminology can be quite confusing for people who are new to the space of stream processing. Apache Flink is a production-ready stream processor with an easy-to-use yet very expressive API to define advanced stream analysis programs. Flink&rsquo;s API features very flexible window definitions on data streams which let it stand out among other open source stream processors.</p>
<p>In this blog post, we discuss the concept of windows for stream processing, present Flink&rsquo;s built-in windows, and explain its support for custom windowing semantics.</p>
<h2 id="what-are-windows-and-what-are-they-good-for">
What are windows and what are they good for?
<a class="anchor" href="#what-are-windows-and-what-are-they-good-for">#</a>
</h2>
<p>Consider the example of a traffic sensor that counts every 15 seconds the number of vehicles passing a certain location. The resulting stream could look like:</p>
<center>
<img src="/img/blog/window-intro/window-stream.png" alt="Example stream of vehicle counts reported by a traffic sensor" style="width:75%;margin:15px">
</center>
<p>If you would like to know how many vehicles passed that location, you would simply sum the individual counts. However, the nature of a sensor stream is that it continuously produces data. Such a stream never ends and it is not possible to compute a final sum that can be returned. Instead, it is possible to compute rolling sums, i.e., return for each input event an updated sum record. This would yield a new stream of partial sums.</p>
<center>
<img src="/img/blog/window-intro/window-rolling-sum.png" alt="Rolling sums computed over the stream of vehicle counts" style="width:75%;margin:15px">
</center>
<p>However, a stream of partial sums might not be what we are looking for, because it constantly updates the count and even more important, some information such as variation over time is lost. Hence, we might want to rephrase our question and ask for the number of cars that pass the location every minute. This requires us to group the elements of the stream into finite sets, each set corresponding to sixty seconds. This operation is called a <em>tumbling windows</em> operation.</p>
<center>
<img src="/img/blog/window-intro/window-tumbling-window.png" alt="Tumbling windows grouping the vehicle count stream into one-minute sets" style="width:75%;margin:15px">
</center>
<p>Tumbling windows discretize a stream into non-overlapping windows. For certain applications it is important that windows are not disjoint because an application might require smoothed aggregates. For example, we can compute every thirty seconds the number of cars passed in the last minute. Such windows are called <em>sliding windows</em>.</p>
<center>
<img src="/img/blog/window-intro/window-sliding-window.png" alt="Sliding windows of one minute evaluated every thirty seconds over the vehicle count stream" style="width:75%;margin:15px">
</center>
<p>Defining windows on a data stream as discussed before is a non-parallel operation. This is because each element of a stream must be processed by the same window operator that decides which windows the element should be added to. Windows on a full stream are called <em>AllWindows</em> in Flink. For many applications, a data stream needs to be grouped into multiple logical streams on each of which a window operator can be applied. Think for example about a stream of vehicle counts from multiple traffic sensors (instead of only one sensor as in our previous example), where each sensor monitors a different location. By grouping the stream by sensor id, we can compute windowed traffic statistics for each location in parallel. In Flink, we call such partitioned windows simply <em>Windows</em>, as they are the common case for distributed streams. The following figure shows tumbling windows that collect two elements over a stream of <code>(sensorId, count)</code> pair elements.</p>
<center>
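<img src="/img/blog/window-intro/windows-keyed.png" alt="Tumbling windows collecting two elements each over a stream of (sensorId, count) pairs grouped by sensor id" style="width:75%;margin:15px">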
</center>
<p>Generally speaking, a window defines a finite set of elements on an unbounded stream. This set can be based on time (as in our previous examples), element counts, a combination of counts and time, or some custom logic to assign elements to windows. Flink&rsquo;s DataStream API provides concise operators for the most common window operations as well as a generic windowing mechanism that allows users to define very custom windowing logic. In the following we present Flink&rsquo;s time and count windows before discussing its windowing mechanism in detail.</p>
<h2 id="time-windows">
Time Windows
<a class="anchor" href="#time-windows">#</a>
</h2>
<p>As their name suggests, time windows group stream elements by time. For example, a tumbling time window of one minute collects elements for one minute and applies a function on all elements in the window after one minute passed.</p>
<p>Defining tumbling and sliding time windows in Apache Flink is very easy:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-scala" data-lang="scala"><span class="line"><span class="cl"><span class="c1">// Stream of (sensorId, carCnt)
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">val</span> <span class="n">vehicleCnts</span><span class="k">:</span> <span class="kt">DataStream</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">Int</span><span class="o">)]</span> <span class="k">=</span> <span class="o">...</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">val</span> <span class="n">tumblingCnts</span><span class="k">:</span> <span class="kt">DataStream</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">Int</span><span class="o">)]</span> <span class="k">=</span> <span class="n">vehicleCnts</span>
</span></span><span class="line"><span class="cl"> <span class="c1">// key stream by sensorId
</span></span></span><span class="line"><span class="cl"><span class="c1"></span> <span class="o">.</span><span class="n">keyBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
</span></span><span class="line"><span class="cl"> <span class="c1">// tumbling time window of 1 minute length
</span></span></span><span class="line"><span class="cl"><span class="c1"></span> <span class="o">.</span><span class="n">timeWindow</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">minutes</span><span class="o">(</span><span class="mi">1</span><span class="o">))</span>
</span></span><span class="line"><span class="cl"> <span class="c1">// compute sum over carCnt
</span></span></span><span class="line"><span class="cl"><span class="c1"></span> <span class="o">.</span><span class="n">sum</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">val</span> <span class="n">slidingCnts</span><span class="k">:</span> <span class="kt">DataStream</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">Int</span><span class="o">)]</span> <span class="k">=</span> <span class="n">vehicleCnts</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">keyBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
</span></span><span class="line"><span class="cl"> <span class="c1">// sliding time window of 1 minute length and 30 secs trigger interval
</span></span></span><span class="line"><span class="cl"><span class="c1"></span> <span class="o">.</span><span class="n">timeWindow</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">minutes</span><span class="o">(</span><span class="mi">1</span><span class="o">),</span> <span class="nc">Time</span><span class="o">.</span><span class="n">seconds</span><span class="o">(</span><span class="mi">30</span><span class="o">))</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">sum</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span>
</span></span></code></pre></div><p>There is one aspect that we haven&rsquo;t discussed yet, namely the exact meaning of &ldquo;<em>collects elements for one minute</em>&rdquo; which boils down to the question, &ldquo;<em>How does the stream processor interpret time?</em>&rdquo;.</p>
<p>Apache Flink features three different notions of time, namely <em>processing time</em>, <em>event time</em>, and <em>ingestion time</em>.</p>
<ol>
<li>In <strong>processing time</strong>, windows are defined with respect to the wall clock of the machine that builds and processes a window, i.e., a one minute processing time window collects elements for exactly one minute.</li>
<li>In <strong>event time</strong>, windows are defined with respect to timestamps that are attached to each event record. This is common for many types of events, such as log entries, sensor data, etc., where the timestamp usually represents the time at which the event occurred. Event time has several benefits over processing time. First of all, it decouples the program semantics from the actual serving speed of the source and the processing performance of the system. Hence you can process historic data, which is served at maximum speed, and continuously produced data with the same program. It also prevents semantically incorrect results in case of backpressure or delays due to failure recovery. Second, event time windows compute correct results, even if events arrive out of order with respect to their timestamps, which is common if a data stream gathers events from distributed sources.</li>
<li><strong>Ingestion time</strong> is a hybrid of processing and event time. It assigns wall clock timestamps to records as soon as they arrive in the system (at the source) and continues processing with event time semantics based on the attached timestamps.</li>
</ol>
<h2 id="count-windows">
Count Windows
<a class="anchor" href="#count-windows">#</a>
</h2>
<p>Apache Flink also features count windows. A tumbling count window of 100 will collect 100 events in a window and evaluate the window when the 100th element has been added.</p>
<p>In Flink&rsquo;s DataStream API, tumbling and sliding count windows are defined as follows:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-scala" data-lang="scala"><span class="line"><span class="cl"><span class="c1">// Stream of (sensorId, carCnt)
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">val</span> <span class="n">vehicleCnts</span><span class="k">:</span> <span class="kt">DataStream</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">Int</span><span class="o">)]</span> <span class="k">=</span> <span class="o">...</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">val</span> <span class="n">tumblingCnts</span><span class="k">:</span> <span class="kt">DataStream</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">Int</span><span class="o">)]</span> <span class="k">=</span> <span class="n">vehicleCnts</span>
</span></span><span class="line"><span class="cl"> <span class="c1">// key stream by sensorId
</span></span></span><span class="line"><span class="cl"><span class="c1"></span> <span class="o">.</span><span class="n">keyBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
</span></span><span class="line"><span class="cl"> <span class="c1">// tumbling count window of 100 elements size
</span></span></span><span class="line"><span class="cl"><span class="c1"></span> <span class="o">.</span><span class="n">countWindow</span><span class="o">(</span><span class="mi">100</span><span class="o">)</span>
</span></span><span class="line"><span class="cl"> <span class="c1">// compute the carCnt sum
</span></span></span><span class="line"><span class="cl"><span class="c1"></span> <span class="o">.</span><span class="n">sum</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">val</span> <span class="n">slidingCnts</span><span class="k">:</span> <span class="kt">DataStream</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">Int</span><span class="o">)]</span> <span class="k">=</span> <span class="n">vehicleCnts</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">keyBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
</span></span><span class="line"><span class="cl"> <span class="c1">// sliding count window of 100 elements size and 10 elements trigger interval
</span></span></span><span class="line"><span class="cl"><span class="c1"></span> <span class="o">.</span><span class="n">countWindow</span><span class="o">(</span><span class="mi">100</span><span class="o">,</span> <span class="mi">10</span><span class="o">)</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">sum</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span>
</span></span></code></pre></div><h2 id="dissecting-flinks-windowing-mechanics">
Dissecting Flink&rsquo;s windowing mechanics
<a class="anchor" href="#dissecting-flinks-windowing-mechanics">#</a>
</h2>
<p>Flink&rsquo;s built-in time and count windows cover a wide range of common window use cases. However, there are of course applications that require custom windowing logic that cannot be addressed by Flink&rsquo;s built-in windows. In order to support also applications that need very specific windowing semantics, the DataStream API exposes interfaces for the internals of its windowing mechanics. These interfaces give very fine-grained control about the way that windows are built and evaluated.</p>
<p>The following figure depicts Flink&rsquo;s windowing mechanism and introduces the components being involved.</p>
<center>
<img src="/img/blog/window-intro/window-mechanics.png" alt="Flink's windowing mechanism and the components involved" style="width:90%;margin:15px">
</center>
<p>Elements that arrive at a window operator are handed to a <code>WindowAssigner</code>. The WindowAssigner assigns elements to one or more windows, possibly creating new windows. A <code>Window</code> itself is just an identifier for a list of elements and may provide some optional meta information, such as begin and end time in case of a <code>TimeWindow</code>. Note that an element can be added to multiple windows, which also means that multiple windows can exist at the same time.</p>
<p>Each window owns a <code>Trigger</code> that decides when the window is evaluated or purged. The trigger is called for each element that is inserted into the window and when a previously registered timer times out. On each event, a trigger can decide to fire (i.e., evaluate), purge (remove the window and discard its content), or fire and then purge the window. A trigger that just fires evaluates the window and keeps it as it is, i.e., all elements remain in the window and are evaluated again when the trigger fires the next time. A window can be evaluated several times and exists until it is purged. Note that a window consumes memory until it is purged.</p>
<p>When a Trigger fires, the list of window elements can be given to an optional <code>Evictor</code>. The evictor can iterate through the list and decide to cut off some elements from the start of the list, i.e., remove some of the elements that entered the window first. The remaining elements are given to an evaluation function. If no Evictor was defined, the Trigger hands all the window elements directly to the evaluation function.</p>
<p>The evaluation function receives the elements of a window (possibly filtered by an Evictor) and computes one or more result elements for the window. The DataStream API accepts different types of evaluation functions, including predefined aggregation functions such as <code>sum()</code>, <code>min()</code>, <code>max()</code>, as well as a <code>ReduceFunction</code>, <code>FoldFunction</code>, or <code>WindowFunction</code>. A WindowFunction is the most generic evaluation function and receives the window object (i.e., the metadata of the window), the list of window elements, and the window key (in case of a keyed window) as parameters.</p>
<p>These are the components that constitute Flink&rsquo;s windowing mechanics. We now show step-by-step how to implement custom windowing logic with the DataStream API. We start with a stream of type <code>DataStream[IN]</code> and key it using a key selector function that extracts a key of type <code>KEY</code> to obtain a <code>KeyedStream[IN, KEY]</code>.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-scala" data-lang="scala"><span class="line"><span class="cl"><span class="k">val</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataStream</span><span class="o">[</span><span class="kt">IN</span><span class="o">]</span> <span class="k">=</span> <span class="o">...</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="c1">// created a keyed stream using a key selector function
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">val</span> <span class="n">keyed</span><span class="k">:</span> <span class="kt">KeyedStream</span><span class="o">[</span><span class="kt">IN</span>, <span class="kt">KEY</span><span class="o">]</span> <span class="k">=</span> <span class="n">input</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">keyBy</span><span class="o">(</span><span class="n">myKeySel</span><span class="k">:</span> <span class="o">(</span><span class="kt">IN</span><span class="o">)</span> <span class="o">=&gt;</span> <span class="nc">KEY</span><span class="o">)</span>
</span></span></code></pre></div><p>We apply a <code>WindowAssigner[IN, WINDOW]</code> that creates windows of type <code>WINDOW</code> resulting in a <code>WindowedStream[IN, KEY, WINDOW]</code>. In addition, a <code>WindowAssigner</code> also provides a default <code>Trigger</code> implementation.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-scala" data-lang="scala"><span class="line"><span class="cl"><span class="c1">// create windowed stream using a WindowAssigner
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">var</span> <span class="n">windowed</span><span class="k">:</span> <span class="kt">WindowedStream</span><span class="o">[</span><span class="kt">IN</span>, <span class="kt">KEY</span>, <span class="kt">WINDOW</span><span class="o">]</span> <span class="k">=</span> <span class="n">keyed</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="n">myAssigner</span><span class="k">:</span> <span class="kt">WindowAssigner</span><span class="o">[</span><span class="kt">IN</span>, <span class="kt">WINDOW</span><span class="o">])</span>
</span></span></code></pre></div><p>We can explicitly specify a <code>Trigger</code> to overwrite the default <code>Trigger</code> provided by the <code>WindowAssigner</code>. Note that specifying a trigger does not add an additional trigger condition but replaces the current trigger.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-scala" data-lang="scala"><span class="line"><span class="cl"><span class="c1">// override the default trigger of the WindowAssigner
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="n">windowed</span> <span class="k">=</span> <span class="n">windowed</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">trigger</span><span class="o">(</span><span class="n">myTrigger</span><span class="k">:</span> <span class="kt">Trigger</span><span class="o">[</span><span class="kt">IN</span>, <span class="kt">WINDOW</span><span class="o">])</span>
</span></span></code></pre></div><p>We may want to specify an optional <code>Evictor</code> as follows.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-scala" data-lang="scala"><span class="line"><span class="cl"><span class="c1">// specify an optional evictor
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="n">windowed</span> <span class="k">=</span> <span class="n">windowed</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">evictor</span><span class="o">(</span><span class="n">myEvictor</span><span class="k">:</span> <span class="kt">Evictor</span><span class="o">[</span><span class="kt">IN</span>, <span class="kt">WINDOW</span><span class="o">])</span>
</span></span></code></pre></div><p>Finally, we apply a <code>WindowFunction</code> that returns elements of type <code>OUT</code> to obtain a <code>DataStream[OUT]</code>.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-scala" data-lang="scala"><span class="line"><span class="cl"><span class="c1">// apply window function to windowed stream
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">val</span> <span class="n">output</span><span class="k">:</span> <span class="kt">DataStream</span><span class="o">[</span><span class="kt">OUT</span><span class="o">]</span> <span class="k">=</span> <span class="n">windowed</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">apply</span><span class="o">(</span><span class="n">myWinFunc</span><span class="k">:</span> <span class="kt">WindowFunction</span><span class="o">[</span><span class="kt">IN</span>, <span class="kt">OUT</span>, <span class="kt">KEY</span>, <span class="kt">WINDOW</span><span class="o">])</span>
</span></span></code></pre></div><p>With Flink&rsquo;s internal windowing mechanics and its exposure through the DataStream API it is possible to implement very custom windowing logic such as session windows or windows that emit early results if the values exceed a certain threshold.</p>
<h2 id="conclusion">
Conclusion
<a class="anchor" href="#conclusion">#</a>
</h2>
<p>Support for various types of windows over continuous data streams is a must-have for modern stream processors. Apache Flink is a stream processor with a very strong feature set, including a very flexible mechanism to build and evaluate windows over continuous data streams. Flink provides pre-defined window operators for common use cases as well as a toolbox that allows defining very custom windowing logic. The Flink community will add more pre-defined window operators as we learn the requirements from our users.</p>
</p>
</article>
<div class="edit-this-page">
<p>
<a href="https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications">Want to contribute translation?</a>
</p>
<p>
<a href="https://github.com/apache/flink-web/edit/asf-site/docs/content/posts/2015-12-04-Introducing-windows.md">
Edit This Page<i class="fa fa-edit fa-fw"></i>
</a>
</p>
</div>
</section>
<aside class="book-toc">
<nav id="TableOfContents"><h3>On This Page <a href="javascript:void(0)" class="toc" onclick="collapseToc()"><i class="fa fa-times" aria-hidden="true"></i></a></h3>
<ul>
<li>
<ul>
<li><a href="#what-are-windows-and-what-are-they-good-for">What are windows and what are they good for?</a></li>
<li><a href="#time-windows">Time Windows</a></li>
<li><a href="#count-windows">Count Windows</a></li>
<li><a href="#dissecting-flinks-windowing-mechanics">Dissecting Flink&rsquo;s windowing mechanics</a></li>
<li><a href="#conclusion">Conclusion</a></li>
</ul>
</li>
</ul>
</nav>
</aside>
<aside class="expand-toc hidden">
<a class="toc" onclick="expandToc()" href="javascript:void(0)">
<i class="fa fa-bars" aria-hidden="true"></i>
</a>
</aside>
</main>
<footer>
<div class="separator"></div>
<div class="panels">
<div class="wrapper">
<div class="panel">
<ul>
<li>
<a href="https://flink-packages.org/">flink-packages.org</a>
</li>
<li>
<a href="https://www.apache.org/">Apache Software Foundation</a>
</li>
<li>
<a href="https://www.apache.org/licenses/">License</a>
</li>
<li>
<a href="/zh/">
<i class="fa fa-globe" aria-hidden="true"></i>&nbsp;中文版
</a>
</li>
</ul>
</div>
<div class="panel">
<ul>
<li>
<a href="/what-is-flink/security">Security</a>
</li>
<li>
<a href="https://www.apache.org/foundation/sponsorship.html">Donate</a>
</li>
<li>
<a href="https://www.apache.org/foundation/thanks.html">Thanks</a>
</li>
</ul>
</div>
<div class="panel icons">
<div>
<a href="/posts">
<div class="icon flink-blog-icon"></div>
<span>Flink blog</span>
</a>
</div>
<div>
<a href="https://github.com/apache/flink">
<div class="icon flink-github-icon"></div>
<span>GitHub</span>
</a>
</div>
<div>
<a href="https://twitter.com/apacheflink">
<div class="icon flink-twitter-icon"></div>
<span>Twitter</span>
</a>
</div>
</div>
</div>
</div>
<hr/>
<div class="container disclaimer">
<p>The contents of this website are © 2024 Apache Software Foundation under the terms of the Apache License v2. Apache Flink, Flink, and the Flink logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.</p>
</div>
</footer>
</body>
</html>