blob: 11439746d90705892c085ca0e9e6c87c0efa3787 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
import org.apache.beam.runners.core.ReduceFnContextFactory.StateStyle;
import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory;
import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.joda.time.Duration;
import org.joda.time.Instant;
/**
* Manages the execution of a {@link ReduceFn} after a {@link GroupByKeyOnly} has partitioned the
* {@link PCollection} by key.
*
* <p>The {@link #onTrigger} relies on a {@link TriggerStateMachineRunner} to manage the execution
* of the triggering logic. The {@code ReduceFnRunner}s responsibilities are:
*
* <ul>
* <li>Tracking the windows that are active (have buffered data) as elements arrive and triggers
* are fired.
* <li>Holding the watermark based on the timestamps of elements in a pane and releasing it when
* the trigger fires.
* <li>Calling the appropriate callbacks on {@link ReduceFn} based on trigger execution, timer
* firings, etc, and providing appropriate contexts to the {@link ReduceFn} for actions such
* as output.
* <li>Scheduling garbage collection of state associated with a specific window, and making that
* happen when the appropriate timer fires.
* </ul>
*
* @param <K> The type of key being processed.
* @param <InputT> The type of values associated with the key.
* @param <OutputT> The output type that will be produced for each key.
* @param <W> The type of windows this operates on.
*/
public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
/**
* The {@link ReduceFnRunner} depends on most aspects of the {@link WindowingStrategy}.
*
* <ul>
* <li>It runs the trigger from the {@link WindowingStrategy}.
* <li>It merges windows according to the {@link WindowingStrategy}.
* <li>It chooses how to track active windows and clear out expired windows according to the
* {@link WindowingStrategy}, based on the allowed lateness and whether windows can merge.
* <li>It decides whether to emit empty final panes according to whether the {@link
* WindowingStrategy} requires it.
* <li>
* <li>It uses discarding or accumulation mode according to the {@link WindowingStrategy}.
* </ul>
*/
private final WindowingStrategy<Object, W> windowingStrategy;
private final OutputWindowedValue<KV<K, OutputT>> outputter;
private final StateInternals stateInternals;
private final Counter droppedDueToClosedWindow;
public static final String DROPPED_DUE_TO_CLOSED_WINDOW = "droppedDueToClosedWindow";
private final K key;
/**
* Track which windows are still active and the 'state address' windows which hold their state.
*
* <ul>
* <li>State: Global map for all active windows for this computation and key.
* <li>Lifetime: Cleared when no active windows need to be tracked. A window lives within the
* active window set until its trigger is closed or the window is garbage collected.
* </ul>
*/
private final ActiveWindowSet<W> activeWindows;
/**
* Always a {@link SystemReduceFn}.
*
* <ul>
* <li>State: A bag of accumulated values, or the intermediate result of a combiner.
* <li>State style: RENAMED
* <li>Merging: Concatenate or otherwise combine the state from each merged window.
* <li>Lifetime: Cleared when a pane fires if DISCARDING_FIRED_PANES. Otherwise cleared when
* trigger is finished or when the window is garbage collected.
* </ul>
*/
private final ReduceFn<K, InputT, OutputT, W> reduceFn;
/**
* Manage the setting and firing of timer events.
*
* <ul>
* <li>Merging: End-of-window and garbage collection timers are cancelled when windows are
* merged away. Timers created by triggers are never garbage collected and are left to fire
* and be ignored.
* <li>Lifetime: Timers automatically disappear after they fire.
* </ul>
*/
private final TimerInternals timerInternals;
/**
* Manage the execution and state for triggers.
*
* <ul>
* <li>State: Tracks which sub-triggers have finished, and any additional state needed to
* determine when the trigger should fire.
* <li>State style: DIRECT
* <li>Merging: Finished bits are explicitly managed. Other state is eagerly merged as needed.
* <li>Lifetime: Most trigger state is cleared when the final pane is emitted. However the
* finished bits are left behind and must be cleared when the window is garbage collected.
* </ul>
*/
private final TriggerStateMachineRunner<W> triggerRunner;
/**
* Store the output watermark holds for each window.
*
* <ul>
* <li>State: Bag of hold timestamps.
* <li>State style: RENAMED
* <li>Merging: Depending on {@link TimestampCombiner}, may need to be recalculated on merging.
* When a pane fires it may be necessary to add (back) an end-of-window or garbage
* collection hold.
* <li>Lifetime: Cleared when a pane fires or when the window is garbage collected.
* </ul>
*/
private final WatermarkHold<W> watermarkHold;
private final ReduceFnContextFactory<K, InputT, OutputT, W> contextFactory;
/**
* Store the previously emitted pane (if any) for each window.
*
* <ul>
* <li>State: The previous {@link PaneInfo} passed to the user's {@code DoFn.ProcessElement}
* method, if any.
* <li>Style style: DIRECT
* <li>Merging: Always keyed by actual window, so does not depend on {@link #activeWindows}.
* Cleared when window is merged away.
* <li>Lifetime: Cleared when trigger is closed or window is garbage collected.
* </ul>
*/
private final PaneInfoTracker paneInfoTracker;
/**
* Store whether we've seen any elements for a window since the last pane was emitted.
*
* <ul>
* <li>State: Unless DISCARDING_FIRED_PANES, a count of number of elements added so far.
* <li>State style: RENAMED.
* <li>Merging: Counts are summed when windows are merged.
* <li>Lifetime: Cleared when pane fires or window is garbage collected.
* </ul>
*/
private final NonEmptyPanes<K, W> nonEmptyPanes;
public ReduceFnRunner(
K key,
WindowingStrategy<?, W> windowingStrategy,
ExecutableTriggerStateMachine triggerStateMachine,
StateInternals stateInternals,
TimerInternals timerInternals,
OutputWindowedValue<KV<K, OutputT>> outputter,
@Nullable SideInputReader sideInputReader,
ReduceFn<K, InputT, OutputT, W> reduceFn,
@Nullable PipelineOptions options) {
this.key = key;
this.timerInternals = timerInternals;
this.paneInfoTracker = new PaneInfoTracker(timerInternals);
this.stateInternals = stateInternals;
this.outputter = outputter;
this.reduceFn = reduceFn;
this.droppedDueToClosedWindow =
Metrics.counter(ReduceFnRunner.class, DROPPED_DUE_TO_CLOSED_WINDOW);
@SuppressWarnings("unchecked")
WindowingStrategy<Object, W> objectWindowingStrategy =
(WindowingStrategy<Object, W>) windowingStrategy;
this.windowingStrategy = objectWindowingStrategy;
this.nonEmptyPanes = NonEmptyPanes.create(this.windowingStrategy, this.reduceFn);
// Note this may incur I/O to load persisted window set data.
this.activeWindows = createActiveWindowSet();
this.contextFactory =
new ReduceFnContextFactory<>(
key,
reduceFn,
this.windowingStrategy,
stateInternals,
this.activeWindows,
timerInternals,
sideInputReader,
options);
this.watermarkHold = new WatermarkHold<>(timerInternals, windowingStrategy);
this.triggerRunner =
new TriggerStateMachineRunner<>(
triggerStateMachine,
new TriggerStateMachineContextFactory<>(
windowingStrategy.getWindowFn(), stateInternals, activeWindows));
}
private ActiveWindowSet<W> createActiveWindowSet() {
return windowingStrategy.getWindowFn().isNonMerging()
? new NonMergingActiveWindowSet<>()
: new MergingActiveWindowSet<>(windowingStrategy.getWindowFn(), stateInternals);
}
@VisibleForTesting
boolean isFinished(W window) {
return triggerRunner.isClosed(contextFactory.base(window, StateStyle.DIRECT).state());
}
@VisibleForTesting
boolean hasNoActiveWindows() {
return activeWindows.getActiveAndNewWindows().isEmpty();
}
private Set<W> windowsThatAreOpen(Collection<W> windows) {
Set<W> result = new HashSet<>();
for (W window : windows) {
ReduceFn<K, InputT, OutputT, W>.Context directContext =
contextFactory.base(window, StateStyle.DIRECT);
if (!triggerRunner.isClosed(directContext.state())) {
result.add(window);
}
}
return result;
}
private Collection<W> windowsThatShouldFire(Set<W> windows) throws Exception {
Collection<W> result = new ArrayList<>();
// Filter out timers that didn't trigger.
for (W window : windows) {
ReduceFn<K, InputT, OutputT, W>.Context directContext =
contextFactory.base(window, StateStyle.DIRECT);
if (triggerRunner.shouldFire(
directContext.window(), directContext.timers(), directContext.state())) {
result.add(window);
}
}
return result;
}
/**
* Incorporate {@code values} into the underlying reduce function, and manage holds, timers,
* triggers, and window merging.
*
* <p>The general strategy is:
*
* <ol>
* <li>Use {@link WindowedValue#getWindows} (itself determined using {@link
* WindowFn#assignWindows}) to determine which windows each element belongs to. Some of
* those windows will already have state associated with them. The rest are considered NEW.
* <li>Use {@link WindowFn#mergeWindows} to attempt to merge currently ACTIVE and NEW windows.
* Each NEW window will become either ACTIVE or be discardedL. (See {@link ActiveWindowSet}
* for definitions of these terms.)
* <li>If at all possible, eagerly substitute NEW windows with their ACTIVE state address
* windows before any state is associated with the NEW window. In the common case that
* windows for new elements are merged into existing ACTIVE windows then no additional
* storage or merging overhead will be incurred.
* <li>Otherwise, keep track of the state address windows for ACTIVE windows so that their
* states can be merged on-demand when a pane fires.
* <li>Process the element for each of the windows it's windows have been merged into according
* to {@link ActiveWindowSet}. Processing may require running triggers, setting timers,
* setting holds, and invoking {@link ReduceFn#onTrigger}.
* </ol>
*/
public void processElements(Iterable<WindowedValue<InputT>> values) throws Exception {
if (!values.iterator().hasNext()) {
return;
}
// Determine all the windows for elements.
Set<W> windows = collectWindows(values);
// If an incoming element introduces a new window, attempt to merge it into an existing
// window eagerly.
Map<W, W> windowToMergeResult = mergeWindows(windows);
if (!windowToMergeResult.isEmpty()) {
// Update windows by removing all windows that were merged away and adding
// the windows they were merged to. We add after completing all the
// removals to avoid removing a window that was also added.
List<W> addedWindows = new ArrayList<>(windowToMergeResult.size());
for (Map.Entry<W, W> entry : windowToMergeResult.entrySet()) {
windows.remove(entry.getKey());
addedWindows.add(entry.getValue());
}
windows.addAll(addedWindows);
}
prefetchWindowsForValues(windows);
// All windows that are open before element processing may need to fire.
Set<W> windowsToConsider = windowsThatAreOpen(windows);
// Process each element, using the updated activeWindows determined by mergeWindows.
for (WindowedValue<InputT> value : values) {
processElement(windowToMergeResult, value);
}
// Now that we've processed the elements, see if any of the windows need to fire.
// Prefetch state necessary to determine if the triggers should fire.
for (W mergedWindow : windowsToConsider) {
triggerRunner.prefetchShouldFire(
mergedWindow, contextFactory.base(mergedWindow, StateStyle.DIRECT).state());
}
// Filter to windows that are firing.
Collection<W> windowsToFire = windowsThatShouldFire(windowsToConsider);
// Prefetch windows that are firing.
for (W window : windowsToFire) {
prefetchEmit(
contextFactory.base(window, StateStyle.DIRECT),
contextFactory.base(window, StateStyle.RENAMED));
}
// Trigger output from firing windows.
for (W window : windowsToFire) {
emit(
contextFactory.base(window, StateStyle.DIRECT),
contextFactory.base(window, StateStyle.RENAMED));
}
// We're all done with merging and emitting elements so can compress the activeWindow state.
// Any windows which are still NEW must have come in on a new element which was then discarded
// due to the window's trigger being closed. We can thus delete them.
activeWindows.cleanupTemporaryWindows();
}
public void persist() {
activeWindows.persist();
}
/** Extract the windows associated with the values. */
private Set<W> collectWindows(Iterable<WindowedValue<InputT>> values) throws Exception {
Set<W> windows = new HashSet<>();
for (WindowedValue<?> value : values) {
for (BoundedWindow untypedWindow : value.getWindows()) {
@SuppressWarnings("unchecked")
W window = (W) untypedWindow;
windows.add(window);
}
}
return windows;
}
/**
* Invoke merge for the given windows and return a map from windows to the merge result window.
* Windows that were not merged are not present in the map.
*/
private Map<W, W> mergeWindows(Set<W> windows) throws Exception {
if (windowingStrategy.getWindowFn().isNonMerging()) {
// Return an empty map, indicating that every window is not merged.
return Collections.emptyMap();
}
Map<W, W> windowToMergeResult = new HashMap<>();
// Collect the windows from all elements (except those which are too late) and
// make sure they are already in the active window set or are added as NEW windows.
for (W window : windows) {
// For backwards compat with pre 1.4 only.
// We may still have ACTIVE windows with multiple state addresses, representing
// a window who's state has not yet been eagerly merged.
// We'll go ahead and merge that state now so that we don't have to worry about
// this legacy case anywhere else.
if (activeWindows.isActive(window)) {
Set<W> stateAddressWindows = activeWindows.readStateAddresses(window);
if (stateAddressWindows.size() > 1) {
// This is a legacy window who's state has not been eagerly merged.
// Do that now.
ReduceFn<K, InputT, OutputT, W>.OnMergeContext premergeContext =
contextFactory.forPremerge(window);
reduceFn.onMerge(premergeContext);
watermarkHold.onMerge(premergeContext);
activeWindows.merged(window);
}
}
// Add this window as NEW if it is not currently ACTIVE.
// If we had already seen this window and closed its trigger, then the
// window will not be currently ACTIVE. It will then be added as NEW here,
// and fall into the merging logic as usual.
activeWindows.ensureWindowExists(window);
}
// Merge all of the active windows and retain a mapping from source windows to result windows.
activeWindows.merge(new OnMergeCallback(windowToMergeResult));
return windowToMergeResult;
}
private class OnMergeCallback implements ActiveWindowSet.MergeCallback<W> {
private final Map<W, W> windowToMergeResult;
OnMergeCallback(Map<W, W> windowToMergeResult) {
this.windowToMergeResult = windowToMergeResult;
}
/**
* Return the subset of {@code windows} which are currently ACTIVE. We only need to worry about
* merging state from ACTIVE windows. NEW windows by definition have no existing state.
*/
private List<W> activeWindows(Iterable<W> windows) {
List<W> active = new ArrayList<>();
for (W window : windows) {
if (activeWindows.isActive(window)) {
active.add(window);
}
}
return active;
}
/**
* Called from the active window set to indicate {@code toBeMerged} (of which only {@code
* activeToBeMerged} are ACTIVE and thus have state associated with them) will later be merged
* into {@code mergeResult}.
*/
@Override
public void prefetchOnMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {
List<W> activeToBeMerged = activeWindows(toBeMerged);
ReduceFn<K, InputT, OutputT, W>.OnMergeContext directMergeContext =
contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.DIRECT);
ReduceFn<K, InputT, OutputT, W>.OnMergeContext renamedMergeContext =
contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.RENAMED);
// Prefetch various state.
triggerRunner.prefetchForMerge(mergeResult, activeToBeMerged, directMergeContext.state());
reduceFn.prefetchOnMerge(renamedMergeContext.state());
watermarkHold.prefetchOnMerge(renamedMergeContext.state());
nonEmptyPanes.prefetchOnMerge(renamedMergeContext.state());
}
/**
* Called from the active window set to indicate {@code toBeMerged} (of which only {@code
* activeToBeMerged} are ACTIVE and thus have state associated with them) are about to be merged
* into {@code mergeResult}.
*/
@Override
public void onMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {
// Remember we have merged these windows.
for (W window : toBeMerged) {
windowToMergeResult.put(window, mergeResult);
}
// At this point activeWindows has NOT incorporated the results of the merge.
List<W> activeToBeMerged = activeWindows(toBeMerged);
ReduceFn<K, InputT, OutputT, W>.OnMergeContext directMergeContext =
contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.DIRECT);
ReduceFn<K, InputT, OutputT, W>.OnMergeContext renamedMergeContext =
contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.RENAMED);
// Run the reduceFn to perform any needed merging.
reduceFn.onMerge(renamedMergeContext);
// Merge the watermark holds.
watermarkHold.onMerge(renamedMergeContext);
// Merge non-empty pane state.
nonEmptyPanes.onMerge(renamedMergeContext.state());
// Have the trigger merge state as needed.
triggerRunner.onMerge(
directMergeContext.window(), directMergeContext.timers(), directMergeContext.state());
for (W active : activeToBeMerged) {
if (active.equals(mergeResult)) {
// Not merged away.
continue;
}
// Cleanup flavor A: Currently ACTIVE window is about to be merged away.
// Clear any state not already cleared by the onMerge calls above.
WindowTracing.debug("ReduceFnRunner.onMerge: Merging {} into {}", active, mergeResult);
ReduceFn<K, InputT, OutputT, W>.Context directClearContext =
contextFactory.base(active, StateStyle.DIRECT);
// No need for the end-of-window or garbage collection timers.
// We will establish a new end-of-window or garbage collection timer for the mergeResult
// window in processElement below. There must be at least one element for the mergeResult
// window since a new element with a new window must have triggered this onMerge.
cancelEndOfWindowAndGarbageCollectionTimers(directClearContext);
// We no longer care about any previous panes of merged away windows. The
// merge result window gets to start fresh if it is new.
paneInfoTracker.clear(directClearContext.state());
}
}
}
/**
* Redirect element windows to the ACTIVE windows they have been merged into. The compressed
* representation (value, {window1, window2, ...}) actually represents distinct elements (value,
* window1), (value, window2), ... so if window1 and window2 merge, the resulting window will
* contain both copies of the value.
*/
private ImmutableSet<W> toMergedWindows(
final Map<W, W> windowToMergeResult, final Collection<? extends BoundedWindow> windows) {
return ImmutableSet.copyOf(
FluentIterable.from(windows)
.transform(
untypedWindow -> {
@SuppressWarnings("unchecked")
W window = (W) untypedWindow;
W mergedWindow = windowToMergeResult.get(window);
// If the element is not present in the map, the window is unmerged.
return (mergedWindow == null) ? window : mergedWindow;
}));
}
private void prefetchWindowsForValues(Collection<W> windows) {
// Prefetch in each of the windows if we're going to need to process triggers
for (W window : windows) {
ReduceFn<K, InputT, OutputT, W>.Context directContext =
contextFactory.base(window, StateStyle.DIRECT);
triggerRunner.prefetchForValue(window, directContext.state());
}
}
/**
* Process an element.
*
* @param windowToMergeResult map of windows to merged windows. If a window is not present it is
* unmerged.
* @param value the value being processed
*/
private void processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT> value)
throws Exception {
ImmutableSet<W> windows = toMergedWindows(windowToMergeResult, value.getWindows());
// Process the element for each (mergeResultWindow, not closed) window it belongs to.
for (W window : windows) {
ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext =
contextFactory.forValue(
window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
if (triggerRunner.isClosed(directContext.state())) {
// This window has already been closed.
droppedDueToClosedWindow.inc();
WindowTracing.debug(
"ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
+ "since window is no longer active at inputWatermark:{}; outputWatermark:{}",
value.getTimestamp(),
key,
window,
timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
continue;
}
activeWindows.ensureWindowIsActive(window);
ReduceFn<K, InputT, OutputT, W>.ProcessValueContext renamedContext =
contextFactory.forValue(
window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED);
nonEmptyPanes.recordContent(renamedContext.state());
scheduleGarbageCollectionTimer(directContext);
// Hold back progress of the output watermark until we have processed the pane this
// element will be included within. If the element is later than the output watermark, the
// hold will be at GC time.
watermarkHold.addHolds(renamedContext);
// Execute the reduceFn, which will buffer the value as appropriate
reduceFn.processValue(renamedContext);
// Run the trigger to update its state
triggerRunner.processValue(
directContext.window(),
directContext.timestamp(),
directContext.timers(),
directContext.state());
// At this point, if triggerRunner.shouldFire before the processValue then
// triggerRunner.shouldFire after the processValue. In other words adding values
// cannot take a trigger state from firing to non-firing.
// (We don't actually assert this since it is too slow.)
}
}
/** A descriptor of the activation for a window based on a timer. */
private class WindowActivation {
public final ReduceFn<K, InputT, OutputT, W>.Context directContext;
public final ReduceFn<K, InputT, OutputT, W>.Context renamedContext;
// If this is an end-of-window timer then we may need to set a garbage collection timer
// if allowed lateness is non-zero.
public final boolean isEndOfWindow;
// If this is a garbage collection timer then we should trigger and
// garbage collect the window. We'll consider any timer at or after the
// end-of-window time to be a signal to garbage collect.
public final boolean isGarbageCollection;
WindowActivation(
ReduceFn<K, InputT, OutputT, W>.Context directContext,
ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
this.directContext = directContext;
this.renamedContext = renamedContext;
W window = directContext.window();
// The output watermark is before the end of the window if it is either unknown
// or it is known to be before it. If it is unknown, that means that there hasn't been
// enough data to advance it.
boolean outputWatermarkBeforeEOW =
timerInternals.currentOutputWatermarkTime() == null
|| !timerInternals.currentOutputWatermarkTime().isAfter(window.maxTimestamp());
// The "end of the window" is reached when the local input watermark (for this key) surpasses
// it but the local output watermark (also for this key) has not. After data is emitted and
// the output watermark hold is released, the output watermark on this key will immediately
// exceed the end of the window (otherwise we could see multiple ON_TIME outputs)
this.isEndOfWindow =
timerInternals.currentInputWatermarkTime().isAfter(window.maxTimestamp())
&& outputWatermarkBeforeEOW;
// The "GC time" is reached when the input watermark surpasses the end of the window
// plus allowed lateness. After this, the window is expired and expunged.
this.isGarbageCollection =
timerInternals
.currentInputWatermarkTime()
.isAfter(LateDataUtils.garbageCollectionTime(window, windowingStrategy));
}
// Has this window had its trigger finish?
// - The trigger may implement isClosed as constant false.
// - If the window function does not support windowing then all windows will be considered
// active.
// So we must take conjunction of activeWindows and triggerRunner state.
public boolean windowIsActiveAndOpen() {
return activeWindows.isActive(directContext.window())
&& !triggerRunner.isClosed(directContext.state());
}
}
public void onTimers(Iterable<TimerData> timers) throws Exception {
if (!timers.iterator().hasNext()) {
return;
}
// Create a reusable context for each window and begin prefetching necessary
// state.
Map<BoundedWindow, WindowActivation> windowActivations = new HashMap();
for (TimerData timer : timers) {
checkArgument(
timer.getNamespace() instanceof WindowNamespace,
"Expected timer to be in WindowNamespace, but was in %s",
timer.getNamespace());
@SuppressWarnings("unchecked")
WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace();
W window = windowNamespace.getWindow();
WindowTracing.debug(
"{}: Received timer key:{}; window:{}; data:{} with "
+ "inputWatermark:{}; outputWatermark:{}",
ReduceFnRunner.class.getSimpleName(),
key,
window,
timer,
timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
// Processing time timers for an expired window are ignored, just like elements
// that show up too late. Window GC is management by an event time timer
if (TimeDomain.EVENT_TIME != timer.getDomain() && windowIsExpired(window)) {
continue;
}
// How a window is processed is a function only of the current state, not the details
// of the timer. This makes us robust to large leaps in processing time and watermark
// time, where both EOW and GC timers come in together and we need to GC and emit
// the final pane.
if (windowActivations.containsKey(window)) {
continue;
}
ReduceFn<K, InputT, OutputT, W>.Context directContext =
contextFactory.base(window, StateStyle.DIRECT);
ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
contextFactory.base(window, StateStyle.RENAMED);
WindowActivation windowActivation = new WindowActivation(directContext, renamedContext);
windowActivations.put(window, windowActivation);
// Perform prefetching of state to determine if the trigger should fire.
if (windowActivation.isGarbageCollection) {
triggerRunner.prefetchIsClosed(directContext.state());
} else {
triggerRunner.prefetchShouldFire(directContext.window(), directContext.state());
}
}
// For those windows that are active and open, prefetch the triggering or emitting state.
for (WindowActivation timer : windowActivations.values()) {
if (timer.windowIsActiveAndOpen()) {
ReduceFn<K, InputT, OutputT, W>.Context directContext = timer.directContext;
if (timer.isGarbageCollection) {
prefetchOnTrigger(directContext, timer.renamedContext);
} else if (triggerRunner.shouldFire(
directContext.window(), directContext.timers(), directContext.state())) {
prefetchEmit(directContext, timer.renamedContext);
}
}
}
// Perform processing now that everything is prefetched.
for (WindowActivation windowActivation : windowActivations.values()) {
ReduceFn<K, InputT, OutputT, W>.Context directContext = windowActivation.directContext;
ReduceFn<K, InputT, OutputT, W>.Context renamedContext = windowActivation.renamedContext;
if (windowActivation.isGarbageCollection) {
WindowTracing.debug(
"{}: Cleaning up for key:{}; window:{} with inputWatermark:{}; outputWatermark:{}",
ReduceFnRunner.class.getSimpleName(),
key,
directContext.window(),
timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
boolean windowIsActiveAndOpen = windowActivation.windowIsActiveAndOpen();
if (windowIsActiveAndOpen) {
// We need to call onTrigger to emit the final pane if required.
// The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted,
// and the watermark has passed the end of the window.
@Nullable
Instant newHold =
onTrigger(
directContext,
renamedContext,
true /* isFinished */,
windowActivation.isEndOfWindow);
checkState(newHold == null, "Hold placed at %s despite isFinished being true.", newHold);
}
// Cleanup flavor B: Clear all the remaining state for this window since we'll never
// see elements for it again.
clearAllState(directContext, renamedContext, windowIsActiveAndOpen);
} else {
WindowTracing.debug(
"{}.onTimers: Triggering for key:{}; window:{} at {} with "
+ "inputWatermark:{}; outputWatermark:{}",
key,
directContext.window(),
timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
if (windowActivation.windowIsActiveAndOpen()
&& triggerRunner.shouldFire(
directContext.window(), directContext.timers(), directContext.state())) {
emit(directContext, renamedContext);
}
if (windowActivation.isEndOfWindow) {
// If the window strategy trigger includes a watermark trigger then at this point
// there should be no data holds, either because we'd already cleared them on an
// earlier onTrigger, or because we just cleared them on the above emit.
// We could assert this but it is very expensive.
// Since we are processing an on-time firing we should schedule the garbage collection
// timer. (If getAllowedLateness is zero then the timer event will be considered a
// cleanup event and handled by the above).
// Note we must do this even if the trigger is finished so that we are sure to cleanup
// any final trigger finished bits.
checkState(
windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO),
"Unexpected zero getAllowedLateness");
Instant cleanupTime =
LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy);
WindowTracing.debug(
"ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with "
+ "inputWatermark:{}; outputWatermark:{}",
key,
directContext.window(),
cleanupTime,
timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
checkState(
!cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
"Cleanup time %s is beyond end-of-time",
cleanupTime);
directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME);
}
}
}
}
/**
* Clear all the state associated with {@code context}'s window. Should only be invoked if we know
* all future elements for this window will be considered beyond allowed lateness. This is a
* superset of the clearing done by {@link #emit} below since:
*
* <ol>
* <li>We can clear the trigger finished bits since we'll never need to ask if the trigger is
* closed again.
* <li>We can clear any remaining garbage collection hold.
* </ol>
*/
private void clearAllState(
ReduceFn<K, InputT, OutputT, W>.Context directContext,
ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
boolean windowIsActiveAndOpen)
throws Exception {
if (windowIsActiveAndOpen) {
// Since both the window is in the active window set AND the trigger was not yet closed,
// it is possible we still have state.
reduceFn.clearState(renamedContext);
watermarkHold.clearHolds(renamedContext);
nonEmptyPanes.clearPane(renamedContext.state());
// These calls work irrespective of whether the window is active or not, but
// are unnecessary if the window is not active.
triggerRunner.clearState(
directContext.window(), directContext.timers(), directContext.state());
paneInfoTracker.clear(directContext.state());
} else {
// If !windowIsActiveAndOpen then !activeWindows.isActive (1) or triggerRunner.isClosed (2).
// For (1), if !activeWindows.isActive then the window must be merging and has been
// explicitly removed by emit. But in that case the trigger must have fired
// and been closed, so this case reduces to (2).
// For (2), if triggerRunner.isClosed then the trigger was fired and entered the
// closed state. In that case emit will have cleared all state in
// reduceFn, triggerRunner (except for finished bits), paneInfoTracker and activeWindows.
// We also know nonEmptyPanes must have been unconditionally cleared by the trigger.
// Since the trigger fired the existing watermark holds must have been cleared, and since
// the trigger closed no new end of window or garbage collection hold will have been
// placed by WatermarkHold.extractAndRelease.
// Thus all the state clearing above is unnecessary.
//
// But(!) for backwards compatibility we must allow a pipeline to be updated from
// an sdk version <= 1.3. In that case it is possible we have an end-of-window or
// garbage collection hold keyed by the current window (reached via directContext) rather
// than the state address window (reached via renamedContext).
// However this can only happen if:
// - We have merging windows.
// - We are DISCARDING_FIRED_PANES.
// - A pane has fired.
// - But the trigger is not (yet) closed.
if (windowingStrategy.getMode() == AccumulationMode.DISCARDING_FIRED_PANES
&& !windowingStrategy.getWindowFn().isNonMerging()) {
watermarkHold.clearHolds(directContext);
}
}
// Don't need to track address state windows anymore.
activeWindows.remove(directContext.window());
// We'll never need to test for the trigger being closed again.
triggerRunner.clearFinished(directContext.state());
}
/** Should the reduce function state be cleared? */
private boolean shouldDiscardAfterFiring(boolean isFinished) {
if (isFinished) {
// This is the last firing for trigger.
return true;
}
if (windowingStrategy.getMode() == AccumulationMode.DISCARDING_FIRED_PANES) {
// Nothing should be accumulated between panes.
return true;
}
return false;
}
private void prefetchEmit(
ReduceFn<K, InputT, OutputT, W>.Context directContext,
ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
triggerRunner.prefetchShouldFire(directContext.window(), directContext.state());
triggerRunner.prefetchOnFire(directContext.window(), directContext.state());
triggerRunner.prefetchIsClosed(directContext.state());
prefetchOnTrigger(directContext, renamedContext);
}
/** Emit if a trigger is ready to fire or timers require it, and cleanup state. */
private void emit(
ReduceFn<K, InputT, OutputT, W>.Context directContext,
ReduceFn<K, InputT, OutputT, W>.Context renamedContext)
throws Exception {
checkState(
triggerRunner.shouldFire(
directContext.window(), directContext.timers(), directContext.state()));
// Inform the trigger of the transition to see if it is finished
triggerRunner.onFire(directContext.window(), directContext.timers(), directContext.state());
boolean isFinished = triggerRunner.isClosed(directContext.state());
// Will be able to clear all element state after triggering?
boolean shouldDiscard = shouldDiscardAfterFiring(isFinished);
// Run onTrigger to produce the actual pane contents.
// As a side effect it will clear all element holds, but not necessarily any
// end-of-window or garbage collection holds.
onTrigger(directContext, renamedContext, isFinished, false /*isEndOfWindow*/);
// Now that we've triggered, the pane is empty.
nonEmptyPanes.clearPane(renamedContext.state());
// Cleanup buffered data if appropriate
if (shouldDiscard) {
// Cleanup flavor C: The user does not want any buffered data to persist between panes.
reduceFn.clearState(renamedContext);
}
if (isFinished) {
// Cleanup flavor D: If trigger is closed we will ignore all new incoming elements.
// Clear state not otherwise cleared by onTrigger and clearPane above.
// Remember the trigger is, indeed, closed until the window is garbage collected.
triggerRunner.clearState(
directContext.window(), directContext.timers(), directContext.state());
paneInfoTracker.clear(directContext.state());
activeWindows.remove(directContext.window());
}
}
/** Do we need to emit? */
private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing timing) {
if (!isEmpty) {
// The pane has elements.
return true;
}
if (timing == Timing.ON_TIME
&& windowingStrategy.getOnTimeBehavior() == Window.OnTimeBehavior.FIRE_ALWAYS) {
// This is an empty ON_TIME pane.
return true;
}
if (isFinished && windowingStrategy.getClosingBehavior() == ClosingBehavior.FIRE_ALWAYS) {
// This is known to be the final pane, and the user has requested it even when empty.
return true;
}
return false;
}
private void prefetchOnTrigger(
final ReduceFn<K, InputT, OutputT, W>.Context directContext,
ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
paneInfoTracker.prefetchPaneInfo(directContext);
watermarkHold.prefetchExtract(renamedContext);
nonEmptyPanes.isEmpty(renamedContext.state()).readLater();
reduceFn.prefetchOnTrigger(directContext.state());
}
/**
* Run the {@link ReduceFn#onTrigger} method and produce any necessary output.
*
* @return output watermark hold added, or {@literal null} if none.
*/
@Nullable
private Instant onTrigger(
final ReduceFn<K, InputT, OutputT, W>.Context directContext,
ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
final boolean isFinished,
boolean isEndOfWindow)
throws Exception {
// Extract the window hold, and as a side effect clear it.
final WatermarkHold.OldAndNewHolds pair =
watermarkHold.extractAndRelease(renamedContext, isFinished).read();
// TODO: This isn't accurate if the elements are late. See BEAM-2262
final Instant outputTimestamp = pair.oldHold;
@Nullable Instant newHold = pair.newHold;
final boolean isEmpty = nonEmptyPanes.isEmpty(renamedContext.state()).read();
if (isEmpty
&& windowingStrategy.getClosingBehavior() == ClosingBehavior.FIRE_IF_NON_EMPTY
&& windowingStrategy.getOnTimeBehavior() == Window.OnTimeBehavior.FIRE_IF_NON_EMPTY) {
return newHold;
}
Instant inputWM = timerInternals.currentInputWatermarkTime();
if (newHold != null) {
// We can't be finished yet.
checkState(!isFinished, "new hold at %s but finished %s", newHold, directContext.window());
// The hold cannot be behind the input watermark.
checkState(
!newHold.isBefore(inputWM), "new hold %s is before input watermark %s", newHold, inputWM);
if (newHold.isAfter(directContext.window().maxTimestamp())) {
// The hold must be for garbage collection, which can't have happened yet.
checkState(
newHold.isEqual(
LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy)),
"new hold %s should be at garbage collection for window %s plus %s",
newHold,
directContext.window(),
windowingStrategy.getAllowedLateness());
} else {
// The hold must be for the end-of-window, which can't have happened yet.
checkState(
newHold.isEqual(directContext.window().maxTimestamp()),
"new hold %s should be at end of window %s",
newHold,
directContext.window());
checkState(
!isEndOfWindow,
"new hold at %s for %s but this is the watermark trigger",
newHold,
directContext.window());
}
}
// Calculate the pane info.
final PaneInfo pane = paneInfoTracker.getNextPaneInfo(directContext, isFinished).read();
// Only emit a pane if it has data or empty panes are observable.
if (needToEmit(isEmpty, isFinished, pane.getTiming())) {
// Run reduceFn.onTrigger method.
final List<W> windows = Collections.singletonList(directContext.window());
ReduceFn<K, InputT, OutputT, W>.OnTriggerContext renamedTriggerContext =
contextFactory.forTrigger(
directContext.window(),
pane,
StateStyle.RENAMED,
toOutput -> {
// We're going to output panes, so commit the (now used) PaneInfo.
// This is unnecessary if the trigger isFinished since the saved
// state will be immediately deleted.
if (!isFinished) {
paneInfoTracker.storeCurrentPaneInfo(directContext, pane);
}
// Output the actual value.
outputter.outputWindowedValue(KV.of(key, toOutput), outputTimestamp, windows, pane);
});
reduceFn.onTrigger(renamedTriggerContext);
}
return newHold;
}
/**
* Schedule a timer to garbage collect the window.
*
* <p>The timer:
*
* <ul>
* <li>...must be fired strictly after the expiration of the window.
* <li>...should be as close to the expiration as possible, to have a timely output of remaining
* buffered data, and GC.
* </ul>
*/
private void scheduleGarbageCollectionTimer(ReduceFn<?, ?, ?, W>.Context directContext) {
Instant inputWM = timerInternals.currentInputWatermarkTime();
Instant gcTime = LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy);
WindowTracing.trace(
"ReduceFnRunner.scheduleGarbageCollectionTimer: Scheduling at {} for "
+ "key:{}; window:{} where inputWatermark:{}; outputWatermark:{}",
gcTime,
key,
directContext.window(),
inputWM,
timerInternals.currentOutputWatermarkTime());
checkState(
!gcTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
"Timer %s is beyond end-of-time",
gcTime);
directContext.timers().setTimer(gcTime, TimeDomain.EVENT_TIME);
}
private void cancelEndOfWindowAndGarbageCollectionTimers(
ReduceFn<?, ?, ?, W>.Context directContext) {
WindowTracing.debug(
"ReduceFnRunner.cancelEndOfWindowAndGarbageCollectionTimers: Deleting timers for "
+ "key:{}; window:{} where inputWatermark:{}; outputWatermark:{}",
key,
directContext.window(),
timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
Instant eow = directContext.window().maxTimestamp();
directContext.timers().deleteTimer(eow, TimeDomain.EVENT_TIME);
Instant gc = LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy);
if (gc.isAfter(eow)) {
directContext.timers().deleteTimer(gc, TimeDomain.EVENT_TIME);
}
}
private boolean windowIsExpired(BoundedWindow w) {
return timerInternals
.currentInputWatermarkTime()
.isAfter(w.maxTimestamp().plus(windowingStrategy.getAllowedLateness()));
}
}