---
title: Windowing Support in Core Storm
layout: documentation
documentation: true
---
Storm core has support for processing a group of tuples that fall within a window. Windows are specified with the
following two parameters:
1. Window length - the length or duration of the window
2. Sliding interval - the interval at which the window slides
## Sliding Window
Tuples are grouped in windows, and the window slides every sliding interval. A tuple can belong to more than one window.
For example, consider a time duration based sliding window with a length of 10 seconds and a sliding interval of 5 seconds.
```
........| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
-5      0       5            10          15   -> time
|<------- w1 -->|
        |<---------- w2 ----->|
                |<-------------- w3 ---->|
```
The window is evaluated every 5 seconds, and some of the tuples in the first window overlap with the second one.
Note: The window first slides at t = 5 secs and would contain the events received in the first five seconds.
## Tumbling Window
Tuples are grouped in a single window based on time or count. Any tuple belongs to only one of the windows.
For example, consider a time duration based tumbling window with a length of 5 seconds.
```
| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
0       5            10         15    -> time
   w1         w2           w3
```
The window is evaluated every five seconds and none of the windows overlap.
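Both window types can be modelled with one helper that lists the windows a tuple falls into. The following is a minimal sketch, assuming windows cover `(end - length, end]` and end on multiples of the sliding interval. The class and method names are hypothetical and not part of Storm.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: not a Storm class.
final class WindowMembershipSketch {
    /**
     * Returns the end times of every window that contains a tuple with
     * timestamp ts. Assumes each window covers (end - length, end] and
     * that window ends are multiples of the sliding interval.
     */
    static List<Long> windowEndsFor(long ts, long length, long slide) {
        List<Long> ends = new ArrayList<>();
        // First window end at or after ts, aligned to the sliding interval.
        long firstEnd = Math.floorDiv(ts + slide - 1, slide) * slide;
        for (long end = firstEnd; end - length < ts; end += slide) {
            ends.add(end);
        }
        return ends;
    }
}
```

With a length of 10 and a sliding interval of 5, `windowEndsFor(3, 10, 5)` yields two windows (ending at 5 and 10), which is the overlap shown in the sliding window diagram. A tumbling window is the special case where the sliding interval equals the length, so `windowEndsFor(3, 5, 5)` yields exactly one window.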
Storm supports specifying the window length and sliding intervals as a count of the number of tuples or as a time duration.
The bolt interface `IWindowedBolt` is implemented by bolts that need windowing support.
```java
public interface IWindowedBolt extends IComponent {
    void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

    /**
     * Process tuples falling within the window and optionally emit
     * new tuples based on the tuples in the input window.
     */
    void execute(TupleWindow inputWindow);

    void cleanup();
}
```
Every time the window activates, the `execute` method is invoked. The `TupleWindow` parameter gives access to the current tuples
in the window, the tuples that expired, and the new tuples that were added since the last window was computed, which is useful
for efficient windowing computations.
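To illustrate why the new and expired tuples help, here is a minimal sketch of an incrementally maintained sum. It uses plain lists of values as stand-ins for the new and expired tuples of a window; the class `IncrementalSumSketch` is hypothetical and not part of Storm.

```java
import java.util.List;

// Illustrative only: keeps a running sum instead of re-adding the whole window.
final class IncrementalSumSketch {
    private long sum;

    /**
     * Applies one window activation: adds the values that entered the window
     * since the last activation and subtracts the values that expired.
     */
    long onWindow(List<Long> newValues, List<Long> expiredValues) {
        for (long v : newValues) {
            sum += v;
        }
        for (long v : expiredValues) {
            sum -= v;
        }
        return sum;
    }
}
```

Each activation then costs time proportional to the number of changed tuples rather than to the full window size.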
Bolts that need windowing support would typically extend `BaseWindowedBolt`, which has the APIs for specifying the
window length and sliding intervals.
E.g.
```java
public class SlidingWindowBolt extends BaseWindowedBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(TupleWindow inputWindow) {
        for (Tuple tuple : inputWindow.get()) {
            // do the windowing computation
            ...
        }
        // emit the results
        collector.emit(new Values(computedValue));
    }
}

public static void main(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new RandomSentenceSpout(), 1);
    builder.setBolt("slidingwindowbolt",
                    new SlidingWindowBolt().withWindow(new Count(30), new Count(10)),
                    1).shuffleGrouping("spout");
    Config conf = new Config();
    conf.setDebug(true);
    conf.setNumWorkers(1);
    StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}
```
The following window configurations are supported.
```java
withWindow(Count windowLength, Count slidingInterval)
Tuple count based sliding window that slides after `slidingInterval` number of tuples.
withWindow(Count windowLength)
Tuple count based window that slides with every incoming tuple.
withWindow(Count windowLength, Duration slidingInterval)
Tuple count based sliding window that slides after `slidingInterval` time duration.
withWindow(Duration windowLength, Duration slidingInterval)
Time duration based sliding window that slides after `slidingInterval` time duration.
withWindow(Duration windowLength)
Time duration based window that slides with every incoming tuple.
withWindow(Duration windowLength, Count slidingInterval)
Time duration based sliding window configuration that slides after `slidingInterval` number of tuples.
withTumblingWindow(BaseWindowedBolt.Count count)
Count based tumbling window that tumbles after the specified count of tuples.
withTumblingWindow(BaseWindowedBolt.Duration duration)
Time duration based tumbling window that tumbles after the specified time duration.
```
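The behaviour of a count based sliding window such as `withWindow(Count windowLength, Count slidingInterval)` can be pictured with a small standalone model. This is only a sketch of the semantics described above, not Storm's implementation; `CountWindowSketch` and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

// Illustrative only: a count based sliding window over arbitrary items.
final class CountWindowSketch<T> {
    private final int windowLength;
    private final int slidingInterval;
    private final Deque<T> buffer = new ArrayDeque<>();
    private int sinceLastSlide;

    CountWindowSketch(int windowLength, int slidingInterval) {
        this.windowLength = windowLength;
        this.slidingInterval = slidingInterval;
    }

    /**
     * Adds one item. Returns the current window contents when this item
     * completes a sliding interval, otherwise an empty Optional.
     */
    Optional<List<T>> add(T item) {
        buffer.addLast(item);
        if (buffer.size() > windowLength) {
            buffer.removeFirst(); // the oldest item expires
        }
        if (++sinceLastSlide < slidingInterval) {
            return Optional.empty();
        }
        sinceLastSlide = 0;
        return Optional.of(new ArrayList<>(buffer));
    }
}
```

With a window length of 3 and a sliding interval of 2, the window activates after every second item and holds at most the last three items, so the first activation contains only two items.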
## Tuple timestamp and out of order tuples
By default the timestamp tracked in the window is the time when the tuple is processed by the bolt. The window calculations
are performed based on the processing timestamp. Storm has support for tracking windows based on the source generated timestamp.
```java
/**
* Specify a field in the tuple that represents the timestamp as a long value. If this
* field is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown.
*
* @param fieldName the name of the field that contains the timestamp
*/
public BaseWindowedBolt withTimestampField(String fieldName)
```
The value for the above `fieldName` will be looked up from the incoming tuple and considered for windowing calculations.
If the field is not present in the tuple an exception will be thrown. Alternatively a [TimestampExtractor](../storm-core/src/jvm/org/apache/storm/windowing/TimestampExtractor.java) can be used to
derive a timestamp value from a tuple (e.g. extract timestamp from a nested field within the tuple).
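As an illustration of the nested-field case, the extraction logic might look like the sketch below. It works on a plain `Map` rather than a Storm tuple so that it stays self-contained. The record shape `{"event": {"ts": <long>}}`, the field names `event` and `ts`, and the class name are all assumptions made for this example; the actual `TimestampExtractor` interface should be checked against the linked source.

```java
import java.util.Map;

// Illustrative only: extraction logic that a timestamp extractor could delegate to.
final class NestedTimestampSketch {
    /**
     * Reads the timestamp from a record shaped like {"event": {"ts": <long>}}.
     * Throws IllegalArgumentException when the nested field is missing.
     */
    static long extractTimestamp(Map<String, Object> record) {
        Object event = record.get("event");
        if (!(event instanceof Map)) {
            throw new IllegalArgumentException("record has no nested 'event' map");
        }
        Object ts = ((Map<?, ?>) event).get("ts");
        if (!(ts instanceof Number)) {
            throw new IllegalArgumentException("'event' has no numeric 'ts' field");
        }
        return ((Number) ts).longValue();
    }
}
```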
```java
/**
* Specify the timestamp extractor implementation.
*
* @param timestampExtractor the {@link TimestampExtractor} implementation
*/
public BaseWindowedBolt withTimestampExtractor(TimestampExtractor timestampExtractor)
```
Along with the timestamp field name/extractor, a time lag parameter can also be specified which indicates the max time limit for tuples with out of order timestamps.
```java
/**
* Specify the maximum time lag of the tuple timestamp in milliseconds. It means that the tuple timestamps
* cannot be out of order by more than this amount.
*
* @param duration the max lag duration
*/
public BaseWindowedBolt withLag(Duration duration)
```
E.g. if the lag is 5 secs and a tuple `t1` arrived with timestamp `06:00:05`, no tuples may arrive with a timestamp earlier than `06:00:00`. If a tuple
arrives with timestamp `05:59:59` after `t1` and the window has moved past `t1`, it is treated as a late tuple. By default, late tuples are not processed;
they are only logged in the worker log files at INFO level.
```java
/**
* Specify a stream id on which late tuples are going to be emitted. They are going to be accessible via the
* {@link org.apache.storm.topology.WindowedBoltExecutor#LATE_TUPLE_FIELD} field.
* It must be defined on a per-component basis, and in conjunction with the
* {@link BaseWindowedBolt#withTimestampField}, otherwise {@link IllegalArgumentException} will be thrown.
*
* @param streamId the name of the stream used to emit late tuples on
*/
public BaseWindowedBolt withLateTupleStream(String streamId)
```
The default handling of late tuples can be changed by specifying the above `streamId`. Late tuples are then emitted on the specified stream and are accessible
via the field `WindowedBoltExecutor.LATE_TUPLE_FIELD`.
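The lag and late tuple rules can be sketched as a small standalone model. It assumes that a tuple is late when its timestamp is earlier than the most recently emitted watermark, and that the watermark advances to the latest seen timestamp minus the lag. `LateTupleSketch` is a hypothetical class, and its `late` list merely stands in for a late tuple stream.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: separates on-time timestamps from late ones.
final class LateTupleSketch {
    private final long lagMs;
    private long maxTs = Long.MIN_VALUE;
    private long watermark = Long.MIN_VALUE;
    final List<Long> accepted = new ArrayList<>();
    final List<Long> late = new ArrayList<>();

    LateTupleSketch(long lagMs) {
        this.lagMs = lagMs;
    }

    /** Routes a tuple timestamp to the accepted list or the late list. */
    void onTuple(long ts) {
        if (ts < watermark) {
            late.add(ts); // the window has already moved past this timestamp
            return;
        }
        accepted.add(ts);
        maxTs = Math.max(maxTs, ts);
    }

    /** Periodic tick: advances the watermark to (latest timestamp - lag). */
    void onWatermarkTick() {
        if (maxTs != Long.MIN_VALUE) {
            watermark = Math.max(watermark, maxTs - lagMs);
        }
    }
}
```

Taking `06:00:00` as time 0 with a lag of 5000 ms, a tuple at 5000 followed by a tick moves the watermark to 0. A tuple with timestamp -1000 (`05:59:59`) that arrives afterwards lands in `late`, while one with timestamp 2000 is still accepted.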
### Watermarks
For processing tuples with a timestamp field, Storm internally computes watermarks based on the incoming tuple timestamps. The watermark is
the minimum of the latest tuple timestamps (minus the lag) across all the input streams. At a high level this is similar to the watermark concept
used by Flink and Google's MillWheel for tracking event based timestamps.
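The watermark definition above can be written down directly. This is a minimal sketch that assumes the latest timestamp per input stream is tracked in a map keyed by a stream name; the class and method names are hypothetical.

```java
import java.util.Map;

// Illustrative only: watermark = min(latest timestamp per stream) - lag.
final class WatermarkSketch {
    static long watermark(Map<String, Long> latestTsByStream, long lagMs) {
        if (latestTsByStream.isEmpty()) {
            throw new IllegalArgumentException("no streams tracked yet");
        }
        long min = Long.MAX_VALUE;
        for (long ts : latestTsByStream.values()) {
            min = Math.min(min, ts);
        }
        return min - lagMs;
    }
}
```

Taking the minimum means the slowest input stream holds the watermark back, so no window is evaluated while tuples for it may still arrive on that stream.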
Periodically (every second by default), the watermark timestamps are emitted, and this is treated as the clock tick for the window calculation if
tuple based timestamps are in use. The interval at which watermarks are emitted can be changed with the API below.
```java
/**
* Specify the watermark event generation interval. For tuple based timestamps, watermark events
* are used to track the progress of time
*
* @param interval the interval at which watermark events are generated
*/
public BaseWindowedBolt withWatermarkInterval(Duration interval)
```
When a watermark is received, all windows up to that timestamp will be evaluated.
For example, consider tuple timestamp based processing with following window parameters,
`Window length = 20s, sliding interval = 10s, watermark emit frequency = 1s, max lag = 5s`
```
|-----|-----|-----|-----|-----|-----|-----|
0     10    20    30    40    50    60    70
```
Current ts = `09:00:00`
Tuples `e1(06:00:03), e2(06:00:05), e3(06:00:07), e4(06:00:18), e5(06:00:26), e6(06:00:36)` are received between `09:00:00` and `09:00:01`.
At time t = `09:00:01`, watermark w1 = `06:00:31` is emitted since no tuples earlier than `06:00:31` can arrive.
Three windows will be evaluated. The end timestamp of the first window (`06:00:10`) is computed by taking the earliest event timestamp (`06:00:03`)
and rounding it up to the next multiple of the sliding interval (10s).
1. `05:59:50 - 06:00:10` with tuples e1, e2, e3
2. `06:00:00 - 06:00:20` with tuples e1, e2, e3, e4
3. `06:00:10 - 06:00:30` with tuples e4, e5
e6 is not evaluated since the watermark timestamp `06:00:31` is earlier than the tuple ts `06:00:36`.
Tuples `e7(08:00:25), e8(08:00:26), e9(08:00:27), e10(08:00:39)` are received between `09:00:01` and `09:00:02`.
At time t = `09:00:02`, another watermark w2 = `08:00:34` is emitted since no tuples earlier than `08:00:34` can arrive now.
Three windows will be evaluated:
1. `06:00:20 - 06:00:40` with tuples e5, e6 (from the earlier batch)
2. `06:00:30 - 06:00:50` with tuple e6 (from the earlier batch)
3. `08:00:10 - 08:00:30` with tuples e7, e8, e9
e10 is not evaluated since the tuple ts `08:00:39` is beyond the watermark time `08:00:34`.
The window calculation accounts for time gaps and computes the windows based on the tuple timestamps.
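The evaluation for the first watermark in the example can be reproduced with a short sketch. Timestamps are expressed as seconds after `06:00:00`, and a window is assumed to cover `(end - length, end]`. The sketch does not model how gaps are skipped for the second watermark, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: which windows a watermark triggers and what they contain.
final class WatermarkWindowsSketch {
    /** End times of the windows evaluated when the watermark arrives. */
    static List<Long> windowEnds(long earliestTs, long watermark, long slide) {
        List<Long> ends = new ArrayList<>();
        // Round the earliest timestamp up to a multiple of the sliding interval.
        long firstEnd = Math.floorDiv(earliestTs + slide - 1, slide) * slide;
        for (long end = firstEnd; end <= watermark; end += slide) {
            ends.add(end);
        }
        return ends;
    }

    /** Timestamps that fall in the window (end - length, end]. */
    static List<Long> tuplesIn(List<Long> timestamps, long end, long length) {
        List<Long> in = new ArrayList<>();
        for (long ts : timestamps) {
            if (ts > end - length && ts <= end) {
                in.add(ts);
            }
        }
        return in;
    }
}
```

For timestamps 3, 5, 7, 18, 26, 36 and a watermark of 31, `windowEnds(3, 31, 10)` gives the ends 10, 20 and 30, and `tuplesIn` with a length of 20 selects e1 to e3, e1 to e4, and e4 to e5 respectively. The tuple at 36 is in none of them because it lies beyond the watermark.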
## Guarantees
The windowing functionality in Storm core currently provides an at-least-once guarantee. The values emitted from the bolt's
`execute(TupleWindow inputWindow)` method are automatically anchored to all the tuples in the inputWindow. The downstream
bolts are expected to ack the received tuple (i.e. the tuple emitted from the windowed bolt) to complete the tuple tree.
If not, the tuples will be replayed and the windowing computation will be re-evaluated.
The tuples in the window are automatically acked when they expire, i.e. when they fall out of the window after
`windowLength + slidingInterval`. Note that the configuration `topology.message.timeout.secs` should be sufficiently larger
than `windowLength + slidingInterval` for time based windows; otherwise the tuples will time out and get replayed, which can result
in duplicate evaluations. For count based windows, the configuration should be adjusted such that `windowLength + slidingInterval`
tuples can be received within the timeout period.
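For time based windows, the timeout rule can be checked before a topology is submitted. The sketch below is one way to do that. The safety margin is an assumption introduced here to stand in for "sufficiently larger", and the class and method names are hypothetical.

```java
// Illustrative only: sanity check for topology.message.timeout.secs.
final class TimeoutCheckSketch {
    /**
     * Returns true when the message timeout leaves at least marginSecs of
     * headroom beyond windowLength + slidingInterval (all values in seconds).
     */
    static boolean timeoutIsSufficient(int messageTimeoutSecs, int windowLengthSecs,
                                       int slidingIntervalSecs, int marginSecs) {
        return messageTimeoutSecs >= windowLengthSecs + slidingIntervalSecs + marginSecs;
    }
}
```

For a 20 second window sliding every 10 seconds, a 30 second timeout fails the check with any positive margin, while a 60 second timeout passes with a margin of 10 seconds.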
## Example topology
An example topology `SlidingWindowTopology` shows how to use the APIs to compute a sliding window sum and a tumbling window
average.