blob: 70888f21f5c231b5ba3d8b9b0e0bbb1bc146418b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.triggers;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThat;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.sdk.state.TimeDomain;
import org.joda.time.Duration;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests the {@link TriggerStateMachines} static utility methods. */
@RunWith(JUnit4.class)
public class TriggerStateMachinesTest {
//
// Tests for leaf trigger translation
//
@Test
public void testStateMachineForAfterPane() {
int count = 37;
RunnerApi.Trigger trigger =
RunnerApi.Trigger.newBuilder()
.setElementCount(RunnerApi.Trigger.ElementCount.newBuilder().setElementCount(count))
.build();
AfterPaneStateMachine machine =
(AfterPaneStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
assertThat(machine.getElementCount(), equalTo(trigger.getElementCount().getElementCount()));
}
// TODO: make these all build the proto
@Test
public void testStateMachineForAfterProcessingTime() {
Duration minutes = Duration.standardMinutes(94);
Duration hours = Duration.standardHours(13);
RunnerApi.Trigger trigger =
RunnerApi.Trigger.newBuilder()
.setAfterProcessingTime(
RunnerApi.Trigger.AfterProcessingTime.newBuilder()
.addTimestampTransforms(
RunnerApi.TimestampTransform.newBuilder()
.setDelay(
RunnerApi.TimestampTransform.Delay.newBuilder()
.setDelayMillis(minutes.getMillis())))
.addTimestampTransforms(
RunnerApi.TimestampTransform.newBuilder()
.setAlignTo(
RunnerApi.TimestampTransform.AlignTo.newBuilder()
.setPeriod(hours.getMillis()))))
.build();
AfterDelayFromFirstElementStateMachine machine =
(AfterDelayFromFirstElementStateMachine)
TriggerStateMachines.stateMachineForTrigger(trigger);
assertThat(machine.getTimeDomain(), equalTo(TimeDomain.PROCESSING_TIME));
}
@Test
public void testStateMachineForAfterWatermark() {
RunnerApi.Trigger trigger =
RunnerApi.Trigger.newBuilder()
.setAfterEndOfWindow(RunnerApi.Trigger.AfterEndOfWindow.getDefaultInstance())
.build();
AfterWatermarkStateMachine.FromEndOfWindow machine =
(AfterWatermarkStateMachine.FromEndOfWindow)
TriggerStateMachines.stateMachineForTrigger(trigger);
assertThat(
TriggerStateMachines.stateMachineForTrigger(trigger),
instanceOf(AfterWatermarkStateMachine.FromEndOfWindow.class));
}
@Test
public void testDefaultTriggerTranslation() {
RunnerApi.Trigger trigger =
RunnerApi.Trigger.newBuilder()
.setDefault(RunnerApi.Trigger.Default.getDefaultInstance())
.build();
assertThat(
TriggerStateMachines.stateMachineForTrigger(trigger),
instanceOf(DefaultTriggerStateMachine.class));
}
@Test
public void testNeverTranslation() {
RunnerApi.Trigger trigger =
RunnerApi.Trigger.newBuilder()
.setNever(RunnerApi.Trigger.Never.getDefaultInstance())
.build();
NeverStateMachine machine =
(NeverStateMachine) checkNotNull(TriggerStateMachines.stateMachineForTrigger(trigger));
// No parameters, so if it doesn't crash, we win!
}
//
// Tests for composite trigger translation
//
// These check just that translation was invoked recursively using somewhat random
// leaf subtriggers; by induction it all holds together. Beyond this, explicit tests
// of particular triggers will suffice.
private static final int ELEM_COUNT = 472;
private static final Duration DELAY = Duration.standardSeconds(95673);
private final RunnerApi.Trigger subtrigger1 =
RunnerApi.Trigger.newBuilder()
.setElementCount(RunnerApi.Trigger.ElementCount.newBuilder().setElementCount(ELEM_COUNT))
.build();
private final RunnerApi.Trigger subtrigger2 =
RunnerApi.Trigger.newBuilder()
.setAfterProcessingTime(
RunnerApi.Trigger.AfterProcessingTime.newBuilder()
.addTimestampTransforms(
RunnerApi.TimestampTransform.newBuilder()
.setDelay(
RunnerApi.TimestampTransform.Delay.newBuilder()
.setDelayMillis(DELAY.getMillis()))))
.build();
private final TriggerStateMachine submachine1 =
TriggerStateMachines.stateMachineForTrigger(subtrigger1);
private final TriggerStateMachine submachine2 =
TriggerStateMachines.stateMachineForTrigger(subtrigger2);
@Test
public void testAfterEachTranslation() {
RunnerApi.Trigger trigger =
RunnerApi.Trigger.newBuilder()
.setAfterEach(
RunnerApi.Trigger.AfterEach.newBuilder()
.addSubtriggers(subtrigger1)
.addSubtriggers(subtrigger2))
.build();
AfterEachStateMachine machine =
(AfterEachStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
assertThat(machine, equalTo(AfterEachStateMachine.inOrder(submachine1, submachine2)));
}
@Test
public void testAfterFirstTranslation() {
RunnerApi.Trigger trigger =
RunnerApi.Trigger.newBuilder()
.setAfterAny(
RunnerApi.Trigger.AfterAny.newBuilder()
.addSubtriggers(subtrigger1)
.addSubtriggers(subtrigger2))
.build();
AfterFirstStateMachine machine =
(AfterFirstStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
assertThat(machine, equalTo(AfterFirstStateMachine.of(submachine1, submachine2)));
}
@Test
public void testAfterAllTranslation() {
RunnerApi.Trigger trigger =
RunnerApi.Trigger.newBuilder()
.setAfterAll(
RunnerApi.Trigger.AfterAll.newBuilder()
.addSubtriggers(subtrigger1)
.addSubtriggers(subtrigger2))
.build();
AfterAllStateMachine machine =
(AfterAllStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
assertThat(machine, equalTo(AfterAllStateMachine.of(submachine1, submachine2)));
}
@Test
public void testAfterWatermarkEarlyTranslation() {
RunnerApi.Trigger trigger =
RunnerApi.Trigger.newBuilder()
.setAfterEndOfWindow(
RunnerApi.Trigger.AfterEndOfWindow.newBuilder().setEarlyFirings(subtrigger1))
.build();
AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate machine =
(AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate)
TriggerStateMachines.stateMachineForTrigger(trigger);
assertThat(
machine,
equalTo(AfterWatermarkStateMachine.pastEndOfWindow().withEarlyFirings(submachine1)));
}
@Test
public void testAfterWatermarkEarlyLateTranslation() {
RunnerApi.Trigger trigger =
RunnerApi.Trigger.newBuilder()
.setAfterEndOfWindow(
RunnerApi.Trigger.AfterEndOfWindow.newBuilder()
.setEarlyFirings(subtrigger1)
.setLateFirings(subtrigger2))
.build();
AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate machine =
(AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate)
TriggerStateMachines.stateMachineForTrigger(trigger);
assertThat(
machine,
equalTo(
AfterWatermarkStateMachine.pastEndOfWindow()
.withEarlyFirings(submachine1)
.withLateFirings(submachine2)));
}
@Test
public void testOrFinallyTranslation() {
RunnerApi.Trigger trigger =
RunnerApi.Trigger.newBuilder()
.setOrFinally(
RunnerApi.Trigger.OrFinally.newBuilder()
.setMain(subtrigger1)
.setFinally(subtrigger2))
.build();
OrFinallyStateMachine machine =
(OrFinallyStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
assertThat(machine, equalTo(submachine1.orFinally(submachine2)));
}
@Test
public void testRepeatedlyTranslation() {
RunnerApi.Trigger trigger =
RunnerApi.Trigger.newBuilder()
.setRepeat(RunnerApi.Trigger.Repeat.newBuilder().setSubtrigger(subtrigger1))
.build();
RepeatedlyStateMachine machine =
(RepeatedlyStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
assertThat(machine, equalTo(RepeatedlyStateMachine.forever(submachine1)));
}
}