blob: 2be41deb7721fb0fb1e0012fac24e1c4e58629a3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.transforms.windowing;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import java.util.List;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Instant;
/**
 * {@code AfterWatermark} triggers fire based on progress of the system watermark. This time is a
 * lower bound, sometimes heuristically established, on event times that have been fully processed
 * by the pipeline.
 *
 * <p>For sources that provide non-heuristic watermarks (e.g. PubsubIO when using arrival times as
 * event times), the watermark is a strict guarantee that no data with an event time earlier than
 * that watermark will ever be observed in the pipeline. In this case, it's safe to assume that any
 * pane triggered by an {@code AfterWatermark} trigger with a reference point at or beyond the end
 * of the window will be the last pane ever for that window.
 *
 * <p>For sources that provide heuristic watermarks (e.g. PubsubIO when using user-supplied event
 * times), the watermark itself becomes an <i>estimate</i> that no data with an event time earlier
 * than that watermark (i.e. "late data") will ever be observed in the pipeline. These heuristics
 * can often be quite accurate, but the chance of seeing late data for any given window is non-zero.
 * Thus, if absolute correctness over time is important to your use case, you may want to consider
 * using a trigger that accounts for late data. The default trigger, {@code
 * Repeatedly.forever(AfterWatermark.pastEndOfWindow())}, which fires once when the watermark passes
 * the end of the window and then immediately thereafter whenever late data arrives, is one such
 * example.
 *
 * <p>The watermark is the clock that defines {@link TimeDomain#EVENT_TIME}.
 *
 * <p>Additional firings before or after the watermark can be requested by calling {@code
 * AfterWatermark.pastEndOfWindow().withEarlyFirings(OnceTrigger)} or {@code
 * AfterWatermark.pastEndOfWindow().withLateFirings(OnceTrigger)}.
 */
@Experimental(Experimental.Kind.TRIGGER)
public class AfterWatermark {

  private static final String TO_STRING = "AfterWatermark.pastEndOfWindow()";

  // Static factory class.
  private AfterWatermark() {}

  /** Creates a trigger that fires when the watermark passes the end of the window. */
  public static FromEndOfWindow pastEndOfWindow() {
    return new FromEndOfWindow();
  }

  /**
   * A watermark trigger that fires when the watermark passes the end of the window, and
   * additionally whenever its early trigger fires (before that point) or its late trigger fires
   * (after that point).
   *
   * @see AfterWatermark
   */
  public static class AfterWatermarkEarlyAndLate extends Trigger {

    private final OnceTrigger earlyTrigger;
    @Nullable private final OnceTrigger lateTrigger;

    /** Returns the trigger governing early firings; never {@code null}. */
    public OnceTrigger getEarlyTrigger() {
      return earlyTrigger;
    }

    /** Returns the trigger governing late firings, or {@code null} if none was configured. */
    @Nullable
    public OnceTrigger getLateTrigger() {
      return lateTrigger;
    }

    private AfterWatermarkEarlyAndLate(
        OnceTrigger earlyTrigger, @Nullable OnceTrigger lateTrigger) {
      // Validate before handing the trigger to the superclass: ImmutableList.of rejects a null
      // element on its own with a generic message, which previously made the descriptive check
      // below unreachable.
      super(
          buildSubTriggers(
              checkNotNull(earlyTrigger, "earlyTrigger should not be null"), lateTrigger));
      this.earlyTrigger = earlyTrigger;
      this.lateTrigger = lateTrigger;
    }

    /** Builds the sub-trigger list: the early trigger, followed by the late trigger if present. */
    private static List<Trigger> buildSubTriggers(
        OnceTrigger earlyTrigger, @Nullable OnceTrigger lateTrigger) {
      return lateTrigger == null
          ? ImmutableList.<Trigger>of(earlyTrigger)
          : ImmutableList.<Trigger>of(earlyTrigger, lateTrigger);
    }

    /**
     * Returns a copy of this trigger that uses {@code earlyTrigger} for early firings, keeping the
     * current late trigger.
     *
     * @throws NullPointerException if {@code earlyTrigger} is {@code null}
     */
    public AfterWatermarkEarlyAndLate withEarlyFirings(OnceTrigger earlyTrigger) {
      checkNotNull(earlyTrigger, "Must specify the trigger to use for early firings");
      return new AfterWatermarkEarlyAndLate(earlyTrigger, this.lateTrigger);
    }

    /**
     * Returns a copy of this trigger that uses {@code lateTrigger} for late firings, keeping the
     * current early trigger. Passing {@code null} yields a trigger with no late trigger.
     */
    public AfterWatermarkEarlyAndLate withLateFirings(@Nullable OnceTrigger lateTrigger) {
      return new AfterWatermarkEarlyAndLate(this.earlyTrigger, lateTrigger);
    }

    /**
     * Returns the continuation built from the continuations of the early and (if present) late
     * triggers.
     */
    @Override
    public Trigger getContinuationTrigger() {
      return new AfterWatermarkEarlyAndLate(
          earlyTrigger.getContinuationTrigger(),
          lateTrigger == null ? null : lateTrigger.getContinuationTrigger());
    }

    /** Unsupported: this trigger builds its continuation via {@link #getContinuationTrigger()}. */
    @Override
    protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
      throw new UnsupportedOperationException(
          "Should not call getContinuationTrigger(List<Trigger>)");
    }

    @Override
    public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
      // Even without an early or late trigger, we'll still produce a firing at the watermark.
      return window.maxTimestamp();
    }

    /**
     * Renders as {@code AfterWatermark.pastEndOfWindow()} followed by {@code withEarlyFirings(...)}
     * / {@code withLateFirings(...)} for each sub-trigger that is not a {@code Never} trigger.
     */
    @Override
    public String toString() {
      StringBuilder builder = new StringBuilder(TO_STRING);
      if (!(earlyTrigger instanceof Never.NeverTrigger)) {
        builder.append(".withEarlyFirings(").append(earlyTrigger).append(")");
      }
      if (lateTrigger != null && !(lateTrigger instanceof Never.NeverTrigger)) {
        builder.append(".withLateFirings(").append(lateTrigger).append(")");
      }
      return builder.toString();
    }
  }

  /** A watermark trigger targeted relative to the end of the window. */
  public static class FromEndOfWindow extends OnceTrigger {

    private FromEndOfWindow() {
      super(null);
    }

    /**
     * Creates a new {@code Trigger} like this one, except that it fires repeatedly whenever the
     * given {@code Trigger} fires before the watermark has passed the end of the window.
     *
     * @throws NullPointerException if {@code earlyFirings} is {@code null}
     */
    public AfterWatermarkEarlyAndLate withEarlyFirings(OnceTrigger earlyFirings) {
      checkNotNull(earlyFirings, "Must specify the trigger to use for early firings");
      return new AfterWatermarkEarlyAndLate(earlyFirings, null);
    }

    /**
     * Creates a new {@code Trigger} like this one, except that it fires repeatedly whenever the
     * given {@code Trigger} fires after the watermark has passed the end of the window.
     *
     * @throws NullPointerException if {@code lateFirings} is {@code null}
     */
    public AfterWatermarkEarlyAndLate withLateFirings(OnceTrigger lateFirings) {
      checkNotNull(lateFirings, "Must specify the trigger to use for late firings");
      // No early firings requested: Never.ever() serves as the placeholder early trigger.
      return new AfterWatermarkEarlyAndLate(Never.ever(), lateFirings);
    }

    @Override
    public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
      return window.maxTimestamp();
    }

    /** This trigger is its own continuation. */
    @Override
    protected FromEndOfWindow getContinuationTrigger(List<Trigger> continuationTriggers) {
      return this;
    }

    @Override
    public String toString() {
      return TO_STRING;
    }

    /** All {@code FromEndOfWindow} instances are interchangeable, so any two compare equal. */
    @Override
    public boolean equals(Object obj) {
      return obj instanceof FromEndOfWindow;
    }

    @Override
    public int hashCode() {
      return Objects.hash(getClass());
    }
  }
}