blob: c651e4037fff5b5009135b2eef42d47c0368f183 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.triggers;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
/**
* A composite {@link TriggerStateMachine} that executes its sub-triggers in order. Only one
* sub-trigger is executing at a time, and any time it fires the {@code AfterEach} fires. When the
* currently executing sub-trigger finishes, the {@code AfterEach} starts executing the next
* sub-trigger.
*
* <p>{@code AfterEach.inOrder(t1, t2, ...)} finishes when all of the sub-triggers have finished.
*
* <p>The following properties hold:
*
* <ul>
* <li>{@code AfterEach.inOrder(AfterEach.inOrder(a, b), c)} behaves the same as {@code
* AfterEach.inOrder(a, b, c)} and {@code AfterEach.inOrder(a, AfterEach.inOrder(b, c)}.
* <li>{@code AfterEach.inOrder(Repeatedly.forever(a), b)} behaves the same as {@code
* Repeatedly.forever(a)}, since the repeated trigger never finishes.
* </ul>
*/
public class AfterEachStateMachine extends TriggerStateMachine {
private AfterEachStateMachine(List<TriggerStateMachine> subTriggers) {
super(subTriggers);
checkArgument(subTriggers.size() > 1);
}
/** Returns an {@code AfterEach} {@code Trigger} with the given subtriggers. */
@SafeVarargs
public static TriggerStateMachine inOrder(TriggerStateMachine... triggers) {
return new AfterEachStateMachine(Arrays.asList(triggers));
}
public static TriggerStateMachine inOrder(Iterable<? extends TriggerStateMachine> triggers) {
return new AfterEachStateMachine(ImmutableList.copyOf(triggers));
}
@Override
public void onElement(OnElementContext c) throws Exception {
if (!c.trigger().isMerging()) {
// If merges are not possible, we need only run the first unfinished subtrigger
c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c);
} else {
// If merges are possible, we need to run all subtriggers in parallel
for (ExecutableTriggerStateMachine subTrigger : c.trigger().subTriggers()) {
// Even if the subTrigger is done, it may be revived via merging and must have
// adequate state.
subTrigger.invokeOnElement(c);
}
}
}
@Override
public void onMerge(OnMergeContext context) throws Exception {
// If merging makes a subtrigger no-longer-finished, it will automatically
// begin participating in shouldFire and onFire appropriately.
// All the following triggers are retroactively "not started" but that is
// also automatic because they are cleared whenever this trigger
// fires.
boolean priorTriggersAllFinished = true;
for (ExecutableTriggerStateMachine subTrigger : context.trigger().subTriggers()) {
if (priorTriggersAllFinished) {
subTrigger.invokeOnMerge(context);
priorTriggersAllFinished &= context.forTrigger(subTrigger).trigger().isFinished();
} else {
subTrigger.invokeClear(context);
}
}
updateFinishedState(context);
}
@Override
public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws Exception {
ExecutableTriggerStateMachine firstUnfinished = context.trigger().firstUnfinishedSubTrigger();
return firstUnfinished.invokeShouldFire(context);
}
@Override
public void onFire(TriggerStateMachine.TriggerContext context) throws Exception {
context.trigger().firstUnfinishedSubTrigger().invokeOnFire(context);
// Reset all subtriggers if in a merging context; any may be revived by merging so they are
// all run in parallel for each pending pane.
if (context.trigger().isMerging()) {
for (ExecutableTriggerStateMachine subTrigger : context.trigger().subTriggers()) {
subTrigger.invokeClear(context);
}
}
updateFinishedState(context);
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder("AfterEach.inOrder(");
Joiner.on(", ").appendTo(builder, subTriggers);
builder.append(")");
return builder.toString();
}
private void updateFinishedState(TriggerContext context) {
context.trigger().setFinished(context.trigger().firstUnfinishedSubTrigger() == null);
}
}