blob: c826ffbee9509fa59f6c8fd991401302d8c600f5 [file] [log] [blame] [view]
---
title: Streaming Analytics
nav-id: analytics
nav-pos: 4
nav-title: Streaming Analytics
nav-parent_id: learn-flink
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
* This will be replaced by the TOC
{:toc}
## Event Time and Watermarks
### Introduction
Flink explicitly supports three different notions of time:
* _event time:_ the time when an event occurred, as recorded by the device producing (or storing) the event
* _ingestion time:_ a timestamp recorded by Flink at the moment it ingests the event
* _processing time:_ the time when a specific operator in your pipeline is processing the event
For reproducible results, e.g., when computing the maximum price a stock reached during the first
hour of trading on a given day, you should use event time. In this way the result won't depend on
when the calculation is performed. This kind of real-time application is sometimes performed using
processing time, but then the results are determined by the events that happen to be processed
during that hour, rather than the events that occurred then. Computing analytics based on processing
time causes inconsistencies, and makes it difficult to re-analyze historic data or test new
implementations.
### Working with Event Time
By default, Flink will use processing time. To change this, you can set the Time Characteristic:
{% highlight java %}
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
{% endhighlight %}
If you want to use event time, you will also need to supply a Timestamp Extractor and Watermark
Generator that Flink will use to track the progress of event time. This will be covered in the
section below on [Working with Watermarks]({% link
learn-flink/streaming_analytics.md %}#working-with-watermarks), but first we should explain what
watermarks are.
### Watermarks
Let's work through a simple example that will show why watermarks are needed, and how they work.
In this example you have a stream of timestamped events that arrive somewhat out of order, as shown
below. The numbers shown are timestamps that indicate when these events actually occurred. The first
event to arrive happened at time 4, and it is followed by an event that happened earlier, at time 2,
and so on:
<div class="text-center" style="font-size: x-large; word-spacing: 0.5em; margin: 1em 0em;">
··· 23 19 22 24 21 14 17 13 12 15 9 11 7 2 4 →
</div>
Now imagine that you are trying create a stream sorter. This is meant to be an application that
processes each event from a stream as it arrives, and emits a new stream containing the same events,
but ordered by their timestamps.
Some observations:
(1) The first element your stream sorter sees is the 4, but you can't just immediately release it as
the first element of the sorted stream. It may have arrived out of order, and an earlier event might
yet arrive. In fact, you have the benefit of some god-like knowledge of this stream's future, and
you can see that your stream sorter should wait at least until the 2 arrives before producing any
results.
*Some buffering, and some delay, is necessary.*
(2) If you do this wrong, you could end up waiting forever. First the sorter saw an event from time
4, and then an event from time 2. Will an event with a timestamp less than 2 ever arrive? Maybe.
Maybe not. You could wait forever and never see a 1.
*Eventually you have to be courageous and emit the 2 as the start of the sorted stream.*
(3) What you need then is some sort of policy that defines when, for any given timestamped event, to
stop waiting for the arrival of earlier events.
*This is precisely what watermarks do* — they define when to stop waiting for earlier events.
Event time processing in Flink depends on *watermark generators* that insert special timestamped
elements into the stream, called *watermarks*. A watermark for time _t_ is an assertion that the
stream is (probably) now complete up through time _t_.
When should this stream sorter stop waiting, and push out the 2 to start the sorted stream? When a
watermark arrives with a timestamp of 2, or greater.
(4) You might imagine different policies for deciding how to generate watermarks.
Each event arrives after some delay, and these delays vary, so some events are delayed more than
others. One simple approach is to assume that these delays are bounded by some maximum delay. Flink
refers to this strategy as *bounded-out-of-orderness* watermarking. It is easy to imagine more
complex approaches to watermarking, but for most applications a fixed delay works well enough.
### Latency vs. Completeness
Another way to think about watermarks is that they give you, the developer of a streaming
application, control over the tradeoff between latency and completeness. Unlike in batch processing,
where one has the luxury of being able to have complete knowledge of the input before producing any
results, with streaming you must eventually stop waiting to see more of the input, and produce some
sort of result.
You can either configure your watermarking aggressively, with a short bounded delay, and thereby
take the risk of producing results with rather incomplete knowledge of the input -- i.e., a possibly
wrong result, produced quickly. Or you can wait longer, and produce results that take advantage of
having more complete knowledge of the input stream(s).
It is also possible to implement hybrid solutions that produce initial results quickly, and then
supply updates to those results as additional (late) data is processed. This is a good approach for
some applications.
### Lateness
Lateness is defined relative to the watermarks. A `Watermark(t)` asserts that the stream is complete
up through time _t_; any event following this watermark whose timestamp is &le; _t_ is late.
### Working with Watermarks
In order to perform event-time-based event processing, Flink needs to know the time associated with
each event, and it also needs the stream to include watermarks.
The Taxi data sources used in the hands-on exercises take care of these details for you. But in your
own applications you will have to take care of this yourself, which is usually done by implementing
a class that extracts the timestamps from the events, and generates watermarks on demand. The
easiest way to do this is by using a `WatermarkStrategy`:
{% highlight java %}
DataStream<Event> stream = ...
WatermarkStrategy<Event> strategy = WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((event, timestamp) -> event.timestamp);
DataStream<Event> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(strategy);
{% endhighlight %}
{% top %}
## Windows
Flink features very expressive window semantics.
In this section you will learn:
* how windows are used to compute aggregates on unbounded streams,
* which types of windows Flink supports, and
* how to implement a DataStream program with a windowed aggregation
### Introduction
It is natural when doing stream processing to want to compute aggregated analytics on bounded subsets
of the streams in order to answer questions like these:
* number of page views per minute
* number of sessions per user per week
* maximum temperature per sensor per minute
Computing windowed analytics with Flink depends on two principal abstractions: _Window Assigners_
that assign events to windows (creating new window objects as necessary), and _Window Functions_
that are applied to the events assigned to a window.
Flink's windowing API also has notions of _Triggers_, which determine when to call the window
function, and _Evictors_, which can remove elements collected in a window.
In its basic form, you apply windowing to a keyed stream like this:
{% highlight java %}
stream.
.keyBy(<key selector>)
.window(<window assigner>)
.reduce|aggregate|process(<window function>)
{% endhighlight %}
You can also use windowing with non-keyed streams, but keep in mind that in this case, the
processing will _not_ be done in parallel:
{% highlight java %}
stream.
.windowAll(<window assigner>)
.reduce|aggregate|process(<window function>)
{% endhighlight %}
### Window Assigners
Flink has several built-in types of window assigners, which are illustrated below:
<img src="{{ site.baseurl }}/fig/window-assigners.svg" alt="Window assigners" class="center" width="80%" />
Some examples of what these window assigners might be used for, and how to specify them:
* Tumbling time windows
* _page views per minute_
* `TumblingEventTimeWindows.of(Time.minutes(1))`
* Sliding time windows
* _page views per minute computed every 10 seconds_
* `SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10))`
* Session windows
* _page views per session, where sessions are defined by a gap of at least 30 minutes between sessions_
* `EventTimeSessionWindows.withGap(Time.minutes(30))`
Durations can be specified using one of `Time.milliseconds(n)`, `Time.seconds(n)`, `Time.minutes(n)`, `Time.hours(n)`, and `Time.days(n)`.
The time-based window assigners (including session windows) come in both event time and processing
time flavors. There are significant tradeoffs between these two types of time windows. With
processing time windowing you have to accept these limitations:
* can not correctly process historic data,
* can not correctly handle out-of-order data,
* results will be non-deterministic,
but with the advantage of lower latency.
When working with count-based windows, keep in mind that these windows will not fire until a batch
is complete. There's no option to time-out and process a partial window, though you could implement
that behavior yourself with a custom Trigger.
A global window assigner assigns every event (with the same key) to the same global window. This is
only useful if you are going to do your own custom windowing, with a custom Trigger. In many cases
where this might seem useful you will be better off using a `ProcessFunction` as described
[in another section]({% link learn-flink/event_driven.md %}#process-functions).
### Window Functions
You have three basic options for how to process the contents of your windows:
1. as a batch, using a `ProcessWindowFunction` that will be passed an `Iterable` with the window's contents;
1. incrementally, with a `ReduceFunction` or an `AggregateFunction` that is called as each event is assigned to the window;
1. or with a combination of the two, wherein the pre-aggregated results of a `ReduceFunction` or an `AggregateFunction` are supplied to a `ProcessWindowFunction` when the window is triggered.
Here are examples of approaches 1 and 3. Each implementation finds the peak value from each sensor
in 1 minute event time windows, and producing a stream of Tuples containing `(key,
end-of-window-timestamp, max_value)`.
#### ProcessWindowFunction Example
{% highlight java %}
DataStream<SensorReading> input = ...
input
.keyBy(x -> x.key)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.process(new MyWastefulMax());
public static class MyWastefulMax extends ProcessWindowFunction<
SensorReading, // input type
Tuple3<String, Long, Integer>, // output type
String, // key type
TimeWindow> { // window type
@Override
public void process(
String key,
Context context,
Iterable<SensorReading> events,
Collector<Tuple3<String, Long, Integer>> out) {
int max = 0;
for (SensorReading event : events) {
max = Math.max(event.value, max);
}
out.collect(Tuple3.of(key, context.window().getEnd(), max));
}
}
{% endhighlight %}
A couple of things to note in this implementation:
* All of the events assigned to the window have to be buffered in keyed Flink state until the window
is triggered. This is potentially quite expensive.
* Our `ProcessWindowFunction` is being passed a `Context` object from which contains information about
the window. Its interface looks like this:
{% highlight java %}
public abstract class Context implements java.io.Serializable {
public abstract W window();
public abstract long currentProcessingTime();
public abstract long currentWatermark();
public abstract KeyedStateStore windowState();
public abstract KeyedStateStore globalState();
}
{% endhighlight %}
`windowState` and `globalState` are places where you can store per-key, per-window, or global
per-key information for all windows of that key. This might be useful, for example, if you want to
record something about the
current window and use that when processing a subsequent window.
#### Incremental Aggregation Example
{% highlight java %}
DataStream<SensorReading> input = ...
input
.keyBy(x -> x.key)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.reduce(new MyReducingMax(), new MyWindowFunction());
private static class MyReducingMax implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r1 : r2;
}
}
private static class MyWindowFunction extends ProcessWindowFunction<
SensorReading, Tuple3<String, Long, SensorReading>, String, TimeWindow> {
@Override
public void process(
String key,
Context context,
Iterable<SensorReading> maxReading,
Collector<Tuple3<String, Long, SensorReading>> out) {
SensorReading max = maxReading.iterator().next();
out.collect(Tuple3.of(key, context.window().getEnd(), max));
}
}
{% endhighlight %}
Notice that the `Iterable<SensorReading>`
will contain exactly one reading -- the pre-aggregated maximum computed by `MyReducingMax`.
### Late Events
By default, when using event time windows, late events are dropped. There are two optional parts of
the window API that give you more control over this.
You can arrange for the events that would be dropped to be collected to an alternate output stream
instead, using a mechanism called
[Side Outputs]({% link learn-flink/event_driven.md %}#side-outputs).
Here is an example of what that might look like:
{% highlight java %}
OutputTag<Event> lateTag = new OutputTag<Event>("late"){};
SingleOutputStreamOperator<Event> result = stream.
.keyBy(...)
.window(...)
.sideOutputLateData(lateTag)
.process(...);
DataStream<Event> lateStream = result.getSideOutput(lateTag);
{% endhighlight %}
You can also specify an interval of _allowed lateness_ during which the late events will continue to
be assigned to the appropriate window(s) (whose state will have been retained). By default each late
event will cause the window function to be called again (sometimes called a _late firing_).
By default the allowed lateness is 0. In other words, elements behind the watermark are dropped (or
sent to the side output).
For example:
{% highlight java %}
stream.
.keyBy(...)
.window(...)
.allowedLateness(Time.seconds(10))
.process(...);
{% endhighlight %}
When the allowed lateness is greater than zero, only those events that are so late that they would
be dropped are sent to the side output (if it has been configured).
### Surprises
Some aspects of Flink's windowing API may not behave in the way you would expect. Based on
frequently asked questions on the [flink-user mailing
list](https://flink.apache.org/community.html#mailing-lists) and elsewhere, here are some facts
about windows that may surprise you.
#### Sliding Windows Make Copies
Sliding window assigners can create lots of window objects, and will copy each event into every
relevant window. For example, if you have sliding windows every 15 minutes that are 24-hours in
length, each event will be copied into 4 * 24 = 96 windows.
#### Time Windows are Aligned to the Epoch
Just because you are using hour-long processing-time windows and start your application running at
12:05 does not mean that the first window will close at 1:05. The first window will be 55 minutes
long and close at 1:00.
Note, however, that the tumbling and sliding window assigners take an optional offset parameter
that can be used to change the alignment of the windows. See
[Tumbling Windows]({% link dev/stream/operators/windows.md %}#tumbling-windows) and
[Sliding Windows]({% link dev/stream/operators/windows.md %}#sliding-windows) for details.
#### Windows Can Follow Windows
For example, it works to do this:
{% highlight java %}
stream
.keyBy(t -> t.key)
.timeWindow(<time specification>)
.reduce(<reduce function>)
.timeWindowAll(<same time specification>)
.reduce(<same reduce function>)
{% endhighlight %}
You might expect Flink's runtime to be smart enough to do this parallel pre-aggregation for you
(provided you are using a ReduceFunction or AggregateFunction), but it's not.
The reason why this works is that the events produced by a time window are assigned timestamps
based on the time at the end of the window. So, for example, all of the events produced
by an hour-long window will have timestamps marking the end of an hour. Any subsequent window
consuming those events should have a duration that is the same as, or a multiple of, the
previous window.
#### No Results for Empty TimeWindows
Windows are only created when events are assigned to them. So if there are no events in a given time
frame, no results will be reported.
#### Late Events Can Cause Late Merges
Session windows are based on an abstraction of windows that can _merge_. Each element is initially
assigned to a new window, after which windows are merged whenever the gap between them is small
enough. In this way, a late event can bridge the gap separating two previously separate sessions,
producing a late merge.
{% top %}
## Hands-on
The hands-on exercise that goes with this section is the [Hourly Tips
Exercise](https://github.com/apache/flink-training/tree/{% if site.is_stable %}release-{{ site.version_title }}{% else %}master{% endif %}/hourly-tips).
{% top %}
## Further Reading
- [Timely Stream Processing]({% link concepts/timely-stream-processing.md %})
- [Windows]({% link dev/stream/operators/windows.md %})
{% top %}