---
title: "Windows"
sub-nav-id: windows
sub-nav-group: streaming
sub-nav-pos: 2
---

* This will be replaced by the TOC
{:toc}

Windows on Keyed Data Streams

Flink offers a variety of methods for defining windows on a KeyedStream. All of these group elements per key, i.e., each window will contain elements with the same key value.

Basic Window Constructs

Flink offers a general window mechanism that provides flexibility, as well as a number of pre-defined windows for common use cases. Before defining your own windows, check whether one of the pre-defined windows below serves your use case.

Advanced Window Constructs

The general mechanism can define more powerful windows at the cost of more verbose syntax. For example, windows can be defined to hold the elements of the last 5 seconds and slide every 1 second, but with the window function triggered whenever 100 elements have been added to the window; every time it is triggered, 10 elements are retained in the window.
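The following minimal, self-contained Scala simulation shows how the trigger and the evictor interact in a single window under that definition. It is not the Flink API. `CountTriggeredWindow`, `fireEvery` and `retain` are hypothetical names. The sketch also makes two assumptions: the window function is a sum, and the trigger counts only the elements added since its last firing.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical single-window model (not the Flink API): fire after every
// `fireEvery` new elements and retain only the newest `retain` elements.
final class CountTriggeredWindow(fireEvery: Int, retain: Int) {
  private val buffer = ArrayBuffer.empty[Int]
  private var sinceLastFire = 0 // assumption: the trigger counts elements since its last firing

  // Adds one element; returns Some(result) if this element makes the trigger fire.
  def add(x: Int): Option[Int] = {
    buffer += x
    sinceLastFire += 1
    if (sinceLastFire < fireEvery) None
    else {
      sinceLastFire = 0
      // Evictor: remove elements from the beginning, keeping the newest `retain`.
      val excess = buffer.size - retain
      if (excess > 0) buffer.remove(0, excess)
      // Window function (assumed to be a sum) sees only the retained elements.
      Some(buffer.sum)
    }
  }

  def contents: List[Int] = buffer.toList
}

val window = new CountTriggeredWindow(fireEvery = 100, retain = 10)
val firings = (1 to 250).flatMap(window.add)
println(firings.toList) // List(955, 1955)
```

With 250 input elements the window fires twice. Each firing sums only the 10 elements that the evictor retained: 91 to 100 the first time, 191 to 200 the second time.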

The general recipe for building a custom window is to specify (1) a WindowAssigner, (2) a Trigger (optionally), and (3) an Evictor (optionally).

The WindowAssigner defines how incoming elements are assigned to windows. A window is a logical group of elements that has a begin value and an end value, corresponding to a begin time and an end time. Elements whose timestamp (according to one of the notions of time described above) falls within these values are part of the window.

For example, the SlidingEventTimeWindows assigner in the example above defines a window of size 5 seconds and a slide of 1 second. Assume that time starts from 0 and is measured in milliseconds, and that each window includes its start value but not its end value. Then the first six windows, which overlap, are [0, 5000), [1000, 6000), [2000, 7000), [3000, 8000), [4000, 9000), and [5000, 10000). Each incoming element is assigned to every window that contains its timestamp. For example, an element with timestamp 2000 is assigned to the first three windows. Flink comes bundled with window assigners that cover the most common use cases. You can write your own window types by extending the WindowAssigner class.
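The short Scala sketch below checks the arithmetic of this example. It assumes half-open windows [start, end) and that time starts at 0. `Window` and `SlidingExample` are illustrative names, not Flink classes.

```scala
// Hypothetical illustration of the worked example (not the Flink API).
// Windows are assumed to be half-open intervals [start, end) in milliseconds.
final case class Window(start: Long, end: Long) {
  def contains(ts: Long): Boolean = start <= ts && ts < end
  override def toString: String = s"[$start, $end)"
}

object SlidingExample {
  val size = 5000L  // window size: 5 seconds
  val slide = 1000L // slide: 1 second

  // The first six windows when time starts at 0: [0, 5000), ..., [5000, 10000).
  val firstSix: Seq[Window] = (0 until 6).map(k => Window(k * slide, k * slide + size))

  // An element belongs to every window whose interval contains its timestamp.
  def windowsFor(ts: Long): Seq[Window] = firstSix.filter(_.contains(ts))
}

println(SlidingExample.windowsFor(2000L).mkString(", "))
// prints: [0, 5000), [1000, 6000), [2000, 7000)
```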

Tumbling processing time windows

KeyedStream → WindowedStream

{% highlight scala %}
stream.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
{% endhighlight %}

Sliding processing time windows

KeyedStream → WindowedStream

Incoming elements are assigned to a window of a certain size (5 seconds below) based on the current processing time. Windows “slide” by the provided value (1 second in the example), and hence overlap. This assigner comes with a default trigger that fires for a window when the current processing time exceeds its end value.

{% highlight scala %}
stream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
{% endhighlight %}
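Processing-time assigners differ from event-time assigners in one key respect. They consult the operator's clock when an element arrives and ignore the element's own timestamp. The sketch below models a tumbling processing-time assigner and its default trigger. `TumblingProcessingTimeModel`, `ProcWindow` and the injectable `clock` parameter are hypothetical. The clock is injected only so that it can be controlled in a test.

```scala
// Hypothetical model of a tumbling processing-time assigner and its default
// trigger (not the Flink API). The clock is injected so it can be controlled.
final case class ProcWindow(start: Long, end: Long)

final class TumblingProcessingTimeModel(sizeMs: Long, clock: () => Long) {
  // The element's own timestamp is never consulted: only the current clock matters.
  def assign[T](element: T): ProcWindow = {
    val now = clock()
    val start = now - (now % sizeMs) // assumes non-negative clock values
    ProcWindow(start, start + sizeMs)
  }

  // Default-trigger rule: fire once the clock has reached the window's exclusive end.
  def shouldFire(w: ProcWindow): Boolean = clock() >= w.end
}

var fakeNow = 1500L
val model = new TumblingProcessingTimeModel(1000L, () => fakeNow)
val assigned = model.assign("element carrying an old timestamp")
println(assigned)                   // ProcWindow(1000,2000)
println(model.shouldFire(assigned)) // false
fakeNow = 2000L
println(model.shouldFire(assigned)) // true
```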

The Trigger specifies when the function that comes after the window clause (e.g., sum, count) is evaluated (“fires”) for each window. If no trigger is specified, the default trigger of the window type is used; this default is part of the definition of the WindowAssigner. If the default triggers do not fit the application, Flink comes bundled with a set of other triggers, and you can write your own trigger by implementing the Trigger interface. Note that specifying a trigger overrides the default trigger of the window assigner.
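The sketch below models two things: the decision a trigger makes, and the rule that a user-specified trigger replaces the default trigger rather than being combined with it. `SimpleTrigger`, `EndOfWindowTrigger`, `EveryNTrigger` and `TriggerChoice` are hypothetical names. Real Flink triggers also react to timers. This simplification ignores timers and checks the rule only when an element arrives.

```scala
// Hypothetical trigger model (not Flink's Trigger interface). For simplicity the
// rule is only checked when an element arrives; real triggers also react to timers.
trait SimpleTrigger {
  def shouldFire(countInWindow: Int, currentTime: Long, windowEnd: Long): Boolean
}

// Time-based rule, like the default trigger of time windows: fire once time reaches the end.
object EndOfWindowTrigger extends SimpleTrigger {
  def shouldFire(countInWindow: Int, currentTime: Long, windowEnd: Long): Boolean =
    currentTime >= windowEnd
}

// Count-based rule: fire whenever the window holds a positive multiple of `n` elements.
final case class EveryNTrigger(n: Int) extends SimpleTrigger {
  def shouldFire(countInWindow: Int, currentTime: Long, windowEnd: Long): Boolean =
    countInWindow > 0 && countInWindow % n == 0
}

object TriggerChoice {
  // A user-specified trigger replaces the assigner's default; it is not combined with it.
  def effective(default: SimpleTrigger, custom: Option[SimpleTrigger]): SimpleTrigger =
    custom.getOrElse(default)
}

val chosen = TriggerChoice.effective(EndOfWindowTrigger, Some(EveryNTrigger(100)))
println(chosen.shouldFire(countInWindow = 100, currentTime = 0L, windowEnd = 5000L)) // true
println(chosen.shouldFire(countInWindow = 7, currentTime = 9000L, windowEnd = 5000L)) // false
```

In the second call, time is already past the window end but the window does not fire. The count-based trigger has fully replaced the time-based default.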

After the trigger fires, and before the function (e.g., sum, count) is applied to the window contents, an optional Evictor removes some elements from the beginning of the window; the remaining elements are then passed on to the function. Flink comes bundled with a set of evictors, and you can write your own evictor by implementing the Evictor interface.
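An evictor only removes elements from the beginning of the window, so it can be modeled as a function that drops a prefix of the buffered elements. The two evictors below are hypothetical stand-ins written in plain Scala, not Flink's `Evictor` interface. One keeps the last `n` elements. The other keeps the elements within `keepMs` of the newest timestamp.

```scala
// Hypothetical evictors in plain Scala (not Flink's Evictor interface).
// The buffer holds (timestamp, value) pairs in arrival order, and each
// evictor only removes a prefix, i.e. elements from the beginning of the window.
object Evictors {
  // Keep at most the `n` most recently added elements.
  def keepLast[A](buffer: Seq[A], n: Int): Seq[A] =
    buffer.drop(math.max(0, buffer.size - n))

  // Drop leading elements that are more than `keepMs` older than the newest timestamp.
  def keepRecent[V](buffer: Seq[(Long, V)], keepMs: Long): Seq[(Long, V)] =
    if (buffer.isEmpty) buffer
    else {
      val newest = buffer.map(_._1).max
      buffer.dropWhile { case (ts, _) => ts < newest - keepMs }
    }
}

val buf = Seq(1000L -> "a", 2000L -> "b", 4000L -> "c", 4500L -> "d")
println(Evictors.keepLast(buf, 2))       // List((4000,c), (4500,d))
println(Evictors.keepRecent(buf, 3000L)) // List((2000,b), (4000,c), (4500,d))
```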

Recipes for Building Windows

The mechanism of window assigner, trigger, and evictor is very powerful, and it allows you to define many different kinds of windows. Flink's basic window constructs are, in fact, syntactic sugar on top of this general mechanism, so common types of windows can also be built directly from an assigner, a trigger, and an evictor.
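The sketch below illustrates the syntactic-sugar idea. It defines a general sliding assigner and expresses tumbling windows as the special case where the slide equals the size, so every element lands in exactly one window. The names are hypothetical. The sketch assumes non-negative window starts aligned to multiples of the slide.

```scala
// Hypothetical illustration (not the Flink API): a general sliding assigner,
// with tumbling windows defined as sugar where the slide equals the size.
// Assumes non-negative timestamps and window starts aligned to multiples of `slide`.
object Recipes {
  // Starts of all windows [start, start + size) that contain `ts`, earliest first.
  def slidingStarts(ts: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = ts - ts % slide // latest aligned start not after ts
    (lastStart to 0L by -slide)     // walk backwards one slide at a time
      .takeWhile(start => start > ts - size)
      .toList
      .reverse
  }

  // Tumbling windows: the same assigner with slide == size, so exactly one window matches.
  def tumblingStarts(ts: Long, size: Long): List[Long] = slidingStarts(ts, size, size)
}

println(Recipes.slidingStarts(2000L, 5000L, 1000L)) // List(0, 1000, 2000)
println(Recipes.tumblingStarts(2600L, 1000L))       // List(2000)
```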

Windows on Unkeyed Data Streams

You can also define windows on regular (non-keyed) data streams using the windowAll transformation. These windowed data streams have all the capabilities of keyed windowed data streams, but they are evaluated at a single task (and hence on a single computing node). The syntax for defining triggers and evictors is exactly the same as for keyed windows.
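The difference between keyed windows and `windowAll` shows up in how elements are grouped. The hypothetical plain-Scala model below assumes tumbling windows of `size` ms. Keyed windows form one group per (key, window) pair. `windowAll` forms one group per window containing every key, which is why each such group has to be evaluated in one place.

```scala
// Hypothetical contrast (not the Flink API), assuming tumbling windows of `size` ms:
// keyed windows group by (key, window), while windowAll groups by window only,
// so each windowAll group mixes all keys and is evaluated in a single place.
object WindowAllModel {
  private def windowStart(ts: Long, size: Long): Long = ts - ts % size

  // Keyed: one group per (key, window start); groups are independent of each other.
  def keyedCounts[K](elems: Seq[(K, Long)], size: Long): Map[(K, Long), Int] =
    elems.groupBy { case (k, ts) => (k, windowStart(ts, size)) }
      .map { case (group, es) => group -> es.size }

  // windowAll: one group per window start, regardless of key.
  def allCounts[K](elems: Seq[(K, Long)], size: Long): Map[Long, Int] =
    elems.groupBy { case (_, ts) => windowStart(ts, size) }
      .map { case (start, es) => start -> es.size }
}

val events = Seq(("a", 100L), ("b", 200L), ("c", 300L), ("a", 1100L))
println(WindowAllModel.keyedCounts(events, 1000L).size)        // 4
println(WindowAllModel.allCounts(events, 1000L).toList.sorted) // List((0,3), (1000,1))
```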

Basic window definitions are also available for windows on non-keyed streams.