blob: 339b13b0415f44c65f42a856790cc0d049fd0720 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.transforms.windowing;
import static;
import java.util.List;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
import org.joda.time.Instant;
/**
 * {@code AfterWatermark} triggers fire based on progress of the system watermark. This time is a
 * lower bound, sometimes heuristically established, on event times that have been fully processed
 * by the pipeline.
 *
 * <p>For sources that provide non-heuristic watermarks (e.g. PubsubIO when using arrival times as
 * event times), the watermark is a strict guarantee that no data with an event time earlier than
 * that watermark will ever be observed in the pipeline. In this case, it's safe to assume that any
 * pane triggered by an {@code AfterWatermark} trigger with a reference point at or beyond the end
 * of the window will be the last pane ever for that window.
 *
 * <p>For sources that provide heuristic watermarks (e.g. PubsubIO when using user-supplied event
 * times), the watermark itself becomes an <i>estimate</i> that no data with an event time earlier
 * than that watermark (i.e. "late data") will ever be observed in the pipeline. These heuristics
 * can often be quite accurate, but the chance of seeing late data for any given window is non-zero.
 * Thus, if absolute correctness over time is important to your use case, you may want to consider
 * using a trigger that accounts for late data. The default trigger, {@code
 * Repeatedly.forever(AfterWatermark.pastEndOfWindow())}, which fires once when the watermark passes
 * the end of the window and then immediately thereafter when any late data arrives, is one such
 * example.
 *
 * <p>The watermark is the clock that defines {@link TimeDomain#EVENT_TIME}.
 *
 * <p>Additional firings before or after the watermark can be requested by calling {@code
 * AfterWatermark.pastEndOfWindow().withEarlyFirings(OnceTrigger)} or {@code
 * AfterWatermark.pastEndOfWindow().withLateFirings(OnceTrigger)}.
 */
public class AfterWatermark {
private static final String TO_STRING = "AfterWatermark.pastEndOfWindow()";
// Static factory class.
private AfterWatermark() {}
/** Creates a trigger that fires when the watermark passes the end of the window. */
public static FromEndOfWindow pastEndOfWindow() {
return new FromEndOfWindow();
/** @see AfterWatermark */
public static class AfterWatermarkEarlyAndLate extends Trigger {
private final OnceTrigger earlyTrigger;
@Nullable private final OnceTrigger lateTrigger;
public OnceTrigger getEarlyTrigger() {
return earlyTrigger;
public OnceTrigger getLateTrigger() {
return lateTrigger;
private AfterWatermarkEarlyAndLate(OnceTrigger earlyTrigger, OnceTrigger lateTrigger) {
lateTrigger == null
? ImmutableList.of(earlyTrigger)
: ImmutableList.of(earlyTrigger, lateTrigger));
this.earlyTrigger = checkNotNull(earlyTrigger, "earlyTrigger should not be null");
this.lateTrigger = lateTrigger;
public AfterWatermarkEarlyAndLate withEarlyFirings(OnceTrigger earlyTrigger) {
return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger);
public AfterWatermarkEarlyAndLate withLateFirings(OnceTrigger lateTrigger) {
return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger);
public Trigger getContinuationTrigger() {
return new AfterWatermarkEarlyAndLate(
lateTrigger == null ? null : lateTrigger.getContinuationTrigger());
protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
throw new UnsupportedOperationException(
"Should not call getContinuationTrigger(List<Trigger>)");
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
// Even without an early or late trigger, we'll still produce a firing at the watermark.
return window.maxTimestamp();
/** @return true if there is no late firing set up, otherwise false */
public boolean mayFinish() {
return lateTrigger == null;
public String toString() {
StringBuilder builder = new StringBuilder(TO_STRING);
if (!(earlyTrigger instanceof Never.NeverTrigger)) {
if (lateTrigger != null && !(lateTrigger instanceof Never.NeverTrigger)) {
return builder.toString();
/** A watermark trigger targeted relative to the end of the window. */
public static class FromEndOfWindow extends OnceTrigger {
private FromEndOfWindow() {
* Creates a new {@code Trigger} like the this, except that it fires repeatedly whenever the
* given {@code Trigger} fires before the watermark has passed the end of the window.
public AfterWatermarkEarlyAndLate withEarlyFirings(OnceTrigger earlyFirings) {
checkNotNull(earlyFirings, "Must specify the trigger to use for early firings");
return new AfterWatermarkEarlyAndLate(earlyFirings, null);
* Creates a new {@code Trigger} like the this, except that it fires repeatedly whenever the
* given {@code Trigger} fires after the watermark has passed the end of the window.
public AfterWatermarkEarlyAndLate withLateFirings(OnceTrigger lateFirings) {
checkNotNull(lateFirings, "Must specify the trigger to use for late firings");
return new AfterWatermarkEarlyAndLate(Never.ever(), lateFirings);
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
return window.maxTimestamp();
protected FromEndOfWindow getContinuationTrigger(List<Trigger> continuationTriggers) {
return this;
public String toString() {
return TO_STRING;
public boolean equals(Object obj) {
return obj instanceof FromEndOfWindow;
public int hashCode() {
return Objects.hash(getClass());