blob: ac642ceae7c0f1f47808279be78a4b064b46916f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.triggers;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import java.util.BitSet;
import java.util.Collection;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.MergingStateAccessor;
import org.apache.beam.runners.core.StateAccessor;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.sdk.coders.BitSetCoder;
import org.apache.beam.sdk.state.Timers;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Instant;
/**
* Executes a trigger while managing persistence of information about which subtriggers are
* finished. Subtriggers include all recursive trigger expressions as well as the entire trigger.
*
* <p>Specifically, the responsibilities are:
*
* <ul>
* <li>Invoking the trigger's methods via its {@link ExecutableTriggerStateMachine} wrapper by
* constructing the appropriate trigger contexts.
* <li>Committing a record of which subtriggers are finished to persistent state.
* <li>Restoring the record of which subtriggers are finished from persistent state.
* <li>Clearing out the persisted finished set when a caller indicates (via {#link
* #clearFinished}) that it is no longer needed.
* </ul>
*
* <p>These responsibilities are intertwined: trigger contexts include mutable information about
* which subtriggers are finished. This class provides the information when building the contexts
* and commits the information when the method of the {@link ExecutableTriggerStateMachine} returns.
*
* @param <W> The kind of windows being processed.
*/
public class TriggerStateMachineRunner<W extends BoundedWindow> {
@VisibleForTesting
public static final StateTag<ValueState<BitSet>> FINISHED_BITS_TAG =
StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of()));
private final ExecutableTriggerStateMachine rootTrigger;
private final TriggerStateMachineContextFactory<W> contextFactory;
public TriggerStateMachineRunner(
ExecutableTriggerStateMachine rootTrigger,
TriggerStateMachineContextFactory<W> contextFactory) {
checkState(rootTrigger.getTriggerIndex() == 0);
this.rootTrigger = rootTrigger;
this.contextFactory = contextFactory;
}
private FinishedTriggersBitSet readFinishedBits(ValueState<BitSet> state) {
if (!isFinishedSetNeeded()) {
// If no trigger in the tree will ever have finished bits, then we don't need to read them.
// So that the code can be agnostic to that fact, we create a BitSet that is all 0 (not
// finished) for each trigger in the tree.
return FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree());
}
@Nullable BitSet bitSet = state.read();
return bitSet == null
? FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree())
: FinishedTriggersBitSet.fromBitSet(bitSet);
}
private void clearFinishedBits(ValueState<BitSet> state) {
if (!isFinishedSetNeeded()) {
// Nothing to clear.
return;
}
state.clear();
}
/** Return true if the trigger is closed in the window corresponding to the specified state. */
public boolean isClosed(StateAccessor<?> state) {
return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
}
public void prefetchIsClosed(StateAccessor<?> state) {
if (isFinishedSetNeeded()) {
state.access(FINISHED_BITS_TAG).readLater();
}
}
public void prefetchForValue(W window, StateAccessor<?> state) {
prefetchIsClosed(state);
rootTrigger
.getSpec()
.prefetchOnElement(contextFactory.createStateAccessor(window, rootTrigger));
}
public void prefetchOnFire(W window, StateAccessor<?> state) {
prefetchIsClosed(state);
rootTrigger.getSpec().prefetchOnFire(contextFactory.createStateAccessor(window, rootTrigger));
}
public void prefetchShouldFire(W window, StateAccessor<?> state) {
prefetchIsClosed(state);
rootTrigger
.getSpec()
.prefetchShouldFire(contextFactory.createStateAccessor(window, rootTrigger));
}
/** Run the trigger logic to deal with a new value. */
public void processValue(W window, Instant timestamp, Timers timers, StateAccessor<?> state)
throws Exception {
// Clone so that we can detect changes and so that changes here don't pollute merging.
FinishedTriggersBitSet finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
TriggerStateMachine.OnElementContext triggerContext =
contextFactory.createOnElementContext(window, timers, timestamp, rootTrigger, finishedSet);
rootTrigger.invokeOnElement(triggerContext);
persistFinishedSet(state, finishedSet);
}
public void prefetchForMerge(
W window, Collection<W> mergingWindows, MergingStateAccessor<?, W> state) {
if (isFinishedSetNeeded()) {
for (ValueState<?> value : state.accessInEachMergingWindow(FINISHED_BITS_TAG).values()) {
value.readLater();
}
}
rootTrigger
.getSpec()
.prefetchOnMerge(
contextFactory.createMergingStateAccessor(window, mergingWindows, rootTrigger));
}
/** Run the trigger merging logic as part of executing the specified merge. */
public void onMerge(W window, Timers timers, MergingStateAccessor<?, W> state) throws Exception {
// Clone so that we can detect changes and so that changes here don't pollute merging.
FinishedTriggersBitSet finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
// And read the finished bits in each merging window.
ImmutableMap.Builder<W, FinishedTriggers> builder = ImmutableMap.builder();
for (Map.Entry<W, ValueState<BitSet>> entry :
state.accessInEachMergingWindow(FINISHED_BITS_TAG).entrySet()) {
// Don't need to clone these, since the trigger context doesn't allow modification
builder.put(entry.getKey(), readFinishedBits(entry.getValue()));
// Clear the underlying finished bits.
clearFinishedBits(entry.getValue());
}
ImmutableMap<W, FinishedTriggers> mergingFinishedSets = builder.build();
TriggerStateMachine.OnMergeContext mergeContext =
contextFactory.createOnMergeContext(
window, timers, rootTrigger, finishedSet, mergingFinishedSets);
// Run the merge from the trigger
rootTrigger.invokeOnMerge(mergeContext);
persistFinishedSet(state, finishedSet);
}
public boolean shouldFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
TriggerStateMachine.TriggerContext context =
contextFactory.base(window, timers, rootTrigger, finishedSet);
return rootTrigger.invokeShouldFire(context);
}
public void onFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
// shouldFire should be false.
// However it is too expensive to assert.
FinishedTriggersBitSet finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
TriggerStateMachine.TriggerContext context =
contextFactory.base(window, timers, rootTrigger, finishedSet);
rootTrigger.invokeOnFire(context);
persistFinishedSet(state, finishedSet);
}
private void persistFinishedSet(
StateAccessor<?> state, FinishedTriggersBitSet modifiedFinishedSet) {
if (!isFinishedSetNeeded()) {
return;
}
ValueState<BitSet> finishedSetState = state.access(FINISHED_BITS_TAG);
if (!readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) {
if (modifiedFinishedSet.getBitSet().isEmpty()) {
finishedSetState.clear();
} else {
finishedSetState.write(modifiedFinishedSet.getBitSet());
}
}
}
/** Clear the finished bits. */
public void clearFinished(StateAccessor<?> state) {
clearFinishedBits(state.access(FINISHED_BITS_TAG));
}
/**
* Clear the state used for executing triggers, but leave the finished set to indicate the window
* is closed.
*/
public void clearState(W window, Timers timers, StateAccessor<?> state) throws Exception {
// Don't need to clone, because we'll be clearing the finished bits anyways.
FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG));
rootTrigger.invokeClear(contextFactory.base(window, timers, rootTrigger, finishedSet));
}
private boolean isFinishedSetNeeded() {
// TODO: If we know that no trigger in the tree will ever finish, we don't need to do the
// lookup. Right now, we special case this for the DefaultTrigger.
return !(rootTrigger.getSpec() instanceof DefaultTriggerStateMachine);
}
}