title: “Generating Timestamps / Watermarks”


This section is relevant for programs running on Event Time. For an introduction to Event Time, Processing Time, and Ingestion Time, please refer to the [event time introduction]({{ site.baseurl }}/apis/streaming/event_time.html).

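To work with Event Time, streaming programs need to set the time characteristic accordingly, by calling setStreamTimeCharacteristic(TimeCharacteristic.EventTime) on the StreamExecutionEnvironment.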

Assigning Timestamps

In order to work with Event Time, Flink needs to know the events' timestamps, meaning each element in the stream needs to have its event timestamp assigned. This is usually done by accessing/extracting the timestamp from some field in the element.

Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about the progress in event time.

There are two ways to assign timestamps and generate Watermarks:

  1. Directly in the data stream source
  2. Via a TimestampAssigner / WatermarkGenerator

Source Functions with Timestamps and Watermarks

Stream sources can also directly assign timestamps to the elements they produce and emit Watermarks. In that case, no Timestamp Assigner is needed.

To assign a timestamp to an element in the source directly, the source must use the collectWithTimestamp(...) method on the SourceContext. To generate Watermarks, the source must call the emitWatermark(Watermark) method.

Below is a simple example of a source (non-checkpointed) that assigns timestamps and generates Watermarks depending on special events:

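{% highlight java %}
@Override
public void run(SourceContext<MyType> ctx) throws Exception {
	while (/* condition */) {
		// MyType, getNext() and getEventTimestamp() are placeholder names
		MyType next = getNext();
		ctx.collectWithTimestamp(next, next.getEventTimestamp());
		if (next.hasWatermarkTime()) {
			ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
		}
	}
}
{% endhighlight %}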

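{% highlight scala %}
override def run(ctx: SourceContext[MyType]): Unit = {
	while (/* condition */) {
		// MyType, getNext() and getEventTimestamp are placeholder names
		val next: MyType = getNext()
		ctx.collectWithTimestamp(next, next.getEventTimestamp)
		if (next.hasWatermarkTime) {
			ctx.emitWatermark(new Watermark(next.getWatermarkTime))
		}
	}
}
{% endhighlight %}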

Note: If the streaming program uses a TimestampAssigner on a stream where elements have a timestamp already, those timestamps will be overwritten by the TimestampAssigner. Similarly, Watermarks will be overwritten as well.

Timestamp Assigners / Watermark Generators

Timestamp Assigners take a stream and produce a new stream with timestamped elements and watermarks. If the original stream had timestamps or watermarks already, the timestamp assigner overwrites those.

Timestamp assigners usually occur directly after the data source, but this is not strictly required. A common pattern is, for example, to parse (MapFunction) and filter (FilterFunction) before the timestamp assigner. In any case, the timestamp assigner needs to occur before the first operation on event time (such as the first window operation).

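{% highlight java %}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));

DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter( event -> event.severity() == WARNING )
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());

withTimestampsAndWatermarks
        .keyBy( (event) -> event.getGroup() )
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) -> a.add(b) )
        .addSink(...);
{% endhighlight %}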

{% highlight scala %}
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer09[MyEvent](topic, schema, props))

val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
        .filter( _.severity == WARNING )
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())

withTimestampsAndWatermarks
        .keyBy( _.getGroup )
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) => a.add(b) )
        .addSink(...)
{% endhighlight %}

With Ascending Timestamps

The simplest case for generating watermarks is the case where timestamps within one source occur in ascending order. In that case, the current timestamp can always act as a watermark, because no lower timestamps will occur any more.

Note that it is only necessary that timestamps are ascending per parallel data source instance. For example, if in a specific setup one Kafka partition is read by one parallel data source instance, then it is only necessary that timestamps are ascending within each Kafka partition. Flink's Watermark merging mechanism will generate correct watermarks whenever parallel streams are shuffled, unioned, connected, or merged.

{% highlight java %}
DataStream<MyEvent> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {

        @Override
        public long extractAscendingTimestamp(MyEvent element) {
            return element.getCreationTime();
        }
});
{% endhighlight %}

{% highlight scala %}
val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )
{% endhighlight %}

Note: The generation of watermarks on ascending timestamps is a special case of the periodic watermark generation described in the next section.
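The per-instance requirement above works because an operator with several inputs advances its event time only as far as the slowest input allows. The following is a minimal stand-alone sketch of that idea in plain Java, not Flink code; the class name WatermarkMerger and its method are hypothetical and exist only for illustration. It assumes the combined watermark is the minimum of the latest watermarks seen on each input.

```java
import java.util.Arrays;

/** Stand-alone model (not Flink code) of how watermarks from parallel inputs combine. */
final class WatermarkMerger {
    // last watermark seen on each input channel
    private final long[] latestPerInput;
    // last combined watermark that was handed downstream
    private long lastEmitted = Long.MIN_VALUE;

    /** Assumes numInputs is at least 1. */
    WatermarkMerger(int numInputs) {
        latestPerInput = new long[numInputs];
        Arrays.fill(latestPerInput, Long.MIN_VALUE);
    }

    /**
     * Records a watermark from one input and returns the new combined watermark,
     * or null if the combined watermark did not advance.
     */
    Long onWatermark(int input, long watermark) {
        latestPerInput[input] = Math.max(latestPerInput[input], watermark);
        long min = Arrays.stream(latestPerInput).min().getAsLong();
        if (min > lastEmitted) {
            lastEmitted = min;
            return min;
        }
        return null;
    }
}
```

In this model, a fast input alone never moves the combined watermark forward; it advances only once every input has reported a higher watermark.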

With Periodic Watermarks

The AssignerWithPeriodicWatermarks assigns timestamps and generates watermarks periodically (possibly depending on the stream elements, or purely based on processing time).

The interval (every n milliseconds) in which the watermark will be generated is defined via ExecutionConfig.setAutoWatermarkInterval(...). Each time, the assigner's getCurrentWatermark() method will be called, and a new Watermark will be emitted, if the returned Watermark is non-null and larger than the previous Watermark.
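The emission rule described above can be modelled in a few lines. This is a stand-alone sketch in plain Java, not Flink code; the class name PeriodicEmissionRule and its method are hypothetical, and the candidate value stands in for whatever getCurrentWatermark() returned in a given interval.

```java
/** Stand-alone model (not Flink code) of the periodic emission rule. */
final class PeriodicEmissionRule {
    // timestamp of the last watermark that was forwarded downstream
    private long lastEmitted = Long.MIN_VALUE;

    /**
     * Called once per watermark interval with the assigner's candidate.
     * Returns true if that candidate would be forwarded downstream.
     */
    boolean shouldEmit(Long candidate) {
        if (candidate == null || candidate <= lastEmitted) {
            return false; // null or non-advancing candidates are dropped
        }
        lastEmitted = candidate;
        return true;
    }
}
```

Under this rule, an assigner may return the same or a smaller value repeatedly without flooding downstream operators; only strictly larger watermarks get through.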

Two simple examples of timestamp assigners with periodic watermark generation are below.

{% highlight java %}
/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a bounded degree (at most maxOutOfOrderness milliseconds).
 */
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

/**
 * This generator generates watermarks that are lagging behind processing time by a certain amount.
 * It assumes that elements arrive in Flink after at most a certain time.
 */
public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxTimeLag = 5000; // 5 seconds

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current time minus the maximum time lag
        return new Watermark(System.currentTimeMillis() - maxTimeLag);
    }
}
{% endhighlight %}

{% highlight scala %}
/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a bounded degree (at most maxOutOfOrderness milliseconds).
 */
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

    val maxOutOfOrderness = 3500L // 3.5 seconds

    var currentMaxTimestamp: Long = 0L

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        val timestamp = element.getCreationTime()
        currentMaxTimestamp = math.max(timestamp, currentMaxTimestamp)
        timestamp
    }

    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        new Watermark(currentMaxTimestamp - maxOutOfOrderness)
    }
}

/**
 * This generator generates watermarks that are lagging behind processing time by a certain amount.
 * It assumes that elements arrive in Flink after at most a certain time.
 */
class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

    val maxTimeLag = 5000L // 5 seconds

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        element.getCreationTime
    }

    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current time minus the maximum time lag
        new Watermark(System.currentTimeMillis() - maxTimeLag)
    }
}
{% endhighlight %}
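To see how the out-of-orderness bound from the first example above behaves on concrete numbers, the following stand-alone sketch reproduces only its arithmetic in plain Java. It is not Flink code: the class BoundedOutOfOrdernessModel and its methods are hypothetical, and the isLate check is an illustrative assumption that treats an element as late when its timestamp is at or below the current watermark.

```java
/** Stand-alone model (not Flink code) of the bounded out-of-orderness arithmetic. */
final class BoundedOutOfOrdernessModel {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;

    /** Assumes maxOutOfOrderness is non-negative. */
    BoundedOutOfOrdernessModel(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // start low enough that the subtraction in currentWatermark() cannot underflow
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    /** Mirrors extractTimestamp: remember the highest timestamp seen so far. */
    void observe(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    /** Mirrors getCurrentWatermark: highest timestamp minus the bound. */
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    /** Illustrative check: an element at or below the watermark counts as late. */
    boolean isLate(long eventTimestamp) {
        return eventTimestamp <= currentWatermark();
    }
}
```

With a bound of 3500 ms, observing timestamp 10000 puts the watermark at 6500. An element stamped 7000 that arrives afterwards is still within the bound, while one stamped 6000 is not.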

With Punctuated Watermarks

To generate Watermarks whenever a certain event indicates that a new watermark can be generated, use the AssignerWithPunctuatedWatermarks. For this class, Flink will first call the extractTimestamp(...) method to assign the element a timestamp, and then immediately call the checkAndGetNextWatermark(...) method on that element.

The checkAndGetNextWatermark(...) method gets the timestamp that was assigned in the extractTimestamp(...) method, and can decide whether it wants to generate a Watermark. Whenever the checkAndGetNextWatermark(...) method returns a non-null Watermark, and that Watermark is larger than the latest previous Watermark, that new Watermark will be emitted.

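{% highlight java %}
public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {

	@Override
	public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
		return element.getCreationTime();
	}

	@Override
	public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
		// emit a watermark only for elements that carry the watermark marker
		return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
	}
}
{% endhighlight %}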

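{% highlight scala %}
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {

	override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
		element.getCreationTime
	}

	override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
		// emit a watermark only for elements that carry the watermark marker
		if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
	}
}
{% endhighlight %}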

Note: It is possible to generate a watermark on every single event. However, because each watermark causes some computation downstream, an excessive number of watermarks slows down performance.
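As a rough illustration of the difference in watermark volume, the following stand-alone sketch counts the watermarks that would be emitted when only marker events produce one, compared with producing one on every event. It is plain Java, not Flink code; the class PunctuatedModel, its nested Event type, and the onEveryEvent flag are hypothetical and exist only for this example.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-alone model (not Flink code) of punctuated watermark generation. */
final class PunctuatedModel {

    /** Hypothetical event: a timestamp plus a flag marking watermark-carrying events. */
    static final class Event {
        final long timestamp;
        final boolean watermarkMarker;

        Event(long timestamp, boolean watermarkMarker) {
            this.timestamp = timestamp;
            this.watermarkMarker = watermarkMarker;
        }
    }

    /** Returns the watermarks that would be emitted for the given events, in order. */
    static List<Long> emittedWatermarks(List<Event> events, boolean onEveryEvent) {
        List<Long> emitted = new ArrayList<>();
        long last = Long.MIN_VALUE;
        for (Event e : events) {
            // mirrors checkAndGetNextWatermark: null means no watermark for this element
            Long candidate = null;
            if (onEveryEvent || e.watermarkMarker) {
                candidate = e.timestamp;
            }
            // only non-null, strictly larger watermarks are emitted
            if (candidate != null && candidate > last) {
                last = candidate;
                emitted.add(candidate);
            }
        }
        return emitted;
    }
}
```

For a stream where only some events carry the marker, the marker-based variant emits correspondingly fewer watermarks, which is the trade-off the note above describes.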