| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.core.triggers; |
| |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; |
| |
| import java.util.Objects; |
| import javax.annotation.Nullable; |
| import org.apache.beam.sdk.annotations.Experimental; |
| import org.apache.beam.sdk.state.TimeDomain; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; |
| |
| /** |
| * {@code AfterWatermark} triggers fire based on progress of the system watermark. This time is a |
| * lower-bound, sometimes heuristically established, on event times that have been fully processed |
| * by the pipeline. |
| * |
| * <p>For sources that provide non-heuristic watermarks (e.g. PubsubIO when using arrival times as |
| * event times), the watermark is a strict guarantee that no data with an event time earlier than |
| * that watermark will ever be observed in the pipeline. In this case, it's safe to assume that any |
| * pane triggered by an {@code AfterWatermark} trigger with a reference point at or beyond the end |
| * of the window will be the last pane ever for that window. |
| * |
| * <p>For sources that provide heuristic watermarks (e.g. PubsubIO when using user-supplied event |
| * times), the watermark itself becomes an <i>estimate</i> that no data with an event time earlier |
| * than that watermark (i.e. "late data") will ever be observed in the pipeline. These heuristics |
| * can often be quite accurate, but the chance of seeing late data for any given window is non-zero. |
| * Thus, if absolute correctness over time is important to your use case, you may want to consider |
| * using a trigger that accounts for late data. The default trigger, {@code |
| * Repeatedly.forever(AfterWatermark.pastEndOfWindow())}, which fires once when the watermark passes |
| * the end of the window and then immediately therafter when any late data arrives, is one such |
| * example. |
| * |
| * <p>The watermark is the clock that defines {@link TimeDomain#EVENT_TIME}. |
| * |
| * <p>Additionaly firings before or after the watermark can be requested by calling {@code |
| * AfterWatermark.pastEndOfWindow.withEarlyFirings(OnceTrigger)} or {@code |
| * AfterWatermark.pastEndOfWindow.withEarlyFirings(OnceTrigger)}. |
| */ |
| @Experimental(Experimental.Kind.TRIGGER) |
| public class AfterWatermarkStateMachine { |
| |
| private static final String TO_STRING = "AfterWatermark.pastEndOfWindow()"; |
| |
| // Static factory class. |
| private AfterWatermarkStateMachine() {} |
| |
| /** Creates a trigger that fires when the watermark passes the end of the window. */ |
| public static FromEndOfWindow pastEndOfWindow() { |
| return new FromEndOfWindow(); |
| } |
| |
| /** @see AfterWatermarkStateMachine */ |
| public static class AfterWatermarkEarlyAndLate extends TriggerStateMachine { |
| |
| private static final int EARLY_INDEX = 0; |
| private static final int LATE_INDEX = 1; |
| |
| private final TriggerStateMachine earlyTrigger; |
| @Nullable private final TriggerStateMachine lateTrigger; |
| |
| @SuppressWarnings("unchecked") |
| private AfterWatermarkEarlyAndLate( |
| TriggerStateMachine earlyTrigger, TriggerStateMachine lateTrigger) { |
| super( |
| lateTrigger == null |
| ? ImmutableList.of(earlyTrigger) |
| : ImmutableList.of(earlyTrigger, lateTrigger)); |
| this.earlyTrigger = checkNotNull(earlyTrigger, "earlyTrigger should not be null"); |
| this.lateTrigger = lateTrigger; |
| } |
| |
| public AfterWatermarkEarlyAndLate withEarlyFirings(TriggerStateMachine earlyTrigger) { |
| return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger); |
| } |
| |
| public AfterWatermarkEarlyAndLate withLateFirings(TriggerStateMachine lateTrigger) { |
| return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger); |
| } |
| |
| @Override |
| public void onElement(OnElementContext c) throws Exception { |
| if (!endOfWindowReached(c)) { |
| c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME); |
| } |
| |
| if (!c.trigger().isMerging()) { |
| // If merges can never happen, we just run the unfinished subtrigger |
| c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c); |
| } else { |
| // If merges can happen, we run for all subtriggers because they might be |
| // de-activated or re-activated |
| for (ExecutableTriggerStateMachine subTrigger : c.trigger().subTriggers()) { |
| subTrigger.invokeOnElement(c); |
| } |
| } |
| } |
| |
| @Override |
| public void onMerge(OnMergeContext c) throws Exception { |
| // NOTE that the ReduceFnRunner will delete all end-of-window timers for the |
| // merged-away windows. |
| |
| ExecutableTriggerStateMachine earlySubtrigger = c.trigger().subTrigger(EARLY_INDEX); |
| // We check the early trigger to determine if we are still processing it or |
| // if the end of window has transitioned us to the late trigger |
| OnMergeContext earlyContext = c.forTrigger(earlySubtrigger); |
| |
| // If the early trigger is still active in any merging window then it is still active in |
| // the new merged window, because even if the merged window is "done" some pending elements |
| // haven't had a chance to fire. |
| if (!earlyContext.trigger().finishedInAllMergingWindows() || !endOfWindowReached(c)) { |
| earlySubtrigger.invokeOnMerge(earlyContext); |
| earlyContext.trigger().setFinished(false); |
| if (lateTrigger != null) { |
| ExecutableTriggerStateMachine lateSubtrigger = c.trigger().subTrigger(LATE_INDEX); |
| OnMergeContext lateContext = c.forTrigger(lateSubtrigger); |
| lateContext.trigger().setFinished(false); |
| lateSubtrigger.invokeClear(lateContext); |
| } |
| } else { |
| // Otherwise the early trigger and end-of-window bit is done for good. |
| earlyContext.trigger().setFinished(true); |
| if (lateTrigger != null) { |
| c.trigger().subTrigger(LATE_INDEX).invokeOnMerge(c); |
| } |
| } |
| } |
| |
| private boolean endOfWindowReached(TriggerStateMachine.TriggerContext context) { |
| return context.currentEventTime() != null |
| && context.currentEventTime().isAfter(context.window().maxTimestamp()); |
| } |
| |
| @Override |
| public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws Exception { |
| if (!context.trigger().isFinished(EARLY_INDEX)) { |
| // We have not yet transitioned to late firings. |
| // We should fire if either the trigger is ready or we reach the end of the window. |
| return context.trigger().subTrigger(EARLY_INDEX).invokeShouldFire(context) |
| || endOfWindowReached(context); |
| } else if (lateTrigger == null) { |
| return false; |
| } else { |
| // We are running the late trigger |
| return context.trigger().subTrigger(LATE_INDEX).invokeShouldFire(context); |
| } |
| } |
| |
| @Override |
| public void onFire(TriggerStateMachine.TriggerContext context) throws Exception { |
| if (!context.forTrigger(context.trigger().subTrigger(EARLY_INDEX)).trigger().isFinished()) { |
| onNonLateFiring(context); |
| } else if (lateTrigger != null) { |
| onLateFiring(context); |
| } else { |
| // all done |
| context.trigger().setFinished(true); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder builder = new StringBuilder(TO_STRING); |
| |
| if (!(earlyTrigger instanceof NeverStateMachine)) { |
| builder.append(".withEarlyFirings(").append(earlyTrigger).append(")"); |
| } |
| |
| if (lateTrigger != null && !(lateTrigger instanceof NeverStateMachine)) { |
| builder.append(".withLateFirings(").append(lateTrigger).append(")"); |
| } |
| |
| return builder.toString(); |
| } |
| |
| private void onNonLateFiring(TriggerStateMachine.TriggerContext context) throws Exception { |
| // We have not yet transitioned to late firings. |
| ExecutableTriggerStateMachine earlySubtrigger = context.trigger().subTrigger(EARLY_INDEX); |
| TriggerStateMachine.TriggerContext earlyContext = context.forTrigger(earlySubtrigger); |
| |
| if (!endOfWindowReached(context)) { |
| // This is an early firing, since we have not arrived at the end of the window |
| // Implicitly repeats |
| earlySubtrigger.invokeOnFire(context); |
| earlySubtrigger.invokeClear(context); |
| earlyContext.trigger().setFinished(false); |
| } else { |
| // We have arrived at the end of the window; terminate the early trigger |
| // and clear out the late trigger's state |
| if (earlySubtrigger.invokeShouldFire(context)) { |
| earlySubtrigger.invokeOnFire(context); |
| } |
| earlyContext.trigger().setFinished(true); |
| earlySubtrigger.invokeClear(context); |
| |
| if (lateTrigger == null) { |
| // Done if there is no late trigger. |
| context.trigger().setFinished(true); |
| } else { |
| // If there is a late trigger, we transition to it, and need to clear its state |
| // because it was run in parallel. |
| context.trigger().subTrigger(LATE_INDEX).invokeClear(context); |
| } |
| } |
| } |
| |
| private void onLateFiring(TriggerStateMachine.TriggerContext context) throws Exception { |
| // We are firing the late trigger, with implicit repeat |
| ExecutableTriggerStateMachine lateSubtrigger = context.trigger().subTrigger(LATE_INDEX); |
| lateSubtrigger.invokeOnFire(context); |
| // It is a OnceTrigger, so it must have finished; unfinished it and clear it |
| lateSubtrigger.invokeClear(context); |
| context.forTrigger(lateSubtrigger).trigger().setFinished(false); |
| } |
| } |
| |
| /** A watermark trigger targeted relative to the end of the window. */ |
| public static class FromEndOfWindow extends TriggerStateMachine { |
| |
| private FromEndOfWindow() { |
| super(null); |
| } |
| |
| /** |
| * Creates a new {@code Trigger} like the this, except that it fires repeatedly whenever the |
| * given {@code Trigger} fires before the watermark has passed the end of the window. |
| */ |
| public AfterWatermarkEarlyAndLate withEarlyFirings(TriggerStateMachine earlyFirings) { |
| checkNotNull(earlyFirings, "Must specify the trigger to use for early firings"); |
| return new AfterWatermarkEarlyAndLate(earlyFirings, null); |
| } |
| |
| /** |
| * Creates a new {@code Trigger} like the this, except that it fires repeatedly whenever the |
| * given {@code Trigger} fires after the watermark has passed the end of the window. |
| */ |
| public AfterWatermarkEarlyAndLate withLateFirings(TriggerStateMachine lateFirings) { |
| checkNotNull(lateFirings, "Must specify the trigger to use for late firings"); |
| return new AfterWatermarkEarlyAndLate(NeverStateMachine.ever(), lateFirings); |
| } |
| |
| @Override |
| public void onElement(OnElementContext c) throws Exception { |
| // We're interested in knowing when the input watermark passes the end of the window. |
| // (It is possible this has already happened, in which case the timer will be fired |
| // almost immediately). |
| if (!endOfWindowReached(c)) { |
| c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME); |
| } |
| } |
| |
| @Override |
| public void onMerge(OnMergeContext c) throws Exception { |
| // NOTE that the ReduceFnRunner will delete all end-of-window timers for the |
| // merged-away windows. |
| |
| if (!c.trigger().finishedInAllMergingWindows()) { |
| // If the trigger is still active in any merging window then it is still active in the new |
| // merged window, because even if the merged window is "done" some pending elements haven't |
| // had a chance to fire |
| c.trigger().setFinished(false); |
| } else if (!endOfWindowReached(c)) { |
| // If the end of the new window has not been reached, then the trigger is active again. |
| c.trigger().setFinished(false); |
| } else { |
| // Otherwise it is done for good |
| c.trigger().setFinished(true); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return TO_STRING; |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| return obj instanceof FromEndOfWindow; |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash(getClass()); |
| } |
| |
| @Override |
| public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws Exception { |
| return endOfWindowReached(context); |
| } |
| |
| private boolean endOfWindowReached(TriggerStateMachine.TriggerContext context) { |
| return context.currentEventTime() != null |
| && context.currentEventTime().isAfter(context.window().maxTimestamp()); |
| } |
| |
| @Override |
| public void onFire(TriggerStateMachine.TriggerContext context) throws Exception { |
| context.trigger().setFinished(true); |
| } |
| } |
| } |