blob: e0ecbedc6c17286167579199e4eb9abb10634f48 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.triggers;
import java.util.Collection;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.ActiveWindowSet;
import org.apache.beam.runners.core.MergingStateAccessor;
import org.apache.beam.runners.core.StateAccessor;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.triggers.TriggerStateMachine.MergingTriggerInfo;
import org.apache.beam.runners.core.triggers.TriggerStateMachine.TriggerInfo;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timers;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.joda.time.Instant;
/**
* Factory for creating instances of the various {@link TriggerStateMachine} contexts.
*
* <p>These contexts are highly interdependent and share many fields; it is inadvisable to create
* them via any means other than this factory class.
*/
public class TriggerStateMachineContextFactory<W extends BoundedWindow> {
private final WindowFn<?, W> windowFn;
private StateInternals stateInternals;
private final Coder<W> windowCoder;
public TriggerStateMachineContextFactory(
WindowFn<?, W> windowFn, StateInternals stateInternals, ActiveWindowSet<W> activeWindows) {
// Future triggers may be able to exploit the active window to state address window mapping.
this.windowFn = windowFn;
this.stateInternals = stateInternals;
this.windowCoder = windowFn.windowCoder();
}
public TriggerStateMachine.TriggerContext base(
W window,
Timers timers,
ExecutableTriggerStateMachine rootTrigger,
FinishedTriggers finishedSet) {
return new TriggerContextImpl(window, timers, rootTrigger, finishedSet);
}
public TriggerStateMachine.OnElementContext createOnElementContext(
W window,
Timers timers,
Instant elementTimestamp,
ExecutableTriggerStateMachine rootTrigger,
FinishedTriggers finishedSet) {
return new OnElementContextImpl(window, timers, rootTrigger, finishedSet, elementTimestamp);
}
public TriggerStateMachine.OnMergeContext createOnMergeContext(
W window,
Timers timers,
ExecutableTriggerStateMachine rootTrigger,
FinishedTriggers finishedSet,
Map<W, FinishedTriggers> finishedSets) {
return new OnMergeContextImpl(window, timers, rootTrigger, finishedSet, finishedSets);
}
public StateAccessor<?> createStateAccessor(W window, ExecutableTriggerStateMachine trigger) {
return new StateAccessorImpl(window, trigger);
}
public MergingStateAccessor<?, W> createMergingStateAccessor(
W mergeResult, Collection<W> mergingWindows, ExecutableTriggerStateMachine trigger) {
return new MergingStateAccessorImpl(trigger, mergingWindows, mergeResult);
}
private class TriggerInfoImpl implements TriggerStateMachine.TriggerInfo {
protected final ExecutableTriggerStateMachine trigger;
protected final FinishedTriggers finishedSet;
private final TriggerStateMachine.TriggerContext context;
public TriggerInfoImpl(
ExecutableTriggerStateMachine trigger,
FinishedTriggers finishedSet,
TriggerStateMachine.TriggerContext context) {
this.trigger = trigger;
this.finishedSet = finishedSet;
this.context = context;
}
@Override
public boolean isMerging() {
return !windowFn.isNonMerging();
}
@Override
public Iterable<ExecutableTriggerStateMachine> subTriggers() {
return trigger.subTriggers();
}
@Override
public ExecutableTriggerStateMachine subTrigger(int subtriggerIndex) {
return trigger.subTriggers().get(subtriggerIndex);
}
@Override
public boolean isFinished() {
return finishedSet.isFinished(trigger);
}
@Override
public boolean isFinished(int subtriggerIndex) {
return finishedSet.isFinished(subTrigger(subtriggerIndex));
}
@Override
public boolean areAllSubtriggersFinished() {
return Iterables.isEmpty(unfinishedSubTriggers());
}
@Override
public Iterable<ExecutableTriggerStateMachine> unfinishedSubTriggers() {
return FluentIterable.from(trigger.subTriggers())
.filter(trigger -> !finishedSet.isFinished(trigger));
}
@Override
public @Nullable ExecutableTriggerStateMachine firstUnfinishedSubTrigger() {
for (ExecutableTriggerStateMachine subTrigger : trigger.subTriggers()) {
if (!finishedSet.isFinished(subTrigger)) {
return subTrigger;
}
}
return null;
}
@Override
public void resetTree() throws Exception {
finishedSet.clearRecursively(trigger);
trigger.invokeClear(context);
}
@Override
public void setFinished(boolean finished) {
finishedSet.setFinished(trigger, finished);
}
@Override
public void setFinished(boolean finished, int subTriggerIndex) {
finishedSet.setFinished(subTrigger(subTriggerIndex), finished);
}
}
private class TriggerTimers implements Timers {
private final Timers timers;
private final W window;
public TriggerTimers(W window, Timers timers) {
this.timers = timers;
this.window = window;
}
@Override
public void setTimer(Instant timestamp, TimeDomain timeDomain) {
timers.setTimer(timestamp, timeDomain);
}
@Override
public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
if (timeDomain == TimeDomain.EVENT_TIME && timestamp.equals(window.maxTimestamp())) {
// Don't allow triggers to unset the at-max-timestamp timer. This is necessary for on-time
// state transitions.
return;
}
timers.deleteTimer(timestamp, timeDomain);
}
@Override
public Instant currentProcessingTime() {
return timers.currentProcessingTime();
}
@Override
@Nullable
public Instant currentSynchronizedProcessingTime() {
return timers.currentSynchronizedProcessingTime();
}
@Override
public Instant currentEventTime() {
return timers.currentEventTime();
}
}
private class MergingTriggerInfoImpl extends TriggerInfoImpl
implements TriggerStateMachine.MergingTriggerInfo {
private final Map<W, FinishedTriggers> finishedSets;
public MergingTriggerInfoImpl(
ExecutableTriggerStateMachine trigger,
FinishedTriggers finishedSet,
TriggerStateMachine.TriggerContext context,
Map<W, FinishedTriggers> finishedSets) {
super(trigger, finishedSet, context);
this.finishedSets = finishedSets;
}
@Override
public boolean finishedInAnyMergingWindow() {
for (FinishedTriggers finishedSet : finishedSets.values()) {
if (finishedSet.isFinished(trigger)) {
return true;
}
}
return false;
}
@Override
public boolean finishedInAllMergingWindows() {
for (FinishedTriggers finishedSet : finishedSets.values()) {
if (!finishedSet.isFinished(trigger)) {
return false;
}
}
return true;
}
}
private class StateAccessorImpl implements StateAccessor<Object> {
protected final int triggerIndex;
protected final StateNamespace windowNamespace;
public StateAccessorImpl(W window, ExecutableTriggerStateMachine trigger) {
this.triggerIndex = trigger.getTriggerIndex();
this.windowNamespace = namespaceFor(window);
}
protected StateNamespace namespaceFor(W window) {
return StateNamespaces.windowAndTrigger(windowCoder, window, triggerIndex);
}
@Override
public <StateT extends State> StateT access(StateTag<StateT> address) {
return stateInternals.state(windowNamespace, address);
}
}
private class MergingStateAccessorImpl extends StateAccessorImpl
implements MergingStateAccessor<Object, W> {
private final Collection<W> activeToBeMerged;
public MergingStateAccessorImpl(
ExecutableTriggerStateMachine trigger, Collection<W> activeToBeMerged, W mergeResult) {
super(mergeResult, trigger);
this.activeToBeMerged = activeToBeMerged;
}
@Override
public <StateT extends State> StateT access(StateTag<StateT> address) {
return stateInternals.state(windowNamespace, address);
}
@Override
public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
StateTag<StateT> address) {
ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder();
for (W mergingWindow : activeToBeMerged) {
StateT stateForWindow = stateInternals.state(namespaceFor(mergingWindow), address);
builder.put(mergingWindow, stateForWindow);
}
return builder.build();
}
}
private class TriggerContextImpl extends TriggerStateMachine.TriggerContext {
private final W window;
private final StateAccessorImpl state;
private final Timers timers;
private final TriggerInfoImpl triggerInfo;
private TriggerContextImpl(
W window,
Timers timers,
ExecutableTriggerStateMachine trigger,
FinishedTriggers finishedSet) {
this.window = window;
this.state = new StateAccessorImpl(window, trigger);
this.timers = new TriggerTimers(window, timers);
this.triggerInfo = new TriggerInfoImpl(trigger, finishedSet, this);
}
@Override
public TriggerStateMachine.TriggerContext forTrigger(ExecutableTriggerStateMachine trigger) {
return new TriggerContextImpl(window, timers, trigger, triggerInfo.finishedSet);
}
@Override
public TriggerInfo trigger() {
return triggerInfo;
}
@Override
public StateAccessor<?> state() {
return state;
}
@Override
public W window() {
return window;
}
@Override
public void deleteTimer(Instant timestamp, TimeDomain domain) {
timers.deleteTimer(timestamp, domain);
}
@Override
public Instant currentProcessingTime() {
return timers.currentProcessingTime();
}
@Override
@Nullable
public Instant currentSynchronizedProcessingTime() {
return timers.currentSynchronizedProcessingTime();
}
@Override
@Nullable
public Instant currentEventTime() {
return timers.currentEventTime();
}
}
private class OnElementContextImpl extends TriggerStateMachine.OnElementContext {
private final W window;
private final StateAccessorImpl state;
private final Timers timers;
private final TriggerInfoImpl triggerInfo;
private final Instant eventTimestamp;
private OnElementContextImpl(
W window,
Timers timers,
ExecutableTriggerStateMachine trigger,
FinishedTriggers finishedSet,
Instant eventTimestamp) {
this.window = window;
this.state = new StateAccessorImpl(window, trigger);
this.timers = new TriggerTimers(window, timers);
this.triggerInfo = new TriggerInfoImpl(trigger, finishedSet, this);
this.eventTimestamp = eventTimestamp;
}
@Override
public Instant eventTimestamp() {
return eventTimestamp;
}
@Override
public TriggerStateMachine.OnElementContext forTrigger(ExecutableTriggerStateMachine trigger) {
return new OnElementContextImpl(
window, timers, trigger, triggerInfo.finishedSet, eventTimestamp);
}
@Override
public TriggerInfo trigger() {
return triggerInfo;
}
@Override
public StateAccessor<?> state() {
return state;
}
@Override
public W window() {
return window;
}
@Override
public void setTimer(Instant timestamp, TimeDomain domain) {
timers.setTimer(timestamp, domain);
}
@Override
public void deleteTimer(Instant timestamp, TimeDomain domain) {
timers.deleteTimer(timestamp, domain);
}
@Override
public Instant currentProcessingTime() {
return timers.currentProcessingTime();
}
@Override
@Nullable
public Instant currentSynchronizedProcessingTime() {
return timers.currentSynchronizedProcessingTime();
}
@Override
@Nullable
public Instant currentEventTime() {
return timers.currentEventTime();
}
}
private class OnMergeContextImpl extends TriggerStateMachine.OnMergeContext {
private final MergingStateAccessor<?, W> state;
private final W window;
private final Collection<W> mergingWindows;
private final Timers timers;
private final MergingTriggerInfoImpl triggerInfo;
private OnMergeContextImpl(
W window,
Timers timers,
ExecutableTriggerStateMachine trigger,
FinishedTriggers finishedSet,
Map<W, FinishedTriggers> finishedSets) {
this.mergingWindows = finishedSets.keySet();
this.window = window;
this.state = new MergingStateAccessorImpl(trigger, mergingWindows, window);
this.timers = new TriggerTimers(window, timers);
this.triggerInfo = new MergingTriggerInfoImpl(trigger, finishedSet, this, finishedSets);
}
@Override
public TriggerStateMachine.OnMergeContext forTrigger(ExecutableTriggerStateMachine trigger) {
return new OnMergeContextImpl(
window, timers, trigger, triggerInfo.finishedSet, triggerInfo.finishedSets);
}
@Override
public MergingStateAccessor<?, W> state() {
return state;
}
@Override
public MergingTriggerInfo trigger() {
return triggerInfo;
}
@Override
public W window() {
return window;
}
@Override
public void setTimer(Instant timestamp, TimeDomain domain) {
timers.setTimer(timestamp, domain);
}
@Override
public void deleteTimer(Instant timestamp, TimeDomain domain) {
timers.setTimer(timestamp, domain);
}
@Override
public Instant currentProcessingTime() {
return timers.currentProcessingTime();
}
@Override
@Nullable
public Instant currentSynchronizedProcessingTime() {
return timers.currentSynchronizedProcessingTime();
}
@Override
@Nullable
public Instant currentEventTime() {
return timers.currentEventTime();
}
}
}