| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.transforms; |
| |
| import static junit.framework.TestCase.assertTrue; |
| import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; |
| import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; |
| import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasType; |
| import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor; |
| import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray; |
| import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString; |
| import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray; |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; |
| import static org.hamcrest.Matchers.allOf; |
| import static org.hamcrest.Matchers.anyOf; |
| import static org.hamcrest.Matchers.containsString; |
| import static org.hamcrest.Matchers.equalTo; |
| import static org.hamcrest.Matchers.hasItem; |
| import static org.hamcrest.Matchers.not; |
| import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; |
| import static org.junit.Assert.assertArrayEquals; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertThat; |
| import static org.junit.Assert.fail; |
| |
| import com.fasterxml.jackson.annotation.JsonCreator; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.function.IntFunction; |
| import java.util.stream.Collectors; |
| import java.util.stream.IntStream; |
| import java.util.stream.LongStream; |
| import java.util.stream.StreamSupport; |
| import org.apache.beam.sdk.coders.AtomicCoder; |
| import org.apache.beam.sdk.coders.Coder; |
| import org.apache.beam.sdk.coders.KvCoder; |
| import org.apache.beam.sdk.coders.ListCoder; |
| import org.apache.beam.sdk.coders.SetCoder; |
| import org.apache.beam.sdk.coders.StringUtf8Coder; |
| import org.apache.beam.sdk.coders.VarIntCoder; |
| import org.apache.beam.sdk.coders.VoidCoder; |
| import org.apache.beam.sdk.io.GenerateSequence; |
| import org.apache.beam.sdk.options.Default; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.state.BagState; |
| import org.apache.beam.sdk.state.CombiningState; |
| import org.apache.beam.sdk.state.GroupingState; |
| import org.apache.beam.sdk.state.MapState; |
| import org.apache.beam.sdk.state.ReadableState; |
| import org.apache.beam.sdk.state.SetState; |
| import org.apache.beam.sdk.state.StateSpec; |
| import org.apache.beam.sdk.state.StateSpecs; |
| import org.apache.beam.sdk.state.TimeDomain; |
| import org.apache.beam.sdk.state.Timer; |
| import org.apache.beam.sdk.state.TimerMap; |
| import org.apache.beam.sdk.state.TimerSpec; |
| import org.apache.beam.sdk.state.TimerSpecs; |
| import org.apache.beam.sdk.state.ValueState; |
| import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported; |
| import org.apache.beam.sdk.testing.NeedsRunner; |
| import org.apache.beam.sdk.testing.PAssert; |
| import org.apache.beam.sdk.testing.TestPipeline; |
| import org.apache.beam.sdk.testing.TestStream; |
| import org.apache.beam.sdk.testing.UsesMapState; |
| import org.apache.beam.sdk.testing.UsesSetState; |
| import org.apache.beam.sdk.testing.UsesSideInputs; |
| import org.apache.beam.sdk.testing.UsesSideInputsWithDifferentCoders; |
| import org.apache.beam.sdk.testing.UsesStatefulParDo; |
| import org.apache.beam.sdk.testing.UsesStrictTimerOrdering; |
| import org.apache.beam.sdk.testing.UsesTestStream; |
| import org.apache.beam.sdk.testing.UsesTestStreamWithOutputTimestamp; |
| import org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime; |
| import org.apache.beam.sdk.testing.UsesTimerMap; |
| import org.apache.beam.sdk.testing.UsesTimersInParDo; |
| import org.apache.beam.sdk.testing.ValidatesRunner; |
| import org.apache.beam.sdk.transforms.DoFn.OnTimer; |
| import org.apache.beam.sdk.transforms.DoFn.ProcessElement; |
| import org.apache.beam.sdk.transforms.Mean.CountSum; |
| import org.apache.beam.sdk.transforms.ParDo.SingleOutput; |
| import org.apache.beam.sdk.transforms.display.DisplayData; |
| import org.apache.beam.sdk.transforms.display.DisplayData.Builder; |
| import org.apache.beam.sdk.transforms.display.DisplayDataMatchers; |
| import org.apache.beam.sdk.transforms.windowing.BoundedWindow; |
| import org.apache.beam.sdk.transforms.windowing.FixedWindows; |
| import org.apache.beam.sdk.transforms.windowing.GlobalWindow; |
| import org.apache.beam.sdk.transforms.windowing.IntervalWindow; |
| import org.apache.beam.sdk.transforms.windowing.SlidingWindows; |
| import org.apache.beam.sdk.transforms.windowing.Window; |
| import org.apache.beam.sdk.util.common.ElementByteSizeObserver; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.beam.sdk.values.PBegin; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.beam.sdk.values.PCollectionTuple; |
| import org.apache.beam.sdk.values.PCollectionView; |
| import org.apache.beam.sdk.values.PDone; |
| import org.apache.beam.sdk.values.TimestampedValue; |
| import org.apache.beam.sdk.values.TupleTag; |
| import org.apache.beam.sdk.values.TupleTagList; |
| import org.apache.beam.sdk.values.TypeDescriptor; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; |
| import org.hamcrest.Matchers; |
| import org.joda.time.Duration; |
| import org.joda.time.Instant; |
| import org.joda.time.MutableDateTime; |
| import org.junit.Ignore; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| import org.junit.rules.ExpectedException; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| |
| /** Tests for ParDo. */ |
| public class ParDoTest implements Serializable { |
| // This test is Serializable, just so that it's easy to have |
| // anonymous inner classes inside the non-static test methods. |
| |
/**
 * Shared base class for the nested test suites: supplies a fresh {@link TestPipeline} and an
 * {@link ExpectedException} rule per test. Fields are transient because the enclosing test
 * classes implement {@link Serializable}.
 */
public abstract static class SharedTestBase {
  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  @Rule public transient ExpectedException thrown = ExpectedException.none();
}
| |
| private static class PrintingDoFn extends DoFn<String, String> { |
| @ProcessElement |
| public void processElement( |
| @Element String element, |
| @Timestamp Instant timestamp, |
| BoundedWindow window, |
| OutputReceiver<String> receiver) { |
| receiver.output( |
| element + ":" + timestamp.getMillis() + ":" + window.maxTimestamp().getMillis()); |
| } |
| } |
| |
/** A {@link DoFn} that consumes every element and emits nothing; used to test empty outputs. */
static class TestNoOutputDoFn extends DoFn<Integer, String> {
  @ProcessElement
  public void processElement(DoFn<Integer, String>.ProcessContext c) throws Exception {}
}
| |
/**
 * A {@link DoFn} that asserts its lifecycle callbacks (setup, start bundle, process, finish
 * bundle) arrive in a legal order via an internal state machine, and echoes each element —
 * suffixed with any configured side-input values — to the main output and to every configured
 * additional output tag.
 */
static class TestDoFn extends DoFn<Integer, String> {
  // Lifecycle states in the order the runner is expected to drive them.
  enum State {
    NOT_SET_UP,
    UNSTARTED,
    STARTED,
    PROCESSING,
    FINISHED
  }

  State state = State.NOT_SET_UP;

  final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>();
  final List<TupleTag<String>> additionalOutputTupleTags = new ArrayList<>();

  public TestDoFn() {}

  public TestDoFn(
      List<PCollectionView<Integer>> sideInputViews,
      List<TupleTag<String>> additionalOutputTupleTags) {
    this.sideInputViews.addAll(sideInputViews);
    this.additionalOutputTupleTags.addAll(additionalOutputTupleTags);
  }

  @Setup
  public void prepare() {
    // Setup must be the first callback ever invoked on this instance.
    assertEquals(State.NOT_SET_UP, state);
    state = State.UNSTARTED;
  }

  @StartBundle
  public void startBundle() {
    // A bundle may start right after setup, or after a previous bundle finished.
    assertThat(state, anyOf(equalTo(State.UNSTARTED), equalTo(State.FINISHED)));

    state = State.STARTED;
  }

  @ProcessElement
  public void processElement(ProcessContext c, @Element Integer element) {
    assertThat(state, anyOf(equalTo(State.STARTED), equalTo(State.PROCESSING)));
    state = State.PROCESSING;
    outputToAllWithSideInputs(c, "processing: " + element);
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext c) {
    assertThat(state, anyOf(equalTo(State.STARTED), equalTo(State.PROCESSING)));
    state = State.FINISHED;
    // Emit a marker record to every output so tests can observe that finishBundle ran.
    c.output("finished", BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE);
    for (TupleTag<String> additionalOutputTupleTag : additionalOutputTupleTags) {
      c.output(
          additionalOutputTupleTag,
          additionalOutputTupleTag.getId() + ": " + "finished",
          BoundedWindow.TIMESTAMP_MIN_VALUE,
          GlobalWindow.INSTANCE);
    }
  }

  // Sends value (suffixed with side-input values, if any were configured) to the main output
  // and to each additional output tag, prefixing the latter with the tag id.
  private void outputToAllWithSideInputs(ProcessContext c, String value) {
    if (!sideInputViews.isEmpty()) {
      List<Integer> sideInputValues = new ArrayList<>();
      for (PCollectionView<Integer> sideInputView : sideInputViews) {
        sideInputValues.add(c.sideInput(sideInputView));
      }
      value += ": " + sideInputValues;
    }
    c.output(value);
    for (TupleTag<String> additionalOutputTupleTag : additionalOutputTupleTags) {
      c.output(additionalOutputTupleTag, additionalOutputTupleTag.getId() + ": " + value);
    }
  }
}
| |
/** A {@link DoFn} that always throws from {@code @StartBundle}, for error-propagation tests. */
static class TestStartBatchErrorDoFn extends DoFn<Integer, String> {
  @StartBundle
  public void startBundle() {
    throw new RuntimeException("test error in initialize");
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // This has to be here.
  }
}
| |
/** A {@link DoFn} that always throws from {@code @ProcessElement}, for error-propagation tests. */
static class TestProcessElementErrorDoFn extends DoFn<Integer, String> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    throw new RuntimeException("test error in process");
  }
}
| |
/** A {@link DoFn} that always throws from {@code @FinishBundle}, for error-propagation tests. */
static class TestFinishBatchErrorDoFn extends DoFn<Integer, String> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // This has to be here.
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext c) {
    throw new RuntimeException("test error in finalize");
  }
}
| |
// The class name itself is the fixture: used to verify transform-name derivation from the
// DoFn class name (see testParDoTransformNameBasedDoFnWithTrimmedSuffix).
private static class StrangelyNamedDoer extends DoFn<Integer, String> {
  @ProcessElement
  public void processElement(ProcessContext c) {}
}
| |
/** Re-emits each element with an output timestamp equal to the element's own value in millis. */
static class TestOutputTimestampDoFn<T extends Number> extends DoFn<T, T> {
  @ProcessElement
  public void processElement(@Element T value, OutputReceiver<T> r) {
    r.outputWithTimestamp(value, new Instant(value.longValue()));
  }
}
| |
| static class TestShiftTimestampDoFn<T extends Number> extends DoFn<T, T> { |
| private Duration allowedTimestampSkew; |
| private Duration durationToShift; |
| |
| public TestShiftTimestampDoFn(Duration allowedTimestampSkew, Duration durationToShift) { |
| this.allowedTimestampSkew = allowedTimestampSkew; |
| this.durationToShift = durationToShift; |
| } |
| |
| @Override |
| public Duration getAllowedTimestampSkew() { |
| return allowedTimestampSkew; |
| } |
| |
| @ProcessElement |
| public void processElement( |
| @Element T value, @Timestamp Instant timestamp, OutputReceiver<T> r) { |
| checkNotNull(timestamp); |
| r.outputWithTimestamp(value, timestamp.plus(durationToShift)); |
| } |
| } |
| |
/** Formats each element and its timestamp into a human-readable "processing: ..." string. */
static class TestFormatTimestampDoFn<T extends Number> extends DoFn<T, String> {
  @ProcessElement
  public void processElement(
      @Element T element, @Timestamp Instant timestamp, OutputReceiver<String> r) {
    checkNotNull(timestamp);
    r.output("processing: " + element + ", timestamp: " + timestamp.getMillis());
  }
}
| |
| static class MultiFilter extends PTransform<PCollection<Integer>, PCollectionTuple> { |
| |
| private static final TupleTag<Integer> BY2 = new TupleTag<Integer>("by2") {}; |
| private static final TupleTag<Integer> BY3 = new TupleTag<Integer>("by3") {}; |
| |
| @Override |
| public PCollectionTuple expand(PCollection<Integer> input) { |
| PCollection<Integer> by2 = input.apply("Filter2s", ParDo.of(new FilterFn(2))); |
| PCollection<Integer> by3 = input.apply("Filter3s", ParDo.of(new FilterFn(3))); |
| return PCollectionTuple.of(BY2, by2).and(BY3, by3); |
| } |
| |
| static class FilterFn extends DoFn<Integer, Integer> { |
| private final int divisor; |
| |
| FilterFn(int divisor) { |
| this.divisor = divisor; |
| } |
| |
| @ProcessElement |
| public void processElement(@Element Integer element, OutputReceiver<Integer> r) |
| throws Exception { |
| if (element % divisor == 0) { |
| r.output(element); |
| } |
| } |
| } |
| } |
| |
| /** Tests for basic {@link ParDo} scenarios. */ |
| @RunWith(JUnit4.class) |
| public static class BasicTests extends SharedTestBase implements Serializable { |
| @Test |
| @Category(ValidatesRunner.class) |
| public void testParDo() { |
| List<Integer> inputs = Arrays.asList(3, -42, 666); |
| |
| PCollection<String> output = |
| pipeline.apply(Create.of(inputs)).apply(ParDo.of(new TestDoFn())); |
| |
| PAssert.that(output).satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)); |
| pipeline.run(); |
| } |
| |
| @Test |
| @Category(ValidatesRunner.class) |
| public void testParDoEmpty() { |
| |
| List<Integer> inputs = Arrays.asList(); |
| |
| PCollection<String> output = |
| pipeline |
| .apply(Create.of(inputs).withCoder(VarIntCoder.of())) |
| .apply("TestDoFn", ParDo.of(new TestDoFn())); |
| |
| PAssert.that(output).satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)); |
| |
| pipeline.run(); |
| } |
| |
| @Test |
| @Category(ValidatesRunner.class) |
| public void testParDoEmptyOutputs() { |
| |
| List<Integer> inputs = Arrays.asList(); |
| |
| PCollection<String> output = |
| pipeline |
| .apply(Create.of(inputs).withCoder(VarIntCoder.of())) |
| .apply("TestDoFn", ParDo.of(new TestNoOutputDoFn())); |
| |
| PAssert.that(output).empty(); |
| |
| pipeline.run(); |
| } |
| |
| @Test |
| public void testParDoTransformNameBasedDoFnWithTrimmedSuffix() { |
| assertThat(ParDo.of(new PrintingDoFn()).getName(), containsString("ParDo(Printing)")); |
| } |
| |
| @Test |
| @Category(ValidatesRunner.class) |
| public void testParDoInCustomTransform() { |
| |
| List<Integer> inputs = Arrays.asList(3, -42, 666); |
| |
| PCollection<String> output = |
| pipeline |
| .apply(Create.of(inputs)) |
| .apply( |
| "CustomTransform", |
| new PTransform<PCollection<Integer>, PCollection<String>>() { |
| @Override |
| public PCollection<String> expand(PCollection<Integer> input) { |
| return input.apply(ParDo.of(new TestDoFn())); |
| } |
| }); |
| |
| // Test that Coder inference of the result works through |
| // user-defined PTransforms. |
| PAssert.that(output).satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)); |
| |
| pipeline.run(); |
| } |
| |
| @Test |
| public void testJsonEscaping() { |
| // Declare an arbitrary function and make sure we can serialize it |
| DoFn<Integer, Integer> doFn = |
| new DoFn<Integer, Integer>() { |
| @ProcessElement |
| public void processElement(@Element Integer element, OutputReceiver<Integer> r) { |
| r.output(element + 1); |
| } |
| }; |
| |
| byte[] serializedBytes = serializeToByteArray(doFn); |
| String serializedJson = byteArrayToJsonString(serializedBytes); |
| assertArrayEquals(serializedBytes, jsonStringToByteArray(serializedJson)); |
| } |
| |
| @Test |
| public void testDoFnDisplayData() { |
| DoFn<String, String> fn = |
| new DoFn<String, String>() { |
| @ProcessElement |
| public void processElement(ProcessContext c) {} |
| |
| @Override |
| public void populateDisplayData(Builder builder) { |
| builder.add(DisplayData.item("doFnMetadata", "bar")); |
| } |
| }; |
| |
| SingleOutput<String, String> parDo = ParDo.of(fn); |
| |
| DisplayData displayData = DisplayData.from(parDo); |
| assertThat( |
| displayData, |
| hasDisplayItem( |
| allOf( |
| hasKey("fn"), |
| hasType(DisplayData.Type.JAVA_CLASS), |
| DisplayDataMatchers.hasValue(fn.getClass().getName())))); |
| |
| assertThat(displayData, includesDisplayDataFor("fn", fn)); |
| } |
| |
| @Test |
| public void testDoFnWithContextDisplayData() { |
| DoFn<String, String> fn = |
| new DoFn<String, String>() { |
| @ProcessElement |
| public void proccessElement(ProcessContext c) {} |
| |
| @Override |
| public void populateDisplayData(Builder builder) { |
| builder.add(DisplayData.item("fnMetadata", "foobar")); |
| } |
| }; |
| |
| SingleOutput<String, String> parDo = ParDo.of(fn); |
| |
| DisplayData displayData = DisplayData.from(parDo); |
| assertThat(displayData, includesDisplayDataFor("fn", fn)); |
| assertThat(displayData, hasDisplayItem("fn", fn.getClass())); |
| } |
| |
| @Test |
| public void testRejectsWrongWindowType() { |
| |
| thrown.expect(IllegalArgumentException.class); |
| thrown.expectMessage(GlobalWindow.class.getSimpleName()); |
| thrown.expectMessage(IntervalWindow.class.getSimpleName()); |
| thrown.expectMessage("window type"); |
| thrown.expectMessage("not a supertype"); |
| |
| pipeline |
| .apply(Create.of(1, 2, 3)) |
| .apply( |
| ParDo.of( |
| new DoFn<Integer, Integer>() { |
| @ProcessElement |
| public void process(ProcessContext c, IntervalWindow w) {} |
| })); |
| } |
| |
| /** |
| * Tests that it is OK to use different window types in the parameter lists to different {@link |
| * DoFn} functions, as long as they are all subtypes of the actual window type of the input. |
| * |
| * <p>Today, the only method other than {@link ProcessElement @ProcessElement} that can accept |
| * extended parameters is {@link OnTimer @OnTimer}, which is rejected before it reaches window |
| * type validation. Rather than delay validation, this test is temporarily disabled. |
| */ |
| @Ignore("ParDo rejects this on account of it using timers") |
| @Test |
| public void testMultipleWindowSubtypesOK() { |
| final String timerId = "gobbledegook"; |
| |
| pipeline |
| .apply(Create.of(1, 2, 3)) |
| .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10)))) |
| .apply( |
| ParDo.of( |
| new DoFn<Integer, Integer>() { |
| @TimerId(timerId) |
| private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); |
| |
| @ProcessElement |
| public void process(ProcessContext c, IntervalWindow w) {} |
| |
| @OnTimer(timerId) |
| public void onTimer(BoundedWindow w) {} |
| })); |
| |
| // If it doesn't crash, we made it! |
| } |
| |
| @Test |
| @Category(ValidatesRunner.class) |
| public void testPipelineOptionsParameter() { |
| PCollection<String> results = |
| pipeline |
| .apply(Create.of(1)) |
| .apply( |
| ParDo.of( |
| new DoFn<Integer, String>() { |
| @ProcessElement |
| public void process(OutputReceiver<String> r, PipelineOptions options) { |
| r.output(options.as(MyOptions.class).getFakeOption()); |
| } |
| })); |
| |
| String testOptionValue = "not fake anymore"; |
| pipeline.getOptions().as(MyOptions.class).setFakeOption(testOptionValue); |
| PAssert.that(results).containsInAnyOrder("not fake anymore"); |
| |
| pipeline.run(); |
| } |
| } |
| |
| /** Tests to validate behaviors around multiple inputs or outputs. */ |
| @RunWith(JUnit4.class) |
| public static class MultipleInputsAndOutputTests extends SharedTestBase implements Serializable { |
/**
 * Elements written to declared additional output tags land in the corresponding PCollections;
 * a declared-but-unwritten tag yields an empty PCollection. Tags are deliberately registered in
 * a different order than they are written.
 */
@Test
@Category(ValidatesRunner.class)
public void testParDoWithTaggedOutput() {

  List<Integer> inputs = Arrays.asList(3, -42, 666);

  TupleTag<String> mainOutputTag = new TupleTag<String>("main") {};
  TupleTag<String> additionalOutputTag1 = new TupleTag<String>("additional1") {};
  TupleTag<String> additionalOutputTag2 = new TupleTag<String>("additional2") {};
  TupleTag<String> additionalOutputTag3 = new TupleTag<String>("additional3") {};
  TupleTag<String> additionalOutputTagUnwritten = new TupleTag<String>("unwrittenOutput") {};

  PCollectionTuple outputs =
      pipeline
          .apply(Create.of(inputs))
          .apply(
              ParDo.of(
                      new TestDoFn(
                          Arrays.asList(),
                          Arrays.asList(
                              additionalOutputTag1,
                              additionalOutputTag2,
                              additionalOutputTag3)))
                  .withOutputTags(
                      mainOutputTag,
                      TupleTagList.of(additionalOutputTag3)
                          .and(additionalOutputTag1)
                          .and(additionalOutputTagUnwritten)
                          .and(additionalOutputTag2)));

  PAssert.that(outputs.get(mainOutputTag))
      .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs));

  PAssert.that(outputs.get(additionalOutputTag1))
      .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs).fromOutput(additionalOutputTag1));
  PAssert.that(outputs.get(additionalOutputTag2))
      .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs).fromOutput(additionalOutputTag2));
  PAssert.that(outputs.get(additionalOutputTag3))
      .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs).fromOutput(additionalOutputTag3));
  PAssert.that(outputs.get(additionalOutputTagUnwritten)).empty();

  pipeline.run();
}
| |
/** Same tagged-output wiring as above, but over an empty input: all outputs get only bundle markers. */
@Test
@Category(ValidatesRunner.class)
public void testParDoEmptyWithTaggedOutput() {
  TupleTag<String> mainOutputTag = new TupleTag<String>("main") {};
  TupleTag<String> additionalOutputTag1 = new TupleTag<String>("additional1") {};
  TupleTag<String> additionalOutputTag2 = new TupleTag<String>("additional2") {};
  TupleTag<String> additionalOutputTag3 = new TupleTag<String>("additional3") {};
  TupleTag<String> additionalOutputTagUnwritten = new TupleTag<String>("unwrittenOutput") {};

  PCollectionTuple outputs =
      pipeline
          .apply(Create.empty(VarIntCoder.of()))
          .apply(
              ParDo.of(
                      new TestDoFn(
                          Arrays.asList(),
                          Arrays.asList(
                              additionalOutputTag1,
                              additionalOutputTag2,
                              additionalOutputTag3)))
                  .withOutputTags(
                      mainOutputTag,
                      TupleTagList.of(additionalOutputTag3)
                          .and(additionalOutputTag1)
                          .and(additionalOutputTagUnwritten)
                          .and(additionalOutputTag2)));

  List<Integer> inputs = Collections.emptyList();
  PAssert.that(outputs.get(mainOutputTag))
      .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs));

  PAssert.that(outputs.get(additionalOutputTag1))
      .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs).fromOutput(additionalOutputTag1));
  PAssert.that(outputs.get(additionalOutputTag2))
      .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs).fromOutput(additionalOutputTag2));
  PAssert.that(outputs.get(additionalOutputTag3))
      .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs).fromOutput(additionalOutputTag3));
  PAssert.that(outputs.get(additionalOutputTagUnwritten)).empty();

  pipeline.run();
}
| |
/** A non-emitting DoFn with declared additional tags yields empty PCollections for every tag. */
@Test
@Category(ValidatesRunner.class)
public void testParDoWithEmptyTaggedOutput() {
  TupleTag<String> mainOutputTag = new TupleTag<String>("main") {};
  TupleTag<String> additionalOutputTag1 = new TupleTag<String>("additional1") {};
  TupleTag<String> additionalOutputTag2 = new TupleTag<String>("additional2") {};

  PCollectionTuple outputs =
      pipeline
          .apply(Create.empty(VarIntCoder.of()))
          .apply(
              ParDo.of(new TestNoOutputDoFn())
                  .withOutputTags(
                      mainOutputTag,
                      TupleTagList.of(additionalOutputTag1).and(additionalOutputTag2)));

  PAssert.that(outputs.get(mainOutputTag)).empty();

  PAssert.that(outputs.get(additionalOutputTag1)).empty();
  PAssert.that(outputs.get(additionalOutputTag2)).empty();

  pipeline.run();
}
| |
/** A DoFn may write exclusively to an additional tag, leaving the main (Void) output empty. */
@Test
@Category(ValidatesRunner.class)
public void testParDoWithOnlyTaggedOutput() {

  List<Integer> inputs = Arrays.asList(3, -42, 666);

  final TupleTag<Void> mainOutputTag = new TupleTag<Void>("main") {};
  final TupleTag<Integer> additionalOutputTag = new TupleTag<Integer>("additional") {};

  PCollectionTuple outputs =
      pipeline
          .apply(Create.of(inputs))
          .apply(
              ParDo.of(
                      new DoFn<Integer, Void>() {
                        @ProcessElement
                        public void processElement(
                            @Element Integer element, MultiOutputReceiver r) {
                          r.get(additionalOutputTag).output(element);
                        }
                      })
                  .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)));

  PAssert.that(outputs.get(mainOutputTag)).empty();
  PAssert.that(outputs.get(additionalOutputTag)).containsInAnyOrder(inputs);

  pipeline.run();
}
| |
/** Writing to a tag that was never declared via withOutputTags fails at pipeline run time. */
@Test
@Category(NeedsRunner.class)
public void testParDoWritingToUndeclaredTag() {
  List<Integer> inputs = Arrays.asList(3, -42, 666);

  TupleTag<String> notOutputTag = new TupleTag<String>("additional") {};

  pipeline
      .apply(Create.of(inputs))
      .apply(
          ParDo.of(new TestDoFn(Arrays.asList(), Arrays.asList(notOutputTag)))
          // No call to .withOutputTags - should cause error
          );

  thrown.expectMessage("additional");
  pipeline.run();
}
| |
/**
 * Side-input values are visible to the DoFn; registering an extra, unread side input
 * ({@code sideInputUnread}) is harmless.
 */
@Test
@Category({ValidatesRunner.class, UsesSideInputs.class})
public void testParDoWithSideInputs() {

  List<Integer> inputs = Arrays.asList(3, -42, 666);

  PCollectionView<Integer> sideInput1 =
      pipeline
          .apply("CreateSideInput1", Create.of(11))
          .apply("ViewSideInput1", View.asSingleton());
  PCollectionView<Integer> sideInputUnread =
      pipeline
          .apply("CreateSideInputUnread", Create.of(-3333))
          .apply("ViewSideInputUnread", View.asSingleton());
  PCollectionView<Integer> sideInput2 =
      pipeline
          .apply("CreateSideInput2", Create.of(222))
          .apply("ViewSideInput2", View.asSingleton());

  PCollection<String> output =
      pipeline
          .apply(Create.of(inputs))
          .apply(
              ParDo.of(new TestDoFn(Arrays.asList(sideInput1, sideInput2), Arrays.asList()))
                  .withSideInputs(sideInput1, sideInputUnread, sideInput2));

  PAssert.that(output)
      .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs).andSideInputs(11, 222));

  pipeline.run();
}
| |
| @Test |
| @Category({NeedsRunner.class, UsesSideInputs.class}) |
| public void testSideInputAnnotationFailedValidationMissing() { |
| // SideInput tag id |
| final String sideInputTag1 = "tag1"; |
| |
| DoFn<Integer, List<Integer>> fn = |
| new DoFn<Integer, List<Integer>>() { |
| @ProcessElement |
| public void processElement(@SideInput(sideInputTag1) String tag1) {} |
| }; |
| |
| thrown.expect(IllegalArgumentException.class); |
| PCollection<List<Integer>> output = |
| pipeline.apply("Create main input", Create.of(2)).apply(ParDo.of(fn)); |
| pipeline.run(); |
| } |
| |
| @Test |
| @Category({NeedsRunner.class, UsesSideInputs.class}) |
| public void testSideInputAnnotationFailedValidationSingletonType() { |
| |
| final PCollectionView<Integer> sideInput1 = |
| pipeline |
| .apply("CreateSideInput1", Create.of(2)) |
| .apply("ViewSideInput1", View.asSingleton()); |
| |
| // SideInput tag id |
| final String sideInputTag1 = "tag1"; |
| |
| DoFn<Integer, List<Integer>> fn = |
| new DoFn<Integer, List<Integer>>() { |
| @ProcessElement |
| public void processElement(@SideInput(sideInputTag1) String tag1) {} |
| }; |
| |
| thrown.expect(IllegalArgumentException.class); |
| PCollection<List<Integer>> output = |
| pipeline |
| .apply("Create main input", Create.of(2)) |
| .apply(ParDo.of(fn).withSideInput(sideInputTag1, sideInput1)); |
| pipeline.run(); |
| } |
| |
| @Test |
| @Category({NeedsRunner.class, UsesSideInputs.class}) |
| public void testSideInputAnnotationFailedValidationListType() { |
| |
| final PCollectionView<List<Integer>> sideInput1 = |
| pipeline |
| .apply("CreateSideInput1", Create.of(2, 1, 0)) |
| .apply("ViewSideInput1", View.asList()); |
| |
| // SideInput tag id |
| final String sideInputTag1 = "tag1"; |
| |
| DoFn<Integer, List<Integer>> fn = |
| new DoFn<Integer, List<Integer>>() { |
| @ProcessElement |
| public void processElement(@SideInput(sideInputTag1) List<String> tag1) {} |
| }; |
| |
| thrown.expect(IllegalArgumentException.class); |
| PCollection<List<Integer>> output = |
| pipeline |
| .apply("Create main input", Create.of(2)) |
| .apply(ParDo.of(fn).withSideInput(sideInputTag1, sideInput1)); |
| pipeline.run(); |
| } |
| |
| @Test |
| @Category({NeedsRunner.class, UsesSideInputs.class}) |
| public void testSideInputAnnotationFailedValidationIterableType() { |
| |
| final PCollectionView<Iterable<Integer>> sideInput1 = |
| pipeline |
| .apply("CreateSideInput1", Create.of(2, 1, 0)) |
| .apply("ViewSideInput1", View.asIterable()); |
| |
| // SideInput tag id |
| final String sideInputTag1 = "tag1"; |
| |
| DoFn<Integer, List<Integer>> fn = |
| new DoFn<Integer, List<Integer>>() { |
| @ProcessElement |
| public void processElement(@SideInput(sideInputTag1) List<String> tag1) {} |
| }; |
| |
| thrown.expect(IllegalArgumentException.class); |
| PCollection<List<Integer>> output = |
| pipeline |
| .apply("Create main input", Create.of(2)) |
| .apply(ParDo.of(fn).withSideInput(sideInputTag1, sideInput1)); |
| pipeline.run(); |
| } |
| |
| @Test |
| @Category({NeedsRunner.class, UsesSideInputs.class}) |
| public void testSideInputAnnotationFailedValidationMapType() { |
| |
| final PCollectionView<Map<Integer, Integer>> sideInput1 = |
| pipeline |
| .apply("CreateSideInput1", Create.of(KV.of(1, 2), KV.of(2, 3), KV.of(3, 4))) |
| .apply("ViewSideInput1", View.asMap()); |
| |
| // SideInput tag id |
| final String sideInputTag1 = "tag1"; |
| |
| DoFn<Integer, List<Integer>> fn = |
| new DoFn<Integer, List<Integer>>() { |
| @ProcessElement |
| public void processElement(@SideInput(sideInputTag1) Map<String, String> tag1) {} |
| }; |
| |
| thrown.expect(IllegalArgumentException.class); |
| PCollection<List<Integer>> output = |
| pipeline |
| .apply("Create main input", Create.of(2)) |
| .apply(ParDo.of(fn).withSideInput(sideInputTag1, sideInput1)); |
| pipeline.run(); |
| } |
| |
| @Test |
| @Category({NeedsRunner.class, UsesSideInputs.class}) |
| public void testSideInputAnnotationFailedValidationMultiMapType() { |
| |
| final PCollectionView<Map<Integer, Iterable<Integer>>> sideInput1 = |
| pipeline |
| .apply("CreateSideInput1", Create.of(KV.of(1, 2), KV.of(1, 3), KV.of(3, 4))) |
| .apply("ViewSideInput1", View.asMultimap()); |
| |
| // SideInput tag id |
| final String sideInputTag1 = "tag1"; |
| |
| DoFn<Integer, List<Integer>> fn = |
| new DoFn<Integer, List<Integer>>() { |
| @ProcessElement |
| public void processElement(@SideInput(sideInputTag1) Map<Integer, Integer> tag1) {} |
| }; |
| |
| thrown.expect(IllegalArgumentException.class); |
| PCollection<List<Integer>> output = |
| pipeline |
| .apply("Create main input", Create.of(2)) |
| .apply(ParDo.of(fn).withSideInput(sideInputTag1, sideInput1)); |
| pipeline.run(); |
| } |
| |
| @Test |
| @Category({ValidatesRunner.class, UsesSideInputs.class}) |
| public void testSideInputAnnotation() { |
| |
| final PCollectionView<List<Integer>> sideInput1 = |
| pipeline |
| .apply("CreateSideInput1", Create.of(2, 1, 0)) |
| .apply("ViewSideInput1", View.asList()); |
| |
| // SideInput tag id |
| final String sideInputTag1 = "tag1"; |
| |
| DoFn<Integer, List<Integer>> fn = |
| new DoFn<Integer, List<Integer>>() { |
| @ProcessElement |
| public void processElement( |
| OutputReceiver<List<Integer>> r, @SideInput(sideInputTag1) List<Integer> tag1) { |
| |
| List<Integer> sideSorted = Lists.newArrayList(tag1); |
| Collections.sort(sideSorted); |
| r.output(sideSorted); |
| } |
| }; |
| |
| PCollection<List<Integer>> output = |
| pipeline |
| .apply("Create main input", Create.of(2)) |
| .apply(ParDo.of(fn).withSideInput(sideInputTag1, sideInput1)); |
| |
| PAssert.that(output).containsInAnyOrder(Lists.newArrayList(0, 1, 2)); |
| pipeline.run(); |
| } |
| |
    @Test
    @Category({
      ValidatesRunner.class,
      UsesSideInputs.class,
      UsesSideInputsWithDifferentCoders.class
    })
    public void testSideInputAnnotationWithMultipleSideInputs() {
      // Exercises every supported @SideInput parameter shape in one DoFn:
      // List, singleton, Iterable, Map, and multimap (Map<K, Iterable<V>>).
      // Each side input's contents are re-emitted on a dedicated output tag and
      // asserted against the data that built the view.

      final List<Integer> side1Data = ImmutableList.of(2, 0);
      final PCollectionView<List<Integer>> sideInput1 =
          pipeline
              .apply("CreateSideInput1", Create.of(side1Data))
              .apply("ViewSideInput1", View.asList());

      final Integer side2Data = 5;
      final PCollectionView<Integer> sideInput2 =
          pipeline
              .apply("CreateSideInput2", Create.of(side2Data))
              .apply("ViewSideInput2", View.asSingleton());

      final List<Integer> side3Data = ImmutableList.of(1, 3);
      final PCollectionView<Iterable<Integer>> sideInput3 =
          pipeline
              .apply("CreateSideInput3", Create.of(side3Data))
              .apply("ViewSideInput3", View.asIterable());

      final List<KV<Integer, Integer>> side4Data =
          ImmutableList.of(KV.of(1, 2), KV.of(2, 3), KV.of(3, 4));
      final PCollectionView<Map<Integer, Integer>> sideInput4 =
          pipeline
              .apply("CreateSideInput4", Create.of(side4Data))
              .apply("ViewSideInput4", View.asMap());

      // Duplicate key 1 makes this a genuine multimap (two values for one key).
      final List<KV<Integer, Integer>> side5Data =
          ImmutableList.of(KV.of(1, 2), KV.of(1, 3), KV.of(3, 4));
      final PCollectionView<Map<Integer, Iterable<Integer>>> sideInput5 =
          pipeline
              .apply("CreateSideInput5", Create.of(side5Data))
              .apply("ViewSideInput5", View.asMultimap());

      // SideInput tag id
      final String sideInputTag1 = "tag1";
      final String sideInputTag2 = "tag2";
      final String sideInputTag3 = "tag3";
      final String sideInputTag4 = "tag4";
      final String sideInputTag5 = "tag5";

      // One output tag per side input so each can be asserted independently.
      final TupleTag<Integer> outputTag1 = new TupleTag<>();
      final TupleTag<Integer> outputTag2 = new TupleTag<>();
      final TupleTag<Integer> outputTag3 = new TupleTag<>();
      final TupleTag<KV<Integer, Integer>> outputTag4 = new TupleTag<>();
      final TupleTag<KV<Integer, Integer>> outputTag5 = new TupleTag<>();

      DoFn<Integer, Integer> fn =
          new DoFn<Integer, Integer>() {
            @ProcessElement
            public void processElement(
                MultiOutputReceiver r,
                @SideInput(sideInputTag1) List<Integer> side1,
                @SideInput(sideInputTag2) Integer side2,
                @SideInput(sideInputTag3) Iterable<Integer> side3,
                @SideInput(sideInputTag4) Map<Integer, Integer> side4,
                @SideInput(sideInputTag5) Map<Integer, Iterable<Integer>> side5) {
              // Forward each side input's contents to its matching output tag;
              // the multimap is flattened back to one KV per (key, value) pair.
              side1.forEach(i -> r.get(outputTag1).output(i));
              r.get(outputTag2).output(side2);
              side3.forEach(i -> r.get(outputTag3).output(i));
              side4.forEach((k, v) -> r.get(outputTag4).output(KV.of(k, v)));
              side5.forEach((k, v) -> v.forEach(v2 -> r.get(outputTag5).output(KV.of(k, v2))));
            }
          };

      PCollectionTuple output =
          pipeline
              .apply("Create main input", Create.of(2))
              .apply(
                  ParDo.of(fn)
                      .withSideInput(sideInputTag1, sideInput1)
                      .withSideInput(sideInputTag2, sideInput2)
                      .withSideInput(sideInputTag3, sideInput3)
                      .withSideInput(sideInputTag4, sideInput4)
                      .withSideInput(sideInputTag5, sideInput5)
                      .withOutputTags(
                          outputTag1,
                          TupleTagList.of(outputTag2)
                              .and(outputTag3)
                              .and(outputTag4)
                              .and(outputTag5))));

      // The anonymous TupleTags carry no type token, so coders are set explicitly.
      output.get(outputTag1).setCoder(VarIntCoder.of());
      output.get(outputTag2).setCoder(VarIntCoder.of());
      output.get(outputTag3).setCoder(VarIntCoder.of());
      output.get(outputTag4).setCoder(KvCoder.of(VarIntCoder.of(), VarIntCoder.of()));
      output.get(outputTag5).setCoder(KvCoder.of(VarIntCoder.of(), VarIntCoder.of()));

      PAssert.that(output.get(outputTag1)).containsInAnyOrder(side1Data);
      PAssert.that(output.get(outputTag2)).containsInAnyOrder(side2Data);
      PAssert.that(output.get(outputTag3)).containsInAnyOrder(side3Data);
      PAssert.that(output.get(outputTag4)).containsInAnyOrder(side4Data);
      PAssert.that(output.get(outputTag5)).containsInAnyOrder(side5Data);

      pipeline.run();
    }
| |
    @Test
    @Category({ValidatesRunner.class, UsesSideInputs.class})
    public void testParDoWithSideInputsIsCumulative() {
      // Verifies that repeated withSideInputs(...) calls accumulate rather than
      // replace each other: all three views end up registered on the transform.

      List<Integer> inputs = Arrays.asList(3, -42, 666);

      PCollectionView<Integer> sideInput1 =
          pipeline
              .apply("CreateSideInput1", Create.of(11))
              .apply("ViewSideInput1", View.asSingleton());
      PCollectionView<Integer> sideInputUnread =
          pipeline
              .apply("CreateSideInputUnread", Create.of(-3333))
              .apply("ViewSideInputUnread", View.asSingleton());
      PCollectionView<Integer> sideInput2 =
          pipeline
              .apply("CreateSideInput2", Create.of(222))
              .apply("ViewSideInput2", View.asSingleton());

      PCollection<String> output =
          pipeline
              .apply(Create.of(inputs))
              .apply(
                  // TestDoFn reads sideInput1 and sideInput2; sideInputUnread is
                  // registered but intentionally never read.
                  ParDo.of(new TestDoFn(Arrays.asList(sideInput1, sideInput2), Arrays.asList()))
                      .withSideInputs(sideInput1)
                      .withSideInputs(sideInputUnread)
                      .withSideInputs(sideInput2));

      PAssert.that(output)
          .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs).andSideInputs(11, 222));

      pipeline.run();
    }
| |
    @Test
    @Category({ValidatesRunner.class, UsesSideInputs.class})
    public void testMultiOutputParDoWithSideInputs() {
      // Side inputs combined with withOutputTags: only the main output is
      // asserted; the additional output tag exists to force the multi-output path.

      List<Integer> inputs = Arrays.asList(3, -42, 666);

      final TupleTag<String> mainOutputTag = new TupleTag<String>("main") {};
      final TupleTag<Void> additionalOutputTag = new TupleTag<Void>("output") {};

      PCollectionView<Integer> sideInput1 =
          pipeline
              .apply("CreateSideInput1", Create.of(11))
              .apply("ViewSideInput1", View.asSingleton());
      PCollectionView<Integer> sideInputUnread =
          pipeline
              .apply("CreateSideInputUnread", Create.of(-3333))
              .apply("ViewSideInputUnread", View.asSingleton());
      PCollectionView<Integer> sideInput2 =
          pipeline
              .apply("CreateSideInput2", Create.of(222))
              .apply("ViewSideInput2", View.asSingleton());

      PCollectionTuple outputs =
          pipeline
              .apply(Create.of(inputs))
              .apply(
                  // TestDoFn reads sideInput1 and sideInput2; sideInputUnread is
                  // registered but intentionally never read.
                  ParDo.of(new TestDoFn(Arrays.asList(sideInput1, sideInput2), Arrays.asList()))
                      .withSideInputs(sideInput1)
                      .withSideInputs(sideInputUnread)
                      .withSideInputs(sideInput2)
                      .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)));

      PAssert.that(outputs.get(mainOutputTag))
          .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs).andSideInputs(11, 222));

      pipeline.run();
    }
| |
    @Test
    @Category({ValidatesRunner.class, UsesSideInputs.class})
    public void testMultiOutputParDoWithSideInputsIsCumulative() {
      // Same cumulative-withSideInputs check as the single-output variant, but
      // exercised on a ParDo.MultiOutput created via withOutputTags.

      List<Integer> inputs = Arrays.asList(3, -42, 666);

      final TupleTag<String> mainOutputTag = new TupleTag<String>("main") {};
      final TupleTag<Void> additionalOutputTag = new TupleTag<Void>("output") {};

      PCollectionView<Integer> sideInput1 =
          pipeline
              .apply("CreateSideInput1", Create.of(11))
              .apply("ViewSideInput1", View.asSingleton());
      PCollectionView<Integer> sideInputUnread =
          pipeline
              .apply("CreateSideInputUnread", Create.of(-3333))
              .apply("ViewSideInputUnread", View.asSingleton());
      PCollectionView<Integer> sideInput2 =
          pipeline
              .apply("CreateSideInput2", Create.of(222))
              .apply("ViewSideInput2", View.asSingleton());

      PCollectionTuple outputs =
          pipeline
              .apply(Create.of(inputs))
              .apply(
                  // Three separate withSideInputs calls must accumulate; the
                  // unread view is registered but never consumed by TestDoFn.
                  ParDo.of(new TestDoFn(Arrays.asList(sideInput1, sideInput2), Arrays.asList()))
                      .withSideInputs(sideInput1)
                      .withSideInputs(sideInputUnread)
                      .withSideInputs(sideInput2)
                      .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)));

      PAssert.that(outputs.get(mainOutputTag))
          .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs).andSideInputs(11, 222));

      pipeline.run();
    }
| |
| @Test |
| @Category(NeedsRunner.class) |
| public void testParDoReadingFromUnknownSideInput() { |
| |
| List<Integer> inputs = Arrays.asList(3, -42, 666); |
| |
| PCollectionView<Integer> sideView = |
| pipeline.apply("Create3", Create.of(3)).apply(View.asSingleton()); |
| |
| pipeline |
| .apply("CreateMain", Create.of(inputs)) |
| .apply(ParDo.of(new TestDoFn(Arrays.asList(sideView), Arrays.asList()))); |
| |
| thrown.expect(RuntimeException.class); |
| thrown.expectMessage("calling sideInput() with unknown view"); |
| pipeline.run(); |
| } |
| |
    @Test
    @Category({ValidatesRunner.class, UsesSideInputs.class})
    public void testSideInputsWithMultipleWindows() {
      // Tests that the runner can safely run a DoFn that uses side inputs
      // on an input where the element is in multiple windows. The complication is
      // that side inputs are per-window, so the runner has to make sure
      // to process each window individually.

      // Truncate to whole seconds so the element lands predictably inside the
      // 1-second-period sliding windows computed below.
      MutableDateTime mutableNow = Instant.now().toMutableDateTime();
      mutableNow.setMillisOfSecond(0);
      Instant now = mutableNow.toInstant();

      // 5-second windows sliding every second: each element is in 5 windows.
      SlidingWindows windowFn =
          SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1));
      PCollectionView<Integer> view = pipeline.apply(Create.of(1)).apply(View.asSingleton());
      PCollection<String> res =
          pipeline
              .apply(Create.timestamped(TimestampedValue.of("a", now)))
              .apply(Window.into(windowFn))
              .apply(ParDo.of(new FnWithSideInputs(view)).withSideInputs(view));

      // Assert per-window: the element joined with the side input ("a:1") must
      // appear in each of the sliding windows containing `now`.
      for (int i = 0; i < 4; ++i) {
        Instant base = now.minus(Duration.standardSeconds(i));
        IntervalWindow window = new IntervalWindow(base, base.plus(Duration.standardSeconds(5)));
        PAssert.that(res).inWindow(window).containsInAnyOrder("a:1");
      }

      pipeline.run();
    }
| |
| @Test |
| public void testParDoOutputNameBasedOnDoFnWithTrimmedSuffix() { |
| pipeline.enableAbandonedNodeEnforcement(false); |
| |
| PCollection<String> output = pipeline.apply(Create.of(1)).apply(ParDo.of(new TestDoFn())); |
| assertThat(output.getName(), containsString("ParDo(Test)")); |
| } |
| |
| @Test |
| public void testParDoOutputNameBasedOnLabel() { |
| pipeline.enableAbandonedNodeEnforcement(false); |
| |
| PCollection<String> output = |
| pipeline.apply(Create.of(1)).apply("MyParDo", ParDo.of(new TestDoFn())); |
| assertThat(output.getName(), containsString("MyParDo")); |
| } |
| |
| @Test |
| public void testParDoOutputNameBasedDoFnWithoutMatchingSuffix() { |
| pipeline.enableAbandonedNodeEnforcement(false); |
| |
| PCollection<String> output = |
| pipeline.apply(Create.of(1)).apply(ParDo.of(new StrangelyNamedDoer())); |
| assertThat(output.getName(), containsString("ParDo(StrangelyNamedDoer)")); |
| } |
| |
| @Test |
| public void testParDoMultiNameBasedDoFnWithTrimmerSuffix() { |
| assertThat( |
| ParDo.of(new TaggedOutputDummyFn(null, null)).withOutputTags(null, null).getName(), |
| containsString("ParMultiDo(TaggedOutputDummy)")); |
| } |
| |
    @Test
    public void testParDoWithTaggedOutputName() {
      // Each output PCollection's name must combine the transform label with the
      // tag id ("MyParDo.<tagId>"), regardless of tag registration order and
      // including tags that are declared but never written to.
      pipeline.enableAbandonedNodeEnforcement(false);

      TupleTag<String> mainOutputTag = new TupleTag<String>("main") {};
      TupleTag<String> additionalOutputTag1 = new TupleTag<String>("output1") {};
      TupleTag<String> additionalOutputTag2 = new TupleTag<String>("output2") {};
      TupleTag<String> additionalOutputTag3 = new TupleTag<String>("output3") {};
      TupleTag<String> additionalOutputTagUnwritten = new TupleTag<String>("unwrittenOutput") {};

      PCollectionTuple outputs =
          pipeline
              .apply(Create.of(Arrays.asList(3, -42, 666)))
              .setName("MyInput")
              .apply(
                  "MyParDo",
                  ParDo.of(
                          new TestDoFn(
                              Arrays.asList(),
                              Arrays.asList(
                                  additionalOutputTag1,
                                  additionalOutputTag2,
                                  additionalOutputTag3)))
                      // Tags registered deliberately out of numeric order to show
                      // naming does not depend on registration order.
                      .withOutputTags(
                          mainOutputTag,
                          TupleTagList.of(additionalOutputTag3)
                              .and(additionalOutputTag1)
                              .and(additionalOutputTagUnwritten)
                              .and(additionalOutputTag2)));

      assertEquals("MyParDo.main", outputs.get(mainOutputTag).getName());
      assertEquals("MyParDo.output1", outputs.get(additionalOutputTag1).getName());
      assertEquals("MyParDo.output2", outputs.get(additionalOutputTag2).getName());
      assertEquals("MyParDo.output3", outputs.get(additionalOutputTag3).getName());
      assertEquals("MyParDo.unwrittenOutput", outputs.get(additionalOutputTagUnwritten).getName());
    }
| |
| @Test |
| public void testMultiOutputAppliedMultipleTimesDifferentOutputs() { |
| pipeline.enableAbandonedNodeEnforcement(false); |
| PCollection<Long> longs = pipeline.apply(GenerateSequence.from(0)); |
| |
| TupleTag<Long> mainOut = new TupleTag<>(); |
| final TupleTag<String> valueAsString = new TupleTag<>(); |
| final TupleTag<Integer> valueAsInt = new TupleTag<>(); |
| DoFn<Long, Long> fn = |
| new DoFn<Long, Long>() { |
| @ProcessElement |
| public void processElement(ProcessContext cxt, @Element Long element) { |
| cxt.output(cxt.element()); |
| cxt.output(valueAsString, Long.toString(cxt.element())); |
| cxt.output(valueAsInt, element.intValue()); |
| } |
| }; |
| |
| ParDo.MultiOutput<Long, Long> parDo = |
| ParDo.of(fn).withOutputTags(mainOut, TupleTagList.of(valueAsString).and(valueAsInt)); |
| PCollectionTuple firstApplication = longs.apply("first", parDo); |
| PCollectionTuple secondApplication = longs.apply("second", parDo); |
| assertThat(firstApplication, not(equalTo(secondApplication))); |
| assertThat( |
| firstApplication.getAll().keySet(), |
| Matchers.containsInAnyOrder(mainOut, valueAsString, valueAsInt)); |
| assertThat( |
| secondApplication.getAll().keySet(), |
| Matchers.containsInAnyOrder(mainOut, valueAsString, valueAsInt)); |
| } |
| |
| @Test |
| @Category(NeedsRunner.class) |
| public void testMultiOutputChaining() { |
| |
| PCollectionTuple filters = |
| pipeline.apply(Create.of(Arrays.asList(3, 4, 5, 6))).apply(new MultiFilter()); |
| PCollection<Integer> by2 = filters.get(MultiFilter.BY2); |
| PCollection<Integer> by3 = filters.get(MultiFilter.BY3); |
| |
| // Apply additional filters to each operation. |
| PCollection<Integer> by2then3 = |
| by2.apply("Filter3sAgain", ParDo.of(new MultiFilter.FilterFn(3))); |
| PCollection<Integer> by3then2 = |
| by3.apply("Filter2sAgain", ParDo.of(new MultiFilter.FilterFn(2))); |
| |
| PAssert.that(by2then3).containsInAnyOrder(6); |
| PAssert.that(by3then2).containsInAnyOrder(6); |
| pipeline.run(); |
| } |
| |
    @Test
    @Category(NeedsRunner.class)
    public void testTaggedOutputUnknownCoder() throws Exception {
      // TestDummy has no registered coder and none is set on the tagged output,
      // so running the pipeline must fail with a coder-inference error.
      PCollection<Integer> input = pipeline.apply(Create.of(Arrays.asList(1, 2, 3)));

      final TupleTag<Integer> mainOutputTag = new TupleTag<>("main");
      final TupleTag<TestDummy> additionalOutputTag = new TupleTag<>("unknownSide");
      input.apply(
          ParDo.of(new TaggedOutputDummyFn(mainOutputTag, additionalOutputTag))
              .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)));

      thrown.expect(IllegalStateException.class);
      thrown.expectMessage("Unable to return a default Coder");
      pipeline.run();
    }
| |
    @Test
    public void testTaggedOutputUnregisteredExplicitCoder() throws Exception {
      // A coder set explicitly on a tagged output must satisfy downstream
      // transforms and survive output finishing without being overwritten.
      pipeline.enableAbandonedNodeEnforcement(false);

      PCollection<Integer> input = pipeline.apply(Create.of(Arrays.asList(1, 2, 3)));

      final TupleTag<Integer> mainOutputTag = new TupleTag<>("main");
      final TupleTag<TestDummy> additionalOutputTag = new TupleTag<>("unregisteredSide");
      ParDo.MultiOutput<Integer, Integer> pardo =
          ParDo.of(new TaggedOutputDummyFn(mainOutputTag, additionalOutputTag))
              .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag));
      PCollectionTuple outputTuple = input.apply(pardo);

      outputTuple.get(additionalOutputTag).setCoder(new TestDummyCoder());

      // Applying a transform to the tagged output must work with the explicit coder.
      outputTuple.get(additionalOutputTag).apply(View.asSingleton());

      assertEquals(new TestDummyCoder(), outputTuple.get(additionalOutputTag).getCoder());
      outputTuple
          .get(additionalOutputTag)
          .finishSpecifyingOutput("ParDo", input, pardo); // Check for crashes
      assertEquals(
          new TestDummyCoder(),
          outputTuple.get(additionalOutputTag).getCoder()); // Check for corruption
    }
| |
| @Test |
| @Category(NeedsRunner.class) |
| public void testMainOutputUnregisteredExplicitCoder() { |
| |
| PCollection<Integer> input = pipeline.apply(Create.of(Arrays.asList(1, 2, 3))); |
| |
| final TupleTag<TestDummy> mainOutputTag = new TupleTag<>("unregisteredMain"); |
| final TupleTag<Integer> additionalOutputTag = new TupleTag<Integer>("additionalOutput") {}; |
| PCollectionTuple outputTuple = |
| input.apply( |
| ParDo.of(new MainOutputDummyFn(mainOutputTag, additionalOutputTag)) |
| .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag))); |
| |
| outputTuple.get(mainOutputTag).setCoder(new TestDummyCoder()); |
| |
| pipeline.run(); |
| } |
| |
    @Test
    @Category(NeedsRunner.class)
    public void testMainOutputApplyTaggedOutputNoCoder() {
      // Regression test: applying a transform to the main output
      // should not cause a crash based on lack of a coder for the
      // additional output.

      final TupleTag<TestDummy> mainOutputTag = new TupleTag<>("main");
      final TupleTag<TestDummy> additionalOutputTag = new TupleTag<>("additionalOutput");
      PCollectionTuple tuple =
          pipeline
              .apply(Create.of(new TestDummy()).withCoder(TestDummyCoder.of()))
              .apply(
                  ParDo.of(
                          new DoFn<TestDummy, TestDummy>() {
                            @ProcessElement
                            public void processElement(
                                ProcessContext context, @Element TestDummy element) {
                              // Emit to both outputs so each PCollection is non-empty.
                              context.output(element);
                              context.output(additionalOutputTag, element);
                            }
                          })
                      .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)));

      // Before fix, tuple.get(mainOutputTag).apply(...) would indirectly trigger
      // tuple.get(additionalOutputTag).finishSpecifyingOutput(), which would crash
      // on a missing coder.
      tuple
          .get(mainOutputTag)
          .setCoder(TestDummyCoder.of())
          .apply(
              "Output1",
              ParDo.of(
                  new DoFn<TestDummy, Integer>() {
                    @ProcessElement
                    public void processElement(ProcessContext context) {
                      context.output(1);
                    }
                  }));

      // Set the missing coder only after the main output was consumed, proving
      // the earlier apply() did not require it.
      tuple.get(additionalOutputTag).setCoder(TestDummyCoder.of());

      pipeline.run();
    }
| |
| @Test |
| public void testWithOutputTagsDisplayData() { |
| DoFn<String, String> fn = |
| new DoFn<String, String>() { |
| @ProcessElement |
| public void proccessElement(ProcessContext c) {} |
| |
| @Override |
| public void populateDisplayData(Builder builder) { |
| builder.add(DisplayData.item("fnMetadata", "foobar")); |
| } |
| }; |
| |
| ParDo.MultiOutput<String, String> parDo = |
| ParDo.of(fn).withOutputTags(new TupleTag<>(), TupleTagList.empty()); |
| |
| DisplayData displayData = DisplayData.from(parDo); |
| assertThat(displayData, includesDisplayDataFor("fn", fn)); |
| assertThat(displayData, hasDisplayItem("fn", fn.getClass())); |
| } |
| } |
| |
  /** Tests for ParDo lifecycle methods. */
  @RunWith(JUnit4.class)
  public static class LifecycleTests extends SharedTestBase implements Serializable {
    @Test
    @Category(NeedsRunner.class)
    public void testParDoWithErrorInStartBatch() {
      // An exception thrown from the DoFn's start-bundle method must surface
      // from pipeline.run() with its original message.
      List<Integer> inputs = Arrays.asList(3, -42, 666);

      pipeline.apply(Create.of(inputs)).apply(ParDo.of(new TestStartBatchErrorDoFn()));

      thrown.expect(RuntimeException.class);
      thrown.expectMessage("test error in initialize");
      pipeline.run();
    }

    @Test
    @Category(NeedsRunner.class)
    public void testParDoWithErrorInProcessElement() {
      // An exception thrown from @ProcessElement must surface from pipeline.run().

      List<Integer> inputs = Arrays.asList(3, -42, 666);

      pipeline.apply(Create.of(inputs)).apply(ParDo.of(new TestProcessElementErrorDoFn()));

      thrown.expect(RuntimeException.class);
      thrown.expectMessage("test error in process");
      pipeline.run();
    }

    @Test
    @Category(NeedsRunner.class)
    public void testParDoWithErrorInFinishBatch() {
      // An exception thrown from the DoFn's finish-bundle method must surface
      // from pipeline.run().

      List<Integer> inputs = Arrays.asList(3, -42, 666);

      pipeline.apply(Create.of(inputs)).apply(ParDo.of(new TestFinishBatchErrorDoFn()));

      thrown.expect(RuntimeException.class);
      thrown.expectMessage("test error in finalize");
      pipeline.run();
    }

    @Test
    @Category(ValidatesRunner.class)
    public void testWindowingInStartAndFinishBundle() {
      // @FinishBundle has no implicit window, so the DoFn must assign one
      // explicitly when outputting; this verifies that path end-to-end.

      final FixedWindows windowFn = FixedWindows.of(Duration.millis(1));
      PCollection<String> output =
          pipeline
              .apply(Create.timestamped(TimestampedValue.of("elem", new Instant(1))))
              .apply(Window.into(windowFn))
              .apply(
                  ParDo.of(
                      new DoFn<String, String>() {
                        @ProcessElement
                        public void processElement(
                            @Element String element,
                            @Timestamp Instant timestamp,
                            OutputReceiver<String> r) {
                          r.output(element);
                          System.out.println("Process: " + element + ":" + timestamp.getMillis());
                        }

                        @FinishBundle
                        public void finishBundle(FinishBundleContext c) {
                          // Outputting from finishBundle requires an explicit
                          // timestamp and window.
                          Instant ts = new Instant(3);
                          c.output("finish", ts, windowFn.assignWindow(ts));
                          System.out.println("Finish: 3");
                        }
                      }))
              .apply(ParDo.of(new PrintingDoFn()));

      PAssert.that(output).satisfies(new Checker());

      pipeline.run();
    }
  }
| |
  /** Tests to validate output timestamps. */
  @RunWith(JUnit4.class)
  public static class TimestampTests extends SharedTestBase implements Serializable {
    @Test
    @Category(NeedsRunner.class)
    public void testParDoOutputWithTimestamp() {
      // Zero shift with zero allowed skew: timestamps assigned by
      // TestOutputTimestampDoFn (element value as millis) pass through unchanged.

      PCollection<Integer> input = pipeline.apply(Create.of(Arrays.asList(3, 42, 6)));

      PCollection<String> output =
          input
              .apply(ParDo.of(new TestOutputTimestampDoFn<>()))
              .apply(ParDo.of(new TestShiftTimestampDoFn<>(Duration.ZERO, Duration.ZERO)))
              .apply(ParDo.of(new TestFormatTimestampDoFn<>()));

      PAssert.that(output)
          .containsInAnyOrder(
              "processing: 3, timestamp: 3",
              "processing: 42, timestamp: 42",
              "processing: 6, timestamp: 6");

      pipeline.run();
    }

    @Test
    @Category(NeedsRunner.class)
    public void testParDoTaggedOutputWithTimestamp() {
      // outputWithTimestamp on a *tagged* output must carry the explicit
      // timestamp just as it does on the main output.

      PCollection<Integer> input = pipeline.apply(Create.of(Arrays.asList(3, 42, 6)));

      final TupleTag<Integer> mainOutputTag = new TupleTag<Integer>("main") {};
      final TupleTag<Integer> additionalOutputTag = new TupleTag<Integer>("additional") {};

      PCollection<String> output =
          input
              .apply(
                  ParDo.of(
                          new DoFn<Integer, Integer>() {
                            @ProcessElement
                            public void processElement(
                                @Element Integer element, MultiOutputReceiver r) {
                              // Emit only to the additional output, stamping each
                              // element with its own value as the timestamp.
                              r.get(additionalOutputTag)
                                  .outputWithTimestamp(element, new Instant(element.longValue()));
                            }
                          })
                      .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)))
              .get(additionalOutputTag)
              .apply(ParDo.of(new TestShiftTimestampDoFn<>(Duration.ZERO, Duration.ZERO)))
              .apply(ParDo.of(new TestFormatTimestampDoFn<>()));

      PAssert.that(output)
          .containsInAnyOrder(
              "processing: 3, timestamp: 3",
              "processing: 42, timestamp: 42",
              "processing: 6, timestamp: 6");

      pipeline.run();
    }

    @Test
    @Category(ValidatesRunner.class)
    public void testParDoShiftTimestamp() {
      // A backward shift of 1000ms within a 1000ms allowed skew is legal:
      // each timestamp ends up at (element value - 1000).

      PCollection<Integer> input = pipeline.apply(Create.of(Arrays.asList(3, 42, 6)));

      PCollection<String> output =
          input
              .apply(ParDo.of(new TestOutputTimestampDoFn<>()))
              .apply(
                  ParDo.of(
                      new TestShiftTimestampDoFn<>(Duration.millis(1000), Duration.millis(-1000))))
              .apply(ParDo.of(new TestFormatTimestampDoFn<>()));

      PAssert.that(output)
          .containsInAnyOrder(
              "processing: 3, timestamp: -997",
              "processing: 42, timestamp: -958",
              "processing: 6, timestamp: -994");

      pipeline.run();
    }

    @Test
    @Category(NeedsRunner.class)
    public void testParDoShiftTimestampInvalid() {
      // Shifting backwards by 1001ms exceeds the 1000ms allowed skew, so the
      // pipeline must fail with the skew-violation message.

      pipeline
          .apply(Create.of(Arrays.asList(3, 42, 6)))
          .apply(ParDo.of(new TestOutputTimestampDoFn<>()))
          .apply(
              ParDo.of(
                  new TestShiftTimestampDoFn<>(
                      Duration.millis(1000), // allowed skew = 1 second
                      Duration.millis(-1001))))
          .apply(ParDo.of(new TestFormatTimestampDoFn<>()));

      thrown.expect(RuntimeException.class);
      thrown.expectMessage("Cannot output with timestamp");
      thrown.expectMessage(
          "Output timestamps must be no earlier than the timestamp of the current input");
      thrown.expectMessage("minus the allowed skew (1 second).");
      pipeline.run();
    }

    @Test
    @Category(NeedsRunner.class)
    public void testParDoShiftTimestampInvalidZeroAllowed() {
      // With zero allowed skew, any backward shift at all is rejected.
      pipeline
          .apply(Create.of(Arrays.asList(3, 42, 6)))
          .apply(ParDo.of(new TestOutputTimestampDoFn<>()))
          .apply(ParDo.of(new TestShiftTimestampDoFn<>(Duration.ZERO, Duration.millis(-1001))))
          .apply(ParDo.of(new TestFormatTimestampDoFn<>()));

      thrown.expect(RuntimeException.class);
      thrown.expectMessage("Cannot output with timestamp");
      thrown.expectMessage(
          "Output timestamps must be no earlier than the timestamp of the current input");
      thrown.expectMessage("minus the allowed skew (0 milliseconds).");
      pipeline.run();
    }

    @Test
    @Category(ValidatesRunner.class)
    public void testParDoShiftTimestampUnlimited() {
      // With Long.MAX_VALUE allowed skew, elements may be shifted all the way
      // back to the minimum representable timestamp.
      PCollection<Long> outputs =
          pipeline
              .apply(
                  Create.of(
                      Arrays.asList(
                          0L,
                          BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(),
                          GlobalWindow.INSTANCE.maxTimestamp().getMillis())))
              .apply("AssignTimestampToValue", ParDo.of(new TestOutputTimestampDoFn<>()))
              .apply(
                  "ReassignToMinimumTimestamp",
                  ParDo.of(
                      new DoFn<Long, Long>() {
                        @ProcessElement
                        public void reassignTimestamps(
                            ProcessContext context, @Element Long element) {
                          // Shift the latest element as far backwards in time as the model permits
                          context.outputWithTimestamp(element, BoundedWindow.TIMESTAMP_MIN_VALUE);
                        }

                        @Override
                        public Duration getAllowedTimestampSkew() {
                          return Duration.millis(Long.MAX_VALUE);
                        }
                      }));

      PAssert.that(outputs)
          .satisfies(
              input -> {
                // This element is not shifted backwards in time. It must be present in the output.
                assertThat(input, hasItem(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()));
                for (Long elem : input) {
                  // Sanity check the outputs. 0L and the end of the global window are shifted
                  // backwards in time and theoretically could be dropped.
                  assertThat(
                      elem,
                      anyOf(
                          equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()),
                          equalTo(GlobalWindow.INSTANCE.maxTimestamp().getMillis()),
                          equalTo(0L)));
                }
                return null;
              });

      pipeline.run();
    }
  }
| |
| /** Tests to validate ParDo state. */ |
| @RunWith(JUnit4.class) |
| public static class StateTests extends SharedTestBase implements Serializable { |
| @Test |
| @Category({ValidatesRunner.class, UsesStatefulParDo.class}) |
| public void testValueStateSimple() { |
| final String stateId = "foo"; |
| |
| DoFn<KV<String, Integer>, Integer> fn = |
| new DoFn<KV<String, Integer>, Integer>() { |
| |
| @StateId(stateId) |
| private final StateSpec<ValueState<Integer>> intState = |
| StateSpecs.value(VarIntCoder.of()); |
| |
| @ProcessElement |
| public void processElement( |
| @StateId(stateId) ValueState<Integer> state, OutputReceiver<Integer> r) { |
| Integer currentValue = MoreObjects.firstNonNull(state.read(), 0); |
| r.output(currentValue); |
| state.write(currentValue + 1); |
| } |
| }; |
| |
| PCollection<Integer> output = |
| pipeline |
| .apply(Create.of(KV.of("hello", 42), KV.of("hello", 97), KV.of("hello", 84))) |
| .apply(ParDo.of(fn)); |
| |
| PAssert.that(output).containsInAnyOrder(0, 1, 2); |
| pipeline.run(); |
| } |
| |
    @Test
    @Category({ValidatesRunner.class, UsesStatefulParDo.class})
    public void testValueStateDedup() {
      // Uses per-key value state as a "seen" flag so only the first element of
      // each key is emitted, deduplicating 15 copies per key down to one.
      final String stateId = "foo";

      DoFn<KV<Integer, Integer>, Integer> onePerKey =
          new DoFn<KV<Integer, Integer>, Integer>() {

            @StateId(stateId)
            private final StateSpec<ValueState<Integer>> seenSpec =
                StateSpecs.value(VarIntCoder.of());

            @ProcessElement
            public void processElement(
                @Element KV<Integer, Integer> element,
                @StateId(stateId) ValueState<Integer> seenState,
                OutputReceiver<Integer> r) {
              Integer seen = MoreObjects.firstNonNull(seenState.read(), 0);

              // Emit only if this key has not been seen before.
              if (seen == 0) {
                seenState.write(seen + 1);
                r.output(element.getValue());
              }
            }
          };

      int numKeys = 50;
      // A big enough list that we can see some deduping
      List<KV<Integer, Integer>> input = new ArrayList<>();

      // The output should have no dupes
      Set<Integer> expectedOutput = new HashSet<>();

      for (int key = 0; key < numKeys; ++key) {
        int output = 1000 + key;
        expectedOutput.add(output);

        for (int i = 0; i < 15; ++i) {
          input.add(KV.of(key, output));
        }
      }

      // Shuffle so duplicates of a key are interleaved rather than adjacent.
      Collections.shuffle(input);

      PCollection<Integer> output = pipeline.apply(Create.of(input)).apply(ParDo.of(onePerKey));

      PAssert.that(output).containsInAnyOrder(expectedOutput);
      pipeline.run();
    }
| |
| @Test |
| public void testStateNotKeyed() { |
| final String stateId = "foo"; |
| |
| DoFn<String, Integer> fn = |
| new DoFn<String, Integer>() { |
| |
| @StateId(stateId) |
| private final StateSpec<ValueState<Integer>> intState = StateSpecs.value(); |
| |
| @ProcessElement |
| public void processElement( |
| ProcessContext c, @StateId(stateId) ValueState<Integer> state) {} |
| }; |
| |
| thrown.expect(IllegalArgumentException.class); |
| thrown.expectMessage("state"); |
| thrown.expectMessage("KvCoder"); |
| |
| pipeline.apply(Create.of("hello", "goodbye", "hello again")).apply(ParDo.of(fn)); |
| } |
| |
| @Test |
| public void testStateNotDeterministic() { |
| final String stateId = "foo"; |
| |
| // DoubleCoder is not deterministic, so this should crash |
| DoFn<KV<Double, String>, Integer> fn = |
| new DoFn<KV<Double, String>, Integer>() { |
| |
| @StateId(stateId) |
| private final StateSpec<ValueState<Integer>> intState = StateSpecs.value(); |
| |
| @ProcessElement |
| public void processElement( |
| ProcessContext c, @StateId(stateId) ValueState<Integer> state) {} |
| }; |
| |
| thrown.expect(IllegalArgumentException.class); |
| thrown.expectMessage("state"); |
| thrown.expectMessage("deterministic"); |
| |
| pipeline |
| .apply(Create.of(KV.of(1.0, "hello"), KV.of(5.4, "goodbye"), KV.of(7.2, "hello again"))) |
| .apply(ParDo.of(fn)); |
| } |
| |
    @Test
    @Category({
      ValidatesRunner.class,
      UsesStatefulParDo.class,
      DataflowPortabilityApiUnsupported.class
    })
    public void testCoderInferenceOfList() {
      final String stateId = "foo";
      MyIntegerCoder myIntegerCoder = MyIntegerCoder.of();
      // Registering the coder lets StateSpecs.value() (declared with no coder)
      // infer a coder for ValueState<List<MyInteger>>.
      pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder);

      DoFn<KV<String, Integer>, List<MyInteger>> fn =
          new DoFn<KV<String, Integer>, List<MyInteger>>() {

            @StateId(stateId)
            private final StateSpec<ValueState<List<MyInteger>>> intState = StateSpecs.value();

            @ProcessElement
            public void processElement(
                @Element KV<String, Integer> element,
                @StateId(stateId) ValueState<Integer> state,
                OutputReceiver<List<MyInteger>> r) {
              MyInteger myInteger = new MyInteger(element.getValue());
              List<MyInteger> currentValue = state.read();
              // Append to the stored list, or start a fresh single-element list
              // on the first element (state.read() returns null when unset).
              List<MyInteger> newValue =
                  currentValue != null
                      ? ImmutableList.<MyInteger>builder()
                          .addAll(currentValue)
                          .add(myInteger)
                          .build()
                      : Collections.singletonList(myInteger);
              r.output(newValue);
              state.write(newValue);
            }
          };

      pipeline
          .apply(Create.of(KV.of("hello", 42), KV.of("hello", 97), KV.of("hello", 84)))
          .apply(ParDo.of(fn))
          .setCoder(ListCoder.of(myIntegerCoder));

      pipeline.run();
    }
| |
    @Test
    @Category({
      ValidatesRunner.class,
      UsesStatefulParDo.class,
      DataflowPortabilityApiUnsupported.class
    })
    public void testValueStateFixedWindows() {
      final String stateId = "foo";

      // Outputs, for each element, the number of elements previously seen for
      // the same key; the counter lives in ValueState.
      DoFn<KV<String, Integer>, Integer> fn =
          new DoFn<KV<String, Integer>, Integer>() {

            @StateId(stateId)
            private final StateSpec<ValueState<Integer>> intState =
                StateSpecs.value(VarIntCoder.of());

            @ProcessElement
            public void processElement(
                @StateId(stateId) ValueState<Integer> state, OutputReceiver<Integer> r) {
              Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
              r.output(currentValue);
              state.write(currentValue + 1);
            }
          };

      IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(10));
      IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(20));

      PCollection<Integer> output =
          pipeline
              .apply(
                  Create.timestamped(
                      // first window
                      TimestampedValue.of(KV.of("hello", 7), new Instant(1)),
                      TimestampedValue.of(KV.of("hello", 14), new Instant(2)),
                      TimestampedValue.of(KV.of("hello", 21), new Instant(3)),

                      // second window
                      TimestampedValue.of(KV.of("hello", 28), new Instant(11)),
                      TimestampedValue.of(KV.of("hello", 35), new Instant(13))))
              .apply(Window.into(FixedWindows.of(Duration.millis(10))))
              .apply("Stateful ParDo", ParDo.of(fn));

      // State is scoped per key AND window: the counter restarts at 0 in the
      // second window rather than continuing from the first.
      PAssert.that(output).inWindow(firstWindow).containsInAnyOrder(0, 1, 2);
      PAssert.that(output).inWindow(secondWindow).containsInAnyOrder(0, 1);
      pipeline.run();
    }
| |
| /** |
| * Tests that there is no state bleeding between adjacent stateful {@link ParDo} transforms, |
| * which may (or may not) be executed in similar contexts after runner optimizations. |
| */ |
| @Test |
| @Category({ValidatesRunner.class, UsesStatefulParDo.class}) |
| public void testValueStateSameId() { |
| final String stateId = "foo"; |
| |
| DoFn<KV<String, Integer>, KV<String, Integer>> fn = |
| new DoFn<KV<String, Integer>, KV<String, Integer>>() { |
| |
| @StateId(stateId) |
| private final StateSpec<ValueState<Integer>> intState = |
| StateSpecs.value(VarIntCoder.of()); |
| |
| @ProcessElement |
| public void processElement( |
| @StateId(stateId) ValueState<Integer> state, |
| OutputReceiver<KV<String, Integer>> r) { |
| Integer currentValue = MoreObjects.firstNonNull(state.read(), 0); |
| r.output(KV.of("sizzle", currentValue)); |
| state.write(currentValue + 1); |
| } |
| }; |
| |
| DoFn<KV<String, Integer>, Integer> fn2 = |
| new DoFn<KV<String, Integer>, Integer>() { |
| |
| @StateId(stateId) |
| private final StateSpec<ValueState<Integer>> intState = |
| StateSpecs.value(VarIntCoder.of()); |
| |
| @ProcessElement |
| public void processElement( |
| @StateId(stateId) ValueState<Integer> state, OutputReceiver<Integer> r) { |
| Integer currentValue = MoreObjects.firstNonNull(state.read(), 13); |
| r.output(currentValue); |
| state.write(currentValue + 13); |
| } |
| }; |
| |
| PCollection<KV<String, Integer>> intermediate = |
| pipeline |
| .apply(Create.of(KV.of("hello", 42), KV.of("hello", 97), KV.of("hello", 84))) |
| .apply("First stateful ParDo", ParDo.of(fn)); |
| |
| PCollection<Integer> output = intermediate.apply("Second stateful ParDo", ParDo.of(fn2)); |
| |
| PAssert.that(intermediate) |
| .containsInAnyOrder(KV.of("sizzle", 0), KV.of("sizzle", 1), KV.of("sizzle", 2)); |
| PAssert.that(output).containsInAnyOrder(13, 26, 39); |
| pipeline.run(); |
| } |
| |
    @Test
    @Category({
      ValidatesRunner.class,
      UsesStatefulParDo.class,
      DataflowPortabilityApiUnsupported.class
    })
    public void testValueStateTaggedOutput() {
      final String stateId = "foo";

      final TupleTag<Integer> evenTag = new TupleTag<Integer>() {};
      final TupleTag<Integer> oddTag = new TupleTag<Integer>() {};

      // Per-key counter whose successive values are routed to the even or odd
      // tagged output depending on parity.
      DoFn<KV<String, Integer>, Integer> fn =
          new DoFn<KV<String, Integer>, Integer>() {

            @StateId(stateId)
            private final StateSpec<ValueState<Integer>> intState =
                StateSpecs.value(VarIntCoder.of());

            @ProcessElement
            public void processElement(
                @StateId(stateId) ValueState<Integer> state, MultiOutputReceiver r) {
              Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
              if (currentValue % 2 == 0) {
                r.get(evenTag).output(currentValue);
              } else {
                r.get(oddTag).output(currentValue);
              }
              state.write(currentValue + 1);
            }
          };

      PCollectionTuple output =
          pipeline
              .apply(
                  Create.of(
                      KV.of("hello", 42),
                      KV.of("hello", 97),
                      KV.of("hello", 84),
                      KV.of("goodbye", 33),
                      KV.of("hello", 859),
                      KV.of("goodbye", 83945)))
              .apply(ParDo.of(fn).withOutputTags(evenTag, TupleTagList.of(oddTag)));

      PCollection<Integer> evens = output.get(evenTag);
      PCollection<Integer> odds = output.get(oddTag);

      // "hello" has 4 elements (counter 0..3), "goodbye" has 2 (counter 0..1).
      // There are 0 and 2 from "hello" and just 0 from "goodbye"
      PAssert.that(evens).containsInAnyOrder(0, 2, 0);

      // There are 1 and 3 from "hello" and just "1" from "goodbye"
      PAssert.that(odds).containsInAnyOrder(1, 3, 1);
      pipeline.run();
    }
| |
    @Test
    @Category({
      ValidatesRunner.class,
      UsesStatefulParDo.class,
      DataflowPortabilityApiUnsupported.class
    })
    public void testBagState() {
      final String stateId = "foo";

      // Buffers values per key in a BagState and emits the sorted buffer once
      // four elements have arrived; also checks BagState view/snapshot semantics.
      DoFn<KV<String, Integer>, List<Integer>> fn =
          new DoFn<KV<String, Integer>, List<Integer>>() {

            @StateId(stateId)
            private final StateSpec<BagState<Integer>> bufferState =
                StateSpecs.bag(VarIntCoder.of());

            @ProcessElement
            public void processElement(
                @Element KV<String, Integer> element,
                @StateId(stateId) BagState<Integer> state,
                OutputReceiver<List<Integer>> r) {
              // The isEmpty view is captured before the add and read after it:
              // ReadableState views are live, not snapshots.
              ReadableState<Boolean> isEmpty = state.isEmpty();
              state.add(element.getValue());
              assertFalse(isEmpty.read());
              Iterable<Integer> currentValue = state.read();
              if (Iterables.size(currentValue) >= 4) {
                // Make sure that the cached Iterable doesn't change when new elements are added.
                state.add(-1);
                assertEquals(4, Iterables.size(currentValue));
                assertEquals(5, Iterables.size(state.read()));

                List<Integer> sorted = Lists.newArrayList(currentValue);
                Collections.sort(sorted);
                r.output(sorted);
              }
            }
          };

      PCollection<List<Integer>> output =
          pipeline
              .apply(
                  Create.of(
                      KV.of("hello", 97),
                      KV.of("hello", 42),
                      KV.of("hello", 84),
                      KV.of("hello", 12)))
              .apply(ParDo.of(fn));

      PAssert.that(output).containsInAnyOrder(Lists.newArrayList(12, 42, 84, 97));
      pipeline.run();
    }
| |
    @Test
    @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesSetState.class})
    public void testSetState() {
      final String stateId = "foo";
      final String countStateId = "count";

      DoFn<KV<String, Integer>, Set<Integer>> fn =
          new DoFn<KV<String, Integer>, Set<Integer>>() {

            @StateId(stateId)
            private final StateSpec<SetState<Integer>> setState = StateSpecs.set(VarIntCoder.of());

            // A separate element counter is needed because the set dedupes
            // (42 appears twice in the input but only once in the set).
            @StateId(countStateId)
            private final StateSpec<CombiningState<Integer, int[], Integer>> countState =
                StateSpecs.combiningFromInputInternal(VarIntCoder.of(), Sum.ofIntegers());

            @ProcessElement
            public void processElement(
                @Element KV<String, Integer> element,
                @StateId(stateId) SetState<Integer> state,
                @StateId(countStateId) CombiningState<Integer, int[], Integer> count,
                OutputReceiver<Set<Integer>> r) {
              // Captured before the add, read after: ReadableState views are live.
              ReadableState<Boolean> isEmpty = state.isEmpty();
              state.add(element.getValue());
              assertFalse(isEmpty.read());
              count.add(1);
              if (count.read() >= 4) {
                // Make sure that the cached Iterable doesn't change when new elements are added.
                Iterable<Integer> ints = state.read();
                state.add(-1);
                assertEquals(3, Iterables.size(ints));
                assertEquals(4, Iterables.size(state.read()));

                Set<Integer> set = Sets.newHashSet(ints);
                r.output(set);
              }
            }
          };

      PCollection<Set<Integer>> output =
          pipeline
              .apply(
                  Create.of(
                      KV.of("hello", 97),
                      KV.of("hello", 42),
                      KV.of("hello", 42),
                      KV.of("hello", 12)))
              .apply(ParDo.of(fn));

      PAssert.that(output).containsInAnyOrder(Sets.newHashSet(97, 42, 12));
      pipeline.run();
    }
| |
    @Test
    @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesMapState.class})
    public void testMapState() {
      final String stateId = "foo";
      final String countStateId = "count";

      DoFn<KV<String, KV<String, Integer>>, KV<String, Integer>> fn =
          new DoFn<KV<String, KV<String, Integer>>, KV<String, Integer>>() {

            @StateId(stateId)
            private final StateSpec<MapState<String, Integer>> mapState =
                StateSpecs.map(StringUtf8Coder.of(), VarIntCoder.of());

            // Counts every element, including the duplicate write to key "b",
            // so the map holds 3 entries when the count first reaches 4.
            @StateId(countStateId)
            private final StateSpec<CombiningState<Integer, int[], Integer>> countState =
                StateSpecs.combiningFromInputInternal(VarIntCoder.of(), Sum.ofIntegers());

            @ProcessElement
            public void processElement(
                ProcessContext c,
                @Element KV<String, KV<String, Integer>> element,
                @StateId(stateId) MapState<String, Integer> state,
                @StateId(countStateId) CombiningState<Integer, int[], Integer> count,
                OutputReceiver<KV<String, Integer>> r) {
              KV<String, Integer> value = element.getValue();
              // Captured before the put; asserted below to reflect later writes.
              ReadableState<Iterable<Entry<String, Integer>>> entriesView = state.entries();
              state.put(value.getKey(), value.getValue());
              count.add(1);
              if (count.read() >= 4) {
                Iterable<Map.Entry<String, Integer>> iterate = state.entries().read();
                // Make sure that the cached Iterable doesn't change when new elements are added,
                // but that cached ReadableState views of the state do change.
                state.put("BadKey", -1);
                assertEquals(3, Iterables.size(iterate));
                assertEquals(4, Iterables.size(entriesView.read()));
                assertEquals(4, Iterables.size(state.entries().read()));

                for (Map.Entry<String, Integer> entry : iterate) {
                  r.output(KV.of(entry.getKey(), entry.getValue()));
                }
              }
            }
          };

      PCollection<KV<String, Integer>> output =
          pipeline
              .apply(
                  Create.of(
                      KV.of("hello", KV.of("a", 97)), KV.of("hello", KV.of("b", 42)),
                      KV.of("hello", KV.of("b", 42)), KV.of("hello", KV.of("c", 12))))
              .apply(ParDo.of(fn));

      PAssert.that(output).containsInAnyOrder(KV.of("a", 97), KV.of("b", 42), KV.of("c", 12));
      pipeline.run();
    }
| |
    @Test
    @Category({ValidatesRunner.class, UsesStatefulParDo.class})
    public void testCombiningState() {
      final String stateId = "foo";

      DoFn<KV<String, Double>, String> fn =
          new DoFn<KV<String, Double>, String>() {

            // Tolerance for comparing the running mean against 0.5.
            private static final double EPSILON = 0.0001;

            @StateId(stateId)
            private final StateSpec<CombiningState<Double, CountSum<Double>, Double>>
                combiningState = StateSpecs.combining(new Mean.CountSumCoder<Double>(), Mean.of());

            @ProcessElement
            public void processElement(
                ProcessContext c,
                @Element KV<String, Double> element,
                @StateId(stateId) CombiningState<Double, CountSum<Double>, Double> state,
                OutputReceiver<String> r) {
              // state.read() yields the Mean combine's extracted output: the
              // running average of all values added so far.
              state.add(element.getValue());
              Double currentValue = state.read();
              if (Math.abs(currentValue - 0.5) < EPSILON) {
                r.output("right on");
              }
            }
          };

      PCollection<String> output =
          pipeline
              .apply(Create.of(KV.of("hello", 0.3), KV.of("hello", 0.6), KV.of("hello", 0.6)))
              .apply(ParDo.of(fn));

      // There should only be one moment at which the average is exactly 0.5
      PAssert.that(output).containsInAnyOrder("right on");
      pipeline.run();
    }
| |
    @Test
    @Category({ValidatesRunner.class, UsesStatefulParDo.class})
    public void testCombiningStateParameterSuperclass() {
      final String stateId = "foo";

      DoFn<KV<Integer, Integer>, String> fn =
          new DoFn<KV<Integer, Integer>, String>() {
            private static final int EXPECTED_SUM = 8;

            @StateId(stateId)
            private final StateSpec<CombiningState<Integer, int[], Integer>> state =
                StateSpecs.combining(Sum.ofIntegers());

            @ProcessElement
            public void processElement(
                @Element KV<Integer, Integer> element,
                // The parameter is declared as the GroupingState supertype of the
                // CombiningState spec above; this is what the test exercises.
                @StateId(stateId) GroupingState<Integer, Integer> state,
                OutputReceiver<String> r) {
              state.add(element.getValue());
              Integer currentValue = state.read();
              if (currentValue == EXPECTED_SUM) {
                r.output("right on");
              }
            }
          };

      PCollection<String> output =
          pipeline
              .apply(Create.of(KV.of(123, 4), KV.of(123, 7), KV.of(123, -3)))
              .apply(ParDo.of(fn));

      // There should only be one moment at which the sum is exactly 8
      PAssert.that(output).containsInAnyOrder("right on");
      pipeline.run();
    }
| |
    @Test
    @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesSideInputs.class})
    public void testBagStateSideInput() {

      final PCollectionView<List<Integer>> listView =
          pipeline.apply("Create list for side input", Create.of(2, 1, 0)).apply(View.asList());
      final String stateId = "foo";
      DoFn<KV<String, Integer>, List<Integer>> fn =
          new DoFn<KV<String, Integer>, List<Integer>>() {

            @StateId(stateId)
            private final StateSpec<BagState<Integer>> bufferState =
                StateSpecs.bag(VarIntCoder.of());

            @ProcessElement
            public void processElement(
                ProcessContext c,
                @Element KV<String, Integer> element,
                @StateId(stateId) BagState<Integer> state,
                OutputReceiver<List<Integer>> r) {
              state.add(element.getValue());
              Iterable<Integer> currentValue = state.read();
              // Once all four elements are buffered, emit both the sorted buffer
              // and the sorted side-input list, exercising state and side inputs
              // in the same stateful ParDo.
              if (Iterables.size(currentValue) >= 4) {
                List<Integer> sorted = Lists.newArrayList(currentValue);
                Collections.sort(sorted);
                r.output(sorted);

                List<Integer> sideSorted = Lists.newArrayList(c.sideInput(listView));
                Collections.sort(sideSorted);
                r.output(sideSorted);
              }
            }
          };

      PCollection<List<Integer>> output =
          pipeline
              .apply(
                  "Create main input",
                  Create.of(
                      KV.of("hello", 97),
                      KV.of("hello", 42),
                      KV.of("hello", 84),
                      KV.of("hello", 12)))
              .apply(ParDo.of(fn).withSideInputs(listView));

      PAssert.that(output)
          .containsInAnyOrder(Lists.newArrayList(12, 42, 84, 97), Lists.newArrayList(0, 1, 2));
      pipeline.run();
    }
| |
| @Test |
| @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesSideInputs.class}) |
| public void testStateSideInput() { |
| |
| // SideInput tag id |
| final String sideInputTag1 = "tag1"; |
| |
| Coder<MyInteger> myIntegerCoder = MyIntegerCoder.of(); |
| pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder); |
| final PCollectionView<Integer> sideInput = |
| pipeline |
| .apply("CreateSideInput1", Create.of(2)) |
| .apply("ViewSideInput1", View.asSingleton()); |
| |
| TestSimpleStatefulDoFn fn = new TestSimpleStatefulDoFn(sideInput); |
| pipeline |
| .apply(Create.of(KV.of(1, 2))) |
| .apply(ParDo.of(fn).withSideInput(sideInputTag1, sideInput)); |
| |
| pipeline.run(); |
| } |
| |
    /**
     * A stateful {@link DoFn} that buffers its singleton side input into bag state and echoes it
     * to the output; used by {@link #testStateSideInput()}.
     */
    private static class TestSimpleStatefulDoFn extends DoFn<KV<Integer, Integer>, Integer> {

      // SideInput tag id
      final String sideInputTag1 = "tag1";
      private final PCollectionView<Integer> view;

      final String stateId = "foo";
      Coder<MyInteger> myIntegerCoder = MyIntegerCoder.of();

      // No coder supplied: the MyInteger coder must be inferred from the registry.
      @StateId(stateId)
      private final StateSpec<BagState<MyInteger>> bufferState = StateSpecs.bag();

      private TestSimpleStatefulDoFn(PCollectionView<Integer> view) {
        this.view = view;
      }

      @ProcessElement
      public void processElem(
          ProcessContext c,
          @SideInput(sideInputTag1) Integer sideInputTag,
          @StateId(stateId) BagState<MyInteger> state) {
        state.add(new MyInteger(sideInputTag));
        c.output(sideInputTag);
      }

      // All instances are interchangeable: equality is by class only, matching
      // the class-based hashCode below.
      @Override
      public boolean equals(Object other) {
        return other instanceof TestSimpleStatefulDoFn;
      }

      @Override
      public int hashCode() {
        return getClass().hashCode();
      }
    }
| } |
| |
  /** Tests for state coder inference behaviors. */
  @RunWith(JUnit4.class)
  public static class StateCoderInferenceTests extends SharedTestBase implements Serializable {
    @Test
    @Category({
      ValidatesRunner.class,
      UsesStatefulParDo.class,
      DataflowPortabilityApiUnsupported.class
    })
    public void testBagStateCoderInference() {
      final String stateId = "foo";
      Coder<MyInteger> myIntegerCoder = MyIntegerCoder.of();
      // Registering the coder is what allows StateSpecs.bag() (declared with no
      // coder) to be inferred; the *Failure variant below omits this step.
      pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder);

      DoFn<KV<String, Integer>, List<MyInteger>> fn =
          new DoFn<KV<String, Integer>, List<MyInteger>>() {

            @StateId(stateId)
            private final StateSpec<BagState<MyInteger>> bufferState = StateSpecs.bag();

            @ProcessElement
            public void processElement(
                @Element KV<String, Integer> element,
                @StateId(stateId) BagState<MyInteger> state,
                OutputReceiver<List<MyInteger>> r) {
              state.add(new MyInteger(element.getValue()));
              Iterable<MyInteger> currentValue = state.read();
              // Emit the sorted buffer once all four input elements have arrived.
              if (Iterables.size(currentValue) >= 4) {
                List<MyInteger> sorted = Lists.newArrayList(currentValue);
                Collections.sort(sorted);
                r.output(sorted);
              }
            }
          };

      PCollection<List<MyInteger>> output =
          pipeline
              .apply(
                  Create.of(
                      KV.of("hello", 97),
                      KV.of("hello", 42),
                      KV.of("hello", 84),
                      KV.of("hello", 12)))
              .apply(ParDo.of(fn))
              .setCoder(ListCoder.of(myIntegerCoder));

      PAssert.that(output)
          .containsInAnyOrder(
              Lists.newArrayList(
                  new MyInteger(12), new MyInteger(42),
                  new MyInteger(84), new MyInteger(97)));

      pipeline.run();
    }

    @Test
    @Category({ValidatesRunner.class, UsesStatefulParDo.class})
    public void testBagStateCoderInferenceFailure() throws Exception {
      final String stateId = "foo";
      // MyIntegerCoder is deliberately NOT registered here, so inference for the
      // bag state's element type must fail.
      Coder<MyInteger> myIntegerCoder = MyIntegerCoder.of();

      DoFn<KV<String, Integer>, List<MyInteger>> fn =
          new DoFn<KV<String, Integer>, List<MyInteger>>() {

            @StateId(stateId)
            private final StateSpec<BagState<MyInteger>> bufferState = StateSpecs.bag();

            @ProcessElement
            public void processElement(
                @Element KV<String, Integer> element,
                @StateId(stateId) BagState<MyInteger> state,
                OutputReceiver<List<MyInteger>> r) {
              state.add(new MyInteger(element.getValue()));
              Iterable<MyInteger> currentValue = state.read();
              if (Iterables.size(currentValue) >= 4) {
                List<MyInteger> sorted = Lists.newArrayList(currentValue);
                Collections.sort(sorted);
                r.output(sorted);
              }
            }
          };

      thrown.expect(RuntimeException.class);
      thrown.expectMessage("Unable to infer a coder for BagState and no Coder was specified.");

      pipeline
          .apply(
              Create.of(
                  KV.of("hello", 97), KV.of("hello", 42), KV.of("hello", 84), KV.of("hello", 12)))
          .apply(ParDo.of(fn))
          .setCoder(ListCoder.of(myIntegerCoder));

      pipeline.run();
    }

    @Test
    @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesSetState.class})
    public void testSetStateCoderInference() {
      final String stateId = "foo";
      final String countStateId = "count";
      Coder<MyInteger> myIntegerCoder = MyIntegerCoder.of();
      // Registration enables inference for the coder-less StateSpecs.set() below.
      pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder);

      DoFn<KV<String, Integer>, Set<MyInteger>> fn =
          new DoFn<KV<String, Integer>, Set<MyInteger>>() {

            @StateId(stateId)
            private final StateSpec<SetState<MyInteger>> setState = StateSpecs.set();

            // Counts all elements, since the set dedupes (42 appears twice).
            @StateId(countStateId)
            private final StateSpec<CombiningState<Integer, int[], Integer>> countState =
                StateSpecs.combiningFromInputInternal(VarIntCoder.of(), Sum.ofIntegers());

            @ProcessElement
            public void processElement(
                @Element KV<String, Integer> element,
                @StateId(stateId) SetState<MyInteger> state,
                @StateId(countStateId) CombiningState<Integer, int[], Integer> count,
                OutputReceiver<Set<MyInteger>> r) {
              state.add(new MyInteger(element.getValue()));
              count.add(1);
              if (count.read() >= 4) {
                Set<MyInteger> set = Sets.newHashSet(state.read());
                r.output(set);
              }
            }
          };

      PCollection<Set<MyInteger>> output =
          pipeline
              .apply(
                  Create.of(
                      KV.of("hello", 97),
                      KV.of("hello", 42),
                      KV.of("hello", 42),
                      KV.of("hello", 12)))
              .apply(ParDo.of(fn))
              .setCoder(SetCoder.of(myIntegerCoder));

      PAssert.that(output)
          .containsInAnyOrder(
              Sets.newHashSet(new MyInteger(97), new MyInteger(42), new MyInteger(12)));
      pipeline.run();
    }

    @Test
    @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesSetState.class})
    public void testSetStateCoderInferenceFailure() throws Exception {
      final String stateId = "foo";
      final String countStateId = "count";
      // No registration: inference for SetState<MyInteger> must fail.
      Coder<MyInteger> myIntegerCoder = MyIntegerCoder.of();

      DoFn<KV<String, Integer>, Set<MyInteger>> fn =
          new DoFn<KV<String, Integer>, Set<MyInteger>>() {

            @StateId(stateId)
            private final StateSpec<SetState<MyInteger>> setState = StateSpecs.set();

            @StateId(countStateId)
            private final StateSpec<CombiningState<Integer, int[], Integer>> countState =
                StateSpecs.combiningFromInputInternal(VarIntCoder.of(), Sum.ofIntegers());

            @ProcessElement
            public void processElement(
                @Element KV<String, Integer> element,
                @StateId(stateId) SetState<MyInteger> state,
                @StateId(countStateId) CombiningState<Integer, int[], Integer> count,
                OutputReceiver<Set<MyInteger>> r) {
              state.add(new MyInteger(element.getValue()));
              count.add(1);
              if (count.read() >= 4) {
                Set<MyInteger> set = Sets.newHashSet(state.read());
                r.output(set);
              }
            }
          };

      thrown.expect(RuntimeException.class);
      thrown.expectMessage("Unable to infer a coder for SetState and no Coder was specified.");

      pipeline
          .apply(
              Create.of(
                  KV.of("hello", 97), KV.of("hello", 42), KV.of("hello", 42), KV.of("hello", 12)))
          .apply(ParDo.of(fn))
          .setCoder(SetCoder.of(myIntegerCoder));

      pipeline.run();
    }

    @Test
    @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesMapState.class})
    public void testMapStateCoderInference() {
      final String stateId = "foo";
      final String countStateId = "count";
      Coder<MyInteger> myIntegerCoder = MyIntegerCoder.of();
      // Registration enables inference for the coder-less StateSpecs.map() below.
      pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder);

      DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>> fn =
          new DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>>() {

            @StateId(stateId)
            private final StateSpec<MapState<String, MyInteger>> mapState = StateSpecs.map();

            // Counts all elements, including the duplicate write to key "b".
            @StateId(countStateId)
            private final StateSpec<CombiningState<Integer, int[], Integer>> countState =
                StateSpecs.combiningFromInputInternal(VarIntCoder.of(), Sum.ofIntegers());

            @ProcessElement
            public void processElement(
                @Element KV<String, KV<String, Integer>> element,
                @StateId(stateId) MapState<String, MyInteger> state,
                @StateId(countStateId) CombiningState<Integer, int[], Integer> count,
                OutputReceiver<KV<String, MyInteger>> r) {
              KV<String, Integer> value = element.getValue();
              state.put(value.getKey(), new MyInteger(value.getValue()));
              count.add(1);
              if (count.read() >= 4) {
                Iterable<Map.Entry<String, MyInteger>> iterate = state.entries().read();
                for (Map.Entry<String, MyInteger> entry : iterate) {
                  r.output(KV.of(entry.getKey(), entry.getValue()));
                }
              }
            }
          };

      PCollection<KV<String, MyInteger>> output =
          pipeline
              .apply(
                  Create.of(
                      KV.of("hello", KV.of("a", 97)), KV.of("hello", KV.of("b", 42)),
                      KV.of("hello", KV.of("b", 42)), KV.of("hello", KV.of("c", 12))))
              .apply(ParDo.of(fn))
              .setCoder(KvCoder.of(StringUtf8Coder.of(), myIntegerCoder));

      PAssert.that(output)
          .containsInAnyOrder(
              KV.of("a", new MyInteger(97)),
              KV.of("b", new MyInteger(42)),
              KV.of("c", new MyInteger(12)));
      pipeline.run();
    }

    @Test
    @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesMapState.class})
    public void testMapStateCoderInferenceFailure() throws Exception {
      final String stateId = "foo";
      final String countStateId = "count";
      // No registration: inference for MapState<String, MyInteger> must fail.
      Coder<MyInteger> myIntegerCoder = MyIntegerCoder.of();

      DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>> fn =
          new DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>>() {

            @StateId(stateId)
            private final StateSpec<MapState<String, MyInteger>> mapState = StateSpecs.map();

            @StateId(countStateId)
            private final StateSpec<CombiningState<Integer, int[], Integer>> countState =
                StateSpecs.combiningFromInputInternal(VarIntCoder.of(), Sum.ofIntegers());

            @ProcessElement
            public void processElement(
                ProcessContext c,
                @Element KV<String, KV<String, Integer>> element,
                @StateId(stateId) MapState<String, MyInteger> state,
                @StateId(countStateId) CombiningState<Integer, int[], Integer> count,
                OutputReceiver<KV<String, MyInteger>> r) {
              KV<String, Integer> value = element.getValue();
              state.put(value.getKey(), new MyInteger(value.getValue()));
              count.add(1);
              if (count.read() >= 4) {
                Iterable<Map.Entry<String, MyInteger>> iterate = state.entries().read();
                for (Map.Entry<String, MyInteger> entry : iterate) {
                  r.output(KV.of(entry.getKey(), entry.getValue()));
                }
              }
            }
          };

      thrown.expect(RuntimeException.class);
      thrown.expectMessage("Unable to infer a coder for MapState and no Coder was specified.");

      pipeline
          .apply(
              Create.of(
                  KV.of("hello", KV.of("a", 97)), KV.of("hello", KV.of("b", 42)),
                  KV.of("hello", KV.of("b", 42)), KV.of("hello", KV.of("c", 12))))
          .apply(ParDo.of(fn))
          .setCoder(KvCoder.of(StringUtf8Coder.of(), myIntegerCoder));

      pipeline.run();
    }

    @Test
    @Category({ValidatesRunner.class, UsesStatefulParDo.class})
    public void testCombiningStateCoderInference() {
      // The accumulator type MyInteger has no built-in coder; registering one
      // lets StateSpecs.combining(CombineFn) infer the accumulator coder.
      pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, MyIntegerCoder.of());

      final String stateId = "foo";

      DoFn<KV<String, Integer>, String> fn =
          new DoFn<KV<String, Integer>, String>() {
            private static final int EXPECTED_SUM = 16;

            @StateId(stateId)
            private final StateSpec<CombiningState<Integer, MyInteger, Integer>> combiningState =
                StateSpecs.combining(
                    // A summing CombineFn with a MyInteger accumulator.
                    new Combine.CombineFn<Integer, MyInteger, Integer>() {
                      @Override
                      public MyInteger createAccumulator() {
                        return new MyInteger(0);
                      }

                      @Override
                      public MyInteger addInput(MyInteger accumulator, Integer input) {
                        return new MyInteger(accumulator.getValue() + input);
                      }

                      @Override
                      public MyInteger mergeAccumulators(Iterable<MyInteger> accumulators) {
                        int newValue = 0;
                        for (MyInteger myInteger : accumulators) {
                          newValue += myInteger.getValue();
                        }
                        return new MyInteger(newValue);
                      }

                      @Override
                      public Integer extractOutput(MyInteger accumulator) {
                        return accumulator.getValue();
                      }
                    });

            @ProcessElement
            public void processElement(
                @Element KV<String, Integer> element,
                @StateId(stateId) CombiningState<Integer, MyInteger, Integer> state,
                OutputReceiver<String> r) {
              state.add(element.getValue());
              Integer currentValue = state.read();
              if (currentValue == EXPECTED_SUM) {
                r.output("right on");
              }
            }
          };

      PCollection<String> output =
          pipeline
              .apply(Create.of(KV.of("hello", 3), KV.of("hello", 6), KV.of("hello", 7)))
              .apply(ParDo.of(fn));

      // There should only be one moment at which the sum is exactly 16
      PAssert.that(output).containsInAnyOrder("right on");
      pipeline.run();
    }

    @Test
    @Category({ValidatesRunner.class, UsesStatefulParDo.class})
    public void testCombiningStateCoderInferenceFailure() throws Exception {
      // No coder is registered for the MyInteger accumulator, so inference for
      // the combining state must fail.
      final String stateId = "foo";

      DoFn<KV<String, Integer>, String> fn =
          new DoFn<KV<String, Integer>, String>() {
            private static final int EXPECTED_SUM = 16;

            @StateId(stateId)
            private final StateSpec<CombiningState<Integer, MyInteger, Integer>> combiningState =
                StateSpecs.combining(
                    new Combine.CombineFn<Integer, MyInteger, Integer>() {
                      @Override
                      public MyInteger createAccumulator() {
                        return new MyInteger(0);
                      }

                      @Override
                      public MyInteger addInput(MyInteger accumulator, Integer input) {
                        return new MyInteger(accumulator.getValue() + input);
                      }

                      @Override
                      public MyInteger mergeAccumulators(Iterable<MyInteger> accumulators) {
                        int newValue = 0;
                        for (MyInteger myInteger : accumulators) {
                          newValue += myInteger.getValue();
                        }
                        return new MyInteger(newValue);
                      }

                      @Override
                      public Integer extractOutput(MyInteger accumulator) {
                        return accumulator.getValue();
                      }
                    });

            @ProcessElement
            public void processElement(
                @Element KV<String, Integer> element,
                @StateId(stateId) CombiningState<Integer, MyInteger, Integer> state,
                OutputReceiver<String> r) {
              state.add(element.getValue());
              Integer currentValue = state.read();
              if (currentValue == EXPECTED_SUM) {
                r.output("right on");
              }
            }
          };

      thrown.expect(RuntimeException.class);
      thrown.expectMessage(
          "Unable to infer a coder for CombiningState and no Coder was specified.");

      pipeline
          .apply(Create.of(KV.of("hello", 3), KV.of("hello", 6), KV.of("hello", 7)))
          .apply(ParDo.of(fn));

      pipeline.run();
    }
  }
| |
| /** Tests to validate ParDo timers. */ |
| @RunWith(JUnit4.class) |
| public static class TimerTests extends SharedTestBase implements Serializable { |
| @Test |
| public void testTimerNotKeyed() { |
| final String timerId = "foo"; |
| |
| DoFn<String, Integer> fn = |
| new DoFn<String, Integer>() { |
| |
| @TimerId(timerId) |
| private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME); |
| |
| @ProcessElement |
| public void processElement(ProcessContext c, @TimerId(timerId) Timer timer) {} |
| |
| @OnTimer(timerId) |
| public void onTimer() {} |
| }; |
| |
| thrown.expect(IllegalArgumentException.class); |
| thrown.expectMessage("timer"); |
| thrown.expectMessage("KvCoder"); |
| |
| pipeline.apply(Create.of("hello", "goodbye", "hello again")).apply(ParDo.of(fn)); |
| } |
| |
/**
 * Tests that a timer-using {@link DoFn} whose key coder is not deterministic (here {@code
 * DoubleCoder}) is rejected at pipeline construction time.
 */
@Test
public void testTimerNotDeterministic() {
  final String timerId = "foo";

  // DoubleCoder is not deterministic, so this should crash
  DoFn<KV<Double, String>, Integer> fn =
      new DoFn<KV<Double, String>, Integer>() {

        @TimerId(timerId)
        private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        @ProcessElement
        public void processElement(ProcessContext c, @TimerId(timerId) Timer timer) {}

        @OnTimer(timerId)
        public void onTimer() {}
      };

  // The rejection message must mention both the timer feature and determinism.
  thrown.expect(IllegalArgumentException.class);
  thrown.expectMessage("timer");
  thrown.expectMessage("deterministic");

  // Failure is expected while building the graph, so no pipeline.run() is needed.
  pipeline
      .apply(Create.of(KV.of(1.0, "hello"), KV.of(5.4, "goodbye"), KV.of(7.2, "hello again")))
      .apply(ParDo.of(fn));
}
| |
| /** |
| * Tests that an event time timer fires and results in supplementary output. |
| * |
| * <p>This test relies on two properties: |
| * |
| * <ol> |
| * <li>A timer that is set on time should always get a chance to fire. For this to be true, |
| * timers per-key-and-window must be delivered in order so the timer is not wiped out |
| * until the window is expired by the runner. |
| * <li>A {@link Create} transform sends its elements on time, and later advances the watermark |
| * to infinity |
| * </ol> |
| * |
| * <p>Note that {@link TestStream} is not applicable because it requires very special runner |
| * hooks and is only supported by the direct runner. |
| */ |
| @Test |
| @Category({ |
| ValidatesRunner.class, |
| UsesTimersInParDo.class, |
| DataflowPortabilityApiUnsupported.class |
| }) |
| public void testEventTimeTimerBounded() throws Exception { |
| final String timerId = "foo"; |
| |
| DoFn<KV<String, Integer>, Integer> fn = |
| new DoFn<KV<String, Integer>, Integer>() { |
| |
| @TimerId(timerId) |
| private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); |
| |
| @ProcessElement |
| public void processElement(@TimerId(timerId) Timer timer, OutputReceiver<Integer> r) { |
| timer.offset(Duration.standardSeconds(1)).setRelative(); |
| r.output(3); |
| } |
| |
| @OnTimer(timerId) |
| public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer> r) { |
| if (timeDomain.equals(TimeDomain.EVENT_TIME)) { |
| r.output(42); |
| } |
| } |
| }; |
| |
| PCollection<Integer> output = |
| pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn)); |
| PAssert.that(output).containsInAnyOrder(3, 42); |
| pipeline.run(); |
| } |
| |
| /** |
| * Tests a GBK followed immediately by a {@link ParDo} that users timers. This checks a common |
| * case where both GBK and the user code share a timer delivery bundle. |
| */ |
| @Test |
| @Category({ |
| ValidatesRunner.class, |
| UsesTimersInParDo.class, |
| DataflowPortabilityApiUnsupported.class |
| }) |
| public void testGbkFollowedByUserTimers() throws Exception { |
| |
| DoFn<KV<String, Iterable<Integer>>, Integer> fn = |
| new DoFn<KV<String, Iterable<Integer>>, Integer>() { |
| |
| public static final String TIMER_ID = "foo"; |
| |
| @TimerId(TIMER_ID) |
| private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); |
| |
| @ProcessElement |
| public void processElement(@TimerId(TIMER_ID) Timer timer, OutputReceiver<Integer> r) { |
| timer.offset(Duration.standardSeconds(1)).setRelative(); |
| r.output(3); |
| } |
| |
| @OnTimer(TIMER_ID) |
| public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer> r) { |
| if (timeDomain.equals(TimeDomain.EVENT_TIME)) { |
| r.output(42); |
| } |
| } |
| }; |
| |
| PCollection<Integer> output = |
| pipeline |
| .apply(Create.of(KV.of("hello", 37))) |
| .apply(GroupByKey.create()) |
| .apply(ParDo.of(fn)); |
| PAssert.that(output).containsInAnyOrder(3, 42); |
| pipeline.run(); |
| } |
| |
/**
 * Tests that {@code timer.align(period).offset(offset).setRelative()} fires an event time timer
 * at the expected aligned timestamp on a bounded input.
 */
@Test
@Category({
  ValidatesRunner.class,
  UsesTimersInParDo.class,
  DataflowPortabilityApiUnsupported.class
})
public void testEventTimeTimerAlignBounded() throws Exception {
  final String timerId = "foo";

  DoFn<KV<String, Integer>, KV<Integer, Instant>> fn =
      new DoFn<KV<String, Integer>, KV<Integer, Instant>>() {

        @TimerId(timerId)
        private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        @ProcessElement
        public void processElement(
            @TimerId(timerId) Timer timer,
            @Timestamp Instant timestamp,
            OutputReceiver<KV<Integer, Instant>> r) {
          // Align to the next 1-second boundary with a 1ms offset, relative to the
          // element timestamp.
          timer.align(Duration.standardSeconds(1)).offset(Duration.millis(1)).setRelative();
          r.output(KV.of(3, timestamp));
        }

        @OnTimer(timerId)
        public void onTimer(
            @Timestamp Instant timestamp, OutputReceiver<KV<Integer, Instant>> r) {
          r.output(KV.of(42, timestamp));
        }
      };

  PCollection<KV<Integer, Instant>> output =
      pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
  // The element from Create carries TIMESTAMP_MIN_VALUE; the timer fires 1774ms later.
  // NOTE(review): 1774 presumably falls out of aligning TIMESTAMP_MIN_VALUE.plus(1ms) up to
  // the next whole-second boundary minus 1ms — confirm against Timer.align's javadoc.
  PAssert.that(output)
      .containsInAnyOrder(
          KV.of(3, BoundedWindow.TIMESTAMP_MIN_VALUE),
          KV.of(42, BoundedWindow.TIMESTAMP_MIN_VALUE.plus(1774)));
  pipeline.run();
}
| |
/**
 * Tests that a timer fires in the same window in which it was set: with sliding windows of
 * three minutes every minute, one element lands in three windows, so three firings — one per
 * window — are expected.
 */
@Test
@Category({
  ValidatesRunner.class,
  UsesTimersInParDo.class,
  DataflowPortabilityApiUnsupported.class
})
public void testTimerReceivedInOriginalWindow() throws Exception {
  final String timerId = "foo";

  DoFn<KV<String, Integer>, BoundedWindow> fn =
      new DoFn<KV<String, Integer>, BoundedWindow>() {

        @TimerId(timerId)
        private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        @ProcessElement
        public void processElement(@TimerId(timerId) Timer timer) {
          timer.offset(Duration.standardSeconds(1)).setRelative();
        }

        @OnTimer(timerId)
        public void onTimer(BoundedWindow window, OutputReceiver<BoundedWindow> r) {
          // Emit the window the firing was delivered in so the assertion can check it.
          r.output(window);
        }

        @Override
        public TypeDescriptor<BoundedWindow> getOutputTypeDescriptor() {
          // Narrow the declared output type so a coder can be chosen for the emitted
          // windows; under SlidingWindows every window is an IntervalWindow.
          return (TypeDescriptor) TypeDescriptor.of(IntervalWindow.class);
        }
      };

  SlidingWindows windowing =
      SlidingWindows.of(Duration.standardMinutes(3)).every(Duration.standardMinutes(1));
  PCollection<BoundedWindow> output =
      pipeline
          .apply(Create.timestamped(TimestampedValue.of(KV.of("hello", 24), new Instant(0L))))
          .apply(Window.into(windowing))
          .apply(ParDo.of(fn));

  // The element at t=0 belongs to the three sliding windows starting at -2min, -1min, and 0.
  PAssert.that(output)
      .containsInAnyOrder(
          new IntervalWindow(new Instant(0), Duration.standardMinutes(3)),
          new IntervalWindow(
              new Instant(0).minus(Duration.standardMinutes(1)), Duration.standardMinutes(3)),
          new IntervalWindow(
              new Instant(0).minus(Duration.standardMinutes(2)), Duration.standardMinutes(3)));
  pipeline.run();
}
| |
| /** |
| * Tests that an event time timer set absolutely for the last possible moment fires and results |
| * in supplementary output. The test is otherwise identical to {@link |
| * #testEventTimeTimerBounded()}. |
| */ |
| @Test |
| @Category({ |
| ValidatesRunner.class, |
| UsesTimersInParDo.class, |
| DataflowPortabilityApiUnsupported.class |
| }) |
| public void testEventTimeTimerAbsolute() throws Exception { |
| final String timerId = "foo"; |
| |
| DoFn<KV<String, Integer>, Integer> fn = |
| new DoFn<KV<String, Integer>, Integer>() { |
| |
| @TimerId(timerId) |
| private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); |
| |
| @ProcessElement |
| public void processElement( |
| @TimerId(timerId) Timer timer, BoundedWindow window, OutputReceiver<Integer> r) { |
| timer.set(window.maxTimestamp()); |
| r.output(3); |
| } |
| |
| @OnTimer(timerId) |
| public void onTimer(OutputReceiver<Integer> r) { |
| r.output(42); |
| } |
| }; |
| |
| PCollection<Integer> output = |
| pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn)); |
| PAssert.that(output).containsInAnyOrder(3, 42); |
| pipeline.run(); |
| } |
| |
/**
 * Tests a self-rescheduling event time timer: each firing outputs the current count and re-sets
 * the same timer until {@code loopCount} values have been emitted.
 */
@Ignore(
    "https://issues.apache.org/jira/browse/BEAM-2791, "
        + "https://issues.apache.org/jira/browse/BEAM-2535")
@Test
@Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesTimersInParDo.class})
public void testEventTimeTimerLoop() {
  final String stateId = "count";
  final String timerId = "timer";
  final int loopCount = 5;

  DoFn<KV<String, Integer>, Integer> fn =
      new DoFn<KV<String, Integer>, Integer>() {

        @TimerId(timerId)
        private final TimerSpec loopSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        @StateId(stateId)
        private final StateSpec<ValueState<Integer>> countSpec = StateSpecs.value();

        @ProcessElement
        public void processElement(
            @StateId(stateId) ValueState<Integer> countState,
            @TimerId(timerId) Timer loopTimer) {
          // Kick off the loop; the element value itself is never used.
          loopTimer.offset(Duration.millis(1)).setRelative();
        }

        @OnTimer(timerId)
        public void onLoopTimer(
            @StateId(stateId) ValueState<Integer> countState,
            @TimerId(timerId) Timer loopTimer,
            OutputReceiver<Integer> r) {
          // Count starts at 0 on the first firing (state is unset until then).
          int count = MoreObjects.firstNonNull(countState.read(), 0);
          if (count < loopCount) {
            r.output(count);
            countState.write(count + 1);
            // Re-arm the timer to continue the loop.
            loopTimer.offset(Duration.millis(1)).setRelative();
          }
        }
      };

  PCollection<Integer> output =
      pipeline.apply(Create.of(KV.of("hello", 42))).apply(ParDo.of(fn));

  // Exactly loopCount firings produce output: counts 0 through 4.
  PAssert.that(output).containsInAnyOrder(0, 1, 2, 3, 4);
  pipeline.run();
}
| |
| /** |
| * Tests that event time timers for multiple keys both fire. This particularly exercises |
| * implementations that may GC in ways not simply governed by the watermark. |
| */ |
| @Test |
| @Category({ |
| ValidatesRunner.class, |
| UsesTimersInParDo.class, |
| DataflowPortabilityApiUnsupported.class |
| }) |
| public void testEventTimeTimerMultipleKeys() throws Exception { |
| final String timerId = "foo"; |
| final String stateId = "sizzle"; |
| |
| final int offset = 5000; |
| final int timerOutput = 4093; |
| |
| DoFn<KV<String, Integer>, KV<String, Integer>> fn = |
| new DoFn<KV<String, Integer>, KV<String, Integer>>() { |
| |
| @TimerId(timerId) |
| private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); |
| |
| @StateId(stateId) |
| private final StateSpec<ValueState<String>> stateSpec = |
| StateSpecs.value(StringUtf8Coder.of()); |
| |
| @ProcessElement |
| public void processElement( |
| ProcessContext context, |
| @TimerId(timerId) Timer timer, |
| @StateId(stateId) ValueState<String> state, |
| BoundedWindow window) { |
| timer.set(window.maxTimestamp()); |
| state.write(context.element().getKey()); |
| context.output( |
| KV.of(context.element().getKey(), context.element().getValue() + offset)); |
| } |
| |
| @OnTimer(timerId) |
| public void onTimer( |
| @StateId(stateId) ValueState<String> state, OutputReceiver<KV<String, Integer>> r) { |
| r.output(KV.of(state.read(), timerOutput)); |
| } |
| }; |
| |
| // Enough keys that we exercise interesting code paths |
| int numKeys = 50; |
| List<KV<String, Integer>> input = new ArrayList<>(); |
| List<KV<String, Integer>> expectedOutput = new ArrayList<>(); |
| |
| for (Integer key = 0; key < numKeys; ++key) { |
| // Each key should have just one final output at GC time |
| expectedOutput.add(KV.of(key.toString(), timerOutput)); |
| |
| for (int i = 0; i < 15; ++i) { |
| // Each input should be output with the offset added |
| input.add(KV.of(key.toString(), i)); |
| expectedOutput.add(KV.of(key.toString(), i + offset)); |
| } |
| } |
| |
| Collections.shuffle(input); |
| |
| PCollection<KV<String, Integer>> output = |
| pipeline.apply(Create.of(input)).apply(ParDo.of(fn)); |
| PAssert.that(output).containsInAnyOrder(expectedOutput); |
| pipeline.run(); |
| } |
| |
/**
 * Tests that calling {@code set(Instant)} on a processing time timer is rejected with a message
 * pointing the user at relative timers, which are the only supported form in that domain.
 */
@Test
@Category({
  ValidatesRunner.class,
  UsesTimersInParDo.class,
  DataflowPortabilityApiUnsupported.class
})
public void testAbsoluteProcessingTimeTimerRejected() throws Exception {
  final String timerId = "foo";

  DoFn<KV<String, Integer>, Integer> fn =
      new DoFn<KV<String, Integer>, Integer>() {

        @TimerId(timerId)
        private final TimerSpec spec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

        @ProcessElement
        public void processElement(@TimerId(timerId) Timer timer) {
          try {
            // Absolute set on a PROCESSING_TIME timer must throw.
            timer.set(new Instant(0));
            fail("Should have failed due to processing time with absolute timer.");
          } catch (RuntimeException e) {
            // The check runs inside the DoFn (not via the `thrown` rule) because the
            // exception is raised on the worker, during execution.
            String message = e.getMessage();
            List<String> expectedSubstrings =
                Arrays.asList("relative timers", "processing time");
            expectedSubstrings.forEach(
                str ->
                    Preconditions.checkState(
                        message.contains(str),
                        "Pipeline didn't fail with the expected strings: %s",
                        expectedSubstrings));
          }
        }

        @OnTimer(timerId)
        public void onTimer() {}
      };

  pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
  pipeline.run();
}
| |
| @Test |
| @Category({ |
| ValidatesRunner.class, |
| UsesTimersInParDo.class, |
| DataflowPortabilityApiUnsupported.class |
| }) |
| public void testOutOfBoundsEventTimeTimer() throws Exception { |
| final String timerId = "foo"; |
| |
| DoFn<KV<String, Integer>, Integer> fn = |
| new DoFn<KV<String, Integer>, Integer>() { |
| |
| @TimerId(timerId) |
| private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); |
| |
| @ProcessElement |
| public void processElement( |
| ProcessContext context, BoundedWindow window, @TimerId(timerId) Timer timer) { |
| try { |
| timer.set(window.maxTimestamp().plus(1L)); |
| fail("Should have failed due to processing time with absolute timer."); |
| } catch (RuntimeException e) { |
| String message = e.getMessage(); |
| List<String> expectedSubstrings = Arrays.asList("event time timer", "expiration"); |
| expectedSubstrings.forEach( |
| str -> |
| Preconditions.checkState( |
| message.contains(str), |
| "Pipeline didn't fail with the expected strings: %s", |
| expectedSubstrings)); |
| } |
| } |
| |
| @OnTimer(timerId) |
| public void onTimer() {} |
| }; |
| |
| pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn)); |
| pipeline.run(); |
| } |
| |
/**
 * Tests that a processing time timer fires once {@link TestStream} advances processing time
 * past the timer's relative target.
 */
@Test
@Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStreamWithProcessingTime.class})
public void testSimpleProcessingTimerTimer() throws Exception {
  final String timerId = "foo";

  DoFn<KV<String, Integer>, Integer> fn =
      new DoFn<KV<String, Integer>, Integer>() {

        @TimerId(timerId)
        private final TimerSpec spec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

        @ProcessElement
        public void processElement(@TimerId(timerId) Timer timer, OutputReceiver<Integer> r) {
          // Fire one second of processing time after the element is seen.
          timer.offset(Duration.standardSeconds(1)).setRelative();
          r.output(3);
        }

        @OnTimer(timerId)
        public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer> r) {
          // Only a processing time firing should produce the supplementary output.
          if (timeDomain.equals(TimeDomain.PROCESSING_TIME)) {
            r.output(42);
          }
        }
      };

  // Advancing processing time by 2s moves past the 1s timer target.
  TestStream<KV<String, Integer>> stream =
      TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
          .addElements(KV.of("hello", 37))
          .advanceProcessingTime(Duration.standardSeconds(2))
          .advanceWatermarkToInfinity();

  PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
  PAssert.that(output).containsInAnyOrder(3, 42);
  pipeline.run();
}
| |
/**
 * Tests that an event time timer fires on an unbounded ({@link TestStream}) input once the
 * watermark passes the timer's timestamp.
 */
@Test
@Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
public void testEventTimeTimerUnbounded() throws Exception {
  final String timerId = "foo";

  DoFn<KV<String, Integer>, Integer> fn =
      new DoFn<KV<String, Integer>, Integer>() {

        @TimerId(timerId)
        private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        @ProcessElement
        public void processElement(@TimerId(timerId) Timer timer, OutputReceiver<Integer> r) {
          // Fire one second of event time after the element's timestamp.
          timer.offset(Duration.standardSeconds(1)).setRelative();
          r.output(3);
        }

        @OnTimer(timerId)
        public void onTimer(OutputReceiver<Integer> r) {
          r.output(42);
        }
      };

  // Advance the watermark past the timer target before closing the stream.
  TestStream<KV<String, Integer>> stream =
      TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
          .advanceWatermarkTo(new Instant(0))
          .addElements(KV.of("hello", 37))
          .advanceWatermarkTo(new Instant(0).plus(Duration.standardSeconds(1)))
          .advanceWatermarkToInfinity();

  PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
  PAssert.that(output).containsInAnyOrder(3, 42);
  pipeline.run();
}
| |
/**
 * Tests {@code align(...).offset(...).setRelative()} on an unbounded ({@link TestStream})
 * input: the timer must fire at the aligned timestamp derived from the element's timestamp.
 */
@Test
@Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
public void testEventTimeTimerAlignUnbounded() throws Exception {
  final String timerId = "foo";

  DoFn<KV<String, Integer>, KV<Integer, Instant>> fn =
      new DoFn<KV<String, Integer>, KV<Integer, Instant>>() {

        @TimerId(timerId)
        private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        @ProcessElement
        public void processElement(
            @TimerId(timerId) Timer timer,
            @Timestamp Instant timestamp,
            OutputReceiver<KV<Integer, Instant>> r) {
          // Align to the next 1-second boundary with a 1ms offset, relative to the
          // element timestamp.
          timer.align(Duration.standardSeconds(1)).offset(Duration.millis(1)).setRelative();
          r.output(KV.of(3, timestamp));
        }

        @OnTimer(timerId)
        public void onTimer(
            @Timestamp Instant timestamp, OutputReceiver<KV<Integer, Instant>> r) {
          r.output(KV.of(42, timestamp));
        }
      };

  TestStream<KV<String, Integer>> stream =
      TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
          .advanceWatermarkTo(new Instant(5))
          .addElements(KV.of("hello", 37))
          .advanceWatermarkTo(new Instant(0).plus(Duration.standardSeconds(1).plus(1)))
          .advanceWatermarkToInfinity();

  // The element is observed at t=5ms (the watermark position when it was added); the timer
  // fires at 999ms — presumably the 1s alignment boundary after t=5+1ms, minus 1ms; confirm
  // against Timer.align's javadoc.
  PCollection<KV<Integer, Instant>> output = pipeline.apply(stream).apply(ParDo.of(fn));
  PAssert.that(output)
      .containsInAnyOrder(
          KV.of(3, new Instant(5)),
          KV.of(42, new Instant(Duration.standardSeconds(1).minus(1).getMillis())));
  pipeline.run();
}
| |
/**
 * Tests that an aligned event time timer whose target would land past the end of the global
 * window still fires: both the element output and the timer output carry END_OF_GLOBAL_WINDOW,
 * showing the aligned target was capped rather than dropped.
 */
@Test
@Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
public void testEventTimeTimerAlignAfterGcTimeUnbounded() throws Exception {
  final String timerId = "foo";

  DoFn<KV<String, Integer>, KV<Integer, Instant>> fn =
      new DoFn<KV<String, Integer>, KV<Integer, Instant>>() {

        @TimerId(timerId)
        private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        @ProcessElement
        public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
          // This aligned time will exceed the END_OF_GLOBAL_WINDOW
          timer.align(Duration.standardDays(1)).setRelative();
          context.output(KV.of(3, context.timestamp()));
        }

        @OnTimer(timerId)
        public void onTimer(
            @Timestamp Instant timestamp, OutputReceiver<KV<Integer, Instant>> r) {
          r.output(KV.of(42, timestamp));
        }
      };

  TestStream<KV<String, Integer>> stream =
      TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
          // See GlobalWindow,
          // END_OF_GLOBAL_WINDOW is TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(1))
          .advanceWatermarkTo(BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(1)))
          .addElements(KV.of("hello", 37))
          .advanceWatermarkToInfinity();

  PCollection<KV<Integer, Instant>> output = pipeline.apply(stream).apply(ParDo.of(fn));
  PAssert.that(output)
      .containsInAnyOrder(
          KV.of(3, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(1))),
          KV.of(42, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(1))));
  pipeline.run();
}
| |
| /** |
| * A test makes sure that a processing time timer should reset rather than creating duplicate |
| * timers when a "set" method is called on it before it goes off. |
| */ |
| @Test |
| @Category({ |
| ValidatesRunner.class, |
| UsesTimersInParDo.class, |
| UsesTestStreamWithProcessingTime.class |
| }) |
| public void testProcessingTimeTimerCanBeReset() throws Exception { |
| final String timerId = "foo"; |
| |
| DoFn<KV<String, String>, String> fn = |
| new DoFn<KV<String, String>, String>() { |
| |
| @TimerId(timerId) |
| private final TimerSpec spec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); |
| |
| @ProcessElement |
| public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) { |
| timer.offset(Duration.standardSeconds(1)).setRelative(); |
| context.output(context.element().getValue()); |
| } |
| |
| @OnTimer(timerId) |
| public void onTimer(OutputReceiver<String> r) { |
| r.output("timer_output"); |
| } |
| }; |
| |
| TestStream<KV<String, String>> stream = |
| TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())) |
| .addElements(KV.of("key", "input1")) |
| .addElements(KV.of("key", "input2")) |
| .advanceProcessingTime(Duration.standardSeconds(2)) |
| .advanceWatermarkToInfinity(); |
| |
| PCollection<String> output = pipeline.apply(stream).apply(ParDo.of(fn)); |
| // Timer "foo" is set twice because input1 and input 2 are outputted. However, only one |
| // "timer_output" is outputted. Therefore, the timer is overwritten. |
| PAssert.that(output).containsInAnyOrder("input1", "input2", "timer_output"); |
| pipeline.run(); |
| } |
| |
| /** |
| * A test makes sure that an event time timer should reset rather than creating duplicate timers |
| * when a "set" method is called on it before it goes off. |
| */ |
| @Test |
| @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesTestStream.class}) |
| public void testEventTimeTimerCanBeReset() throws Exception { |
| final String timerId = "foo"; |
| |
| DoFn<KV<String, String>, String> fn = |
| new DoFn<KV<String, String>, String>() { |
| |
| @TimerId(timerId) |
| private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); |
| |
| @ProcessElement |
| public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) { |
| timer.offset(Duration.standardSeconds(1)).setRelative(); |
| context.output(context.element().getValue()); |
| } |
| |
| @OnTimer(timerId) |
| public void onTimer(OutputReceiver<String> r) { |
| r.output("timer_output"); |
| } |
| }; |
| |
| TestStream<KV<String, String>> stream = |
| TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())) |
| .advanceWatermarkTo(new Instant(0)) |
| .addElements(KV.of("hello", "input1")) |
| .addElements(KV.of("hello", "input2")) |
| .advanceWatermarkToInfinity(); |
| |
| PCollection<String> output = pipeline.apply(stream).apply(ParDo.of(fn)); |
| // Timer "foo" is set twice because input1 and input 2 are outputted. However, only one |
| // "timer_output" is outputted. Therefore, the timer is overwritten. |
| PAssert.that(output).containsInAnyOrder("input1", "input2", "timer_output"); |
| pipeline.run(); |
| } |
| |
| /** A test makes sure that an event time timers are correctly ordered. */ |
| @Test |
| @Category({ |
| ValidatesRunner.class, |
| UsesTimersInParDo.class, |
| UsesTestStream.class, |
| UsesStatefulParDo.class, |
| UsesStrictTimerOrdering.class |
| }) |
| public void testEventTimeTimerOrdering() throws Exception { |
| final int numTestElements = 100; |
| final Instant now = new Instant(1500000000000L); |
| TestStream.Builder<KV<String, String>> builder = |
| TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())) |
| .advanceWatermarkTo(new Instant(0)); |
| |
| for (int i = 0; i < numTestElements; i++) { |
| builder = builder.addElements(TimestampedValue.of(KV.of("dummy", "" + i), now.plus(i))); |
| builder = builder.advanceWatermarkTo(now.plus(i / 10 * 10)); |
| } |
| |
| testEventTimeTimerOrderingWithInputPTransform( |
| now, numTestElements, builder.advanceWatermarkToInfinity()); |
| } |
| |
| /** A test makes sure that an event time timers are correctly ordered using Create transform. */ |
| @Test |
| @Category({ |
| ValidatesRunner.class, |
| UsesTimersInParDo.class, |
| UsesStatefulParDo.class, |
| UsesStrictTimerOrdering.class |
| }) |
| public void testEventTimeTimerOrderingWithCreate() throws Exception { |
| final int numTestElements = 100; |
| final Instant now = new Instant(1500000000000L); |
| |
| List<TimestampedValue<KV<String, String>>> elements = new ArrayList<>(); |
| for (int i = 0; i < numTestElements; i++) { |
| elements.add(TimestampedValue.of(KV.of("dummy", "" + i), now.plus(i))); |
| } |
| |
| testEventTimeTimerOrderingWithInputPTransform( |
| now, numTestElements, Create.timestamped(elements)); |
| } |
| |
/**
 * Shared driver for the timer-ordering tests. The input transform produces {@code
 * numTestElements} elements timestamped at {@code now.plus(0..n-1)}. A stateful DoFn buffers
 * them in a bag and flushes, in timestamp order, via a self-rescheduling "append" timer; a
 * final "gc" timer set past every element timestamp emits the full sorted contents suffixed
 * with ":cleanup". Correct timer ordering is observable through the expected prefix strings.
 */
private void testEventTimeTimerOrderingWithInputPTransform(
    Instant now,
    int numTestElements,
    PTransform<PBegin, PCollection<KV<String, String>>> transform)
    throws Exception {

  final String timerIdBagAppend = "append";
  final String timerIdGc = "gc";
  final String bag = "bag";
  final String minTimestamp = "minTs";
  // Strictly after every element timestamp, so the gc timer fires last.
  final Instant gcTimerStamp = now.plus(numTestElements + 1);

  DoFn<KV<String, String>, String> fn =
      new DoFn<KV<String, String>, String>() {

        @TimerId(timerIdBagAppend)
        private final TimerSpec appendSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        @TimerId(timerIdGc)
        private final TimerSpec gcSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        @StateId(bag)
        private final StateSpec<BagState<TimestampedValue<String>>> bagStateSpec =
            StateSpecs.bag();

        @StateId(minTimestamp)
        private final StateSpec<ValueState<Instant>> minTimestampSpec = StateSpecs.value();

        @ProcessElement
        public void processElement(
            ProcessContext context,
            @TimerId(timerIdBagAppend) Timer bagTimer,
            @TimerId(timerIdGc) Timer gcTimer,
            @StateId(bag) BagState<TimestampedValue<String>> bagState,
            @StateId(minTimestamp) ValueState<Instant> minStampState) {

          Instant currentMinStamp =
              MoreObjects.firstNonNull(minStampState.read(), BoundedWindow.TIMESTAMP_MAX_VALUE);
          // First element seen for this key: schedule the final cleanup firing.
          if (currentMinStamp.equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
            gcTimer.set(gcTimerStamp);
          }
          // Track the earliest buffered timestamp and (re)set the append timer to it.
          if (currentMinStamp.isAfter(context.timestamp())) {
            minStampState.write(context.timestamp());
            bagTimer.set(context.timestamp());
          }
          bagState.add(TimestampedValue.of(context.element().getValue(), context.timestamp()));
        }

        @OnTimer(timerIdBagAppend)
        public void onTimer(
            OnTimerContext context,
            @TimerId(timerIdBagAppend) Timer timer,
            @StateId(bag) BagState<TimestampedValue<String>> bagState) {

          // Flush, in timestamp order, everything buffered at or before the firing time.
          List<TimestampedValue<String>> flush = new ArrayList<>();
          Instant flushTime = context.timestamp();
          for (TimestampedValue<String> val : bagState.read()) {
            if (!val.getTimestamp().isAfter(flushTime)) {
              flush.add(val);
            }
          }
          flush.sort(Comparator.comparing(TimestampedValue::getTimestamp));
          context.output(
              Joiner.on(":").join(flush.stream().map(TimestampedValue::getValue).iterator()));
          // Re-arm one millisecond later until every element has been flushed.
          Instant newMinStamp = flushTime.plus(1);
          if (flush.size() < numTestElements) {
            timer.set(newMinStamp);
          }
        }

        @OnTimer(timerIdGc)
        public void onTimer(
            OnTimerContext context, @StateId(bag) BagState<TimestampedValue<String>> bagState) {

          // Final firing: emit everything still buffered, sorted, tagged with ":cleanup".
          String output =
              Joiner.on(":")
                      .join(
                          StreamSupport.stream(bagState.read().spliterator(), false)
                              .sorted(Comparator.comparing(TimestampedValue::getTimestamp))
                              .map(TimestampedValue::getValue)
                              .iterator())
                  + ":cleanup";
          context.output(output);
          bagState.clear();
        }
      };

  PCollection<String> output = pipeline.apply(transform).apply(ParDo.of(fn));
  // expandFn yields "0", "0:1", ..., "0:...:n-1", and finally "0:...:n-1:cleanup".
  List<String> expected =
      IntStream.rangeClosed(0, numTestElements)
          .mapToObj(expandFn(numTestElements))
          .collect(Collectors.toList());
  PAssert.that(output).containsInAnyOrder(expected);
  pipeline.run();
}
| |
| private IntFunction<String> expandFn(int numTestElements) { |
| return i -> |
| Joiner.on(":") |
| .join( |
| IntStream.rangeClosed(0, Math.min(numTestElements - 1, i)) |
| .mapToObj(String::valueOf) |
| .iterator()) |
| + (i == numTestElements ? ":cleanup" : ""); |
| } |
| |
| @Test |
| @Category({ |
| ValidatesRunner.class, |
| UsesTimersInParDo.class, |
| DataflowPortabilityApiUnsupported.class |
| }) |
| public void testPipelineOptionsParameterOnTimer() { |
| final String timerId = "thisTimer"; |
| |
| PCollection<String> results = |
| pipeline |
| .apply(Create.of(KV.of(0, 0))) |
| .apply( |
| ParDo.of( |
| new DoFn<KV<Integer, Integer>, String>() { |
| @TimerId(timerId) |
| private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); |
| |
| @ProcessElement |
| public void process( |
| ProcessContext c, BoundedWindow w, @TimerId(timerId) Timer timer) { |
| timer.set(w.maxTimestamp()); |
| } |
| |
| @OnTimer(timerId) |
| public void onTimer(OutputReceiver<String> r, PipelineOptions options) { |
| r.output(options.as(MyOptions.class).getFakeOption()); |
| } |
| })); |
| |
| String testOptionValue = "not fake anymore"; |
| pipeline.getOptions().as(MyOptions.class).setFakeOption(testOptionValue); |
| PAssert.that(results).containsInAnyOrder("not fake anymore"); |
| |
| pipeline.run(); |
| } |
| |
| @Test |
| @Category({ValidatesRunner.class, UsesTestStream.class}) |
| public void duplicateTimerSetting() { |
| TestStream<KV<String, String>> stream = |
| TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())) |
| .addElements(KV.of("key1", "v1")) |
| .advanceWatermarkToInfinity(); |
| |
| PCollection<String> result = pipeline.apply(stream).apply(ParDo.of(new TwoTimerDoFn())); |
| PAssert.that(result).containsInAnyOrder("It works"); |
| |
| pipeline.run().waitUntilFinish(); |
| } |
| |
| @Test |
| @Category({ |
| ValidatesRunner.class, |
| UsesTimersInParDo.class, |
| UsesTestStream.class, |
| UsesStrictTimerOrdering.class |
| }) |
| public void testTwoTimersSettingEachOther() { |
| Instant now = new Instant(1500000000000L); |
| Instant end = now.plus(100); |
| TestStream<KV<Void, Void>> input = |
| TestStream.create(KvCoder.of(VoidCoder.of(), VoidCoder.of())) |
| .addElements(KV.of(null, null)) |
| .advanceWatermarkToInfinity(); |
| pipeline.apply(TwoTimerTest.of(now, end, input)); |
| pipeline.run(); |
| } |
| |
| @Test |
| @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesStrictTimerOrdering.class}) |
| public void testTwoTimersSettingEachOtherWithCreateAsInput() { |
| Instant now = new Instant(1500000000000L); |
| Instant end = now.plus(100); |
| pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null)))); |
| pipeline.run(); |
| } |
| |
| @Test |
| @Category({ |
| ValidatesRunner.class, |
| UsesStatefulParDo.class, |
| UsesTimersInParDo.class, |
| UsesTestStream.class, |
| UsesTestStreamWithOutputTimestamp.class |
| }) |
| public void testOutputTimestamp() { |
| final String timerId = "bar"; |
| DoFn<KV<String, Integer>, KV<String, Integer>> fn1 = |
| new DoFn<KV<String, Integer>, KV<String, Integer>>() { |
| |
| @TimerId(timerId) |
| private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME); |
| |
| @ProcessElement |
| public void processElement( |
| @TimerId(timerId) Timer timer, OutputReceiver<KV<String, Integer>> o) { |
| timer.withOutputTimestamp(new Instant(5)).set(new Instant(10)); |
| // Output a message. This will cause the next DoFn to set a timer as well. |
| o.output(KV.of("foo", 100)); |
| } |
| |
| @OnTimer(timerId) |
| public void onTimer(OnTimerContext c, BoundedWindow w) {} |
| }; |
| |
| DoFn<KV<String, Integer>, Integer> fn2 = |
| new DoFn<KV<String, Integer>, Integer>() { |
| |
| @TimerId(timerId) |
| private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME); |
| |
| @StateId("timerFired") |
| final StateSpec<ValueState<Boolean>> timerFiredState = StateSpecs.value(); |
| |
| @ProcessElement |
| public void processElement( |
| @TimerId(timerId) Timer timer, |
| @StateId("timerFired") ValueState<Boolean> timerFiredState) { |
| Boolean timerFired = timerFiredState.read(); |
| assertTrue(timerFired == null || !timerFired); |
| // Set a timer to 8. This is earlier than the previous DoFn's timer, but after the |
| // previous |
| // DoFn timer's watermark hold. This timer should not fire until the previous timer |
| // fires and removes |
| // the watermark hold. |
| timer.set(new Instant(8)); |
| } |
| |
| @OnTimer(timerId) |
| public void onTimer( |
| @StateId("timerFired") ValueState<Boolean> timerFiredState, |
| OutputReceiver<Integer> o) { |
| timerFiredState.write(true); |
| o.output(100); |
| } |
| }; |
| |
| TestStream<KV<String, Integer>> stream = |
| TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())) |
| .advanceWatermarkTo(new Instant(0)) |
| // Cause fn2 to set a timer. |
| .addElements(KV.of("key", 1)) |
| // Normally this would case fn2's timer to expire, but it shouldn't here because of |
| // the output timestamp. |
| .advanceWatermarkTo(new Instant(9)) |
              // If the timer fired, then this would cause fn2 to fail with an assertion error.
| .addElements(KV.of("key", 1)) |
| .advanceWatermarkToInfinity(); |
| PCollection<Integer> output = |
| pipeline.apply(stream).apply("first", ParDo.of(fn1)).apply("second", ParDo.of(fn2)); |
| |
| PAssert.that(output).containsInAnyOrder(100); // result output |
| pipeline.run(); |
| } |
| |
  /**
   * A {@link PTransform} exercising two event-time timers that repeatedly re-schedule one another:
   * t1 fires and sets t2 for the same timestamp; t2, while still before {@code end}, increments a
   * counter held in state and re-sets t1 one millisecond later.
   *
   * <p>NOTE(review): the expected output enumerates exactly 101 rounds (0..100), which assumes
   * callers pass {@code end == start.plus(100)} — confirm at call sites.
   */
  private static class TwoTimerTest extends PTransform<PBegin, PDone> {

    // Convenience factory; equivalent to calling the public constructor directly.
    private static PTransform<PBegin, PDone> of(
        Instant start, Instant end, PTransform<PBegin, PCollection<KV<Void, Void>>> input) {
      return new TwoTimerTest(start, end, input);
    }

    // First and last timestamps of the timer "ping-pong" driven in expand().
    private final Instant start;
    private final Instant end;
    // Transient: only used at pipeline-expansion time, so it need not be serialized
    // with this PTransform.
    private final transient PTransform<PBegin, PCollection<KV<Void, Void>>> inputPTransform;

    public TwoTimerTest(
        Instant start, Instant end, PTransform<PBegin, PCollection<KV<Void, Void>>> input) {
      this.start = start;
      this.end = end;
      this.inputPTransform = input;
    }

    @Override
    public PDone expand(PBegin input) {

      final String timerName1 = "t1";
      final String timerName2 = "t2";
      final String countStateName = "count";
      PCollection<String> result =
          input
              .apply(inputPTransform)
              .apply(
                  ParDo.of(
                      new DoFn<KV<Void, Void>, String>() {

                        @TimerId(timerName1)
                        final TimerSpec timerSpec1 = TimerSpecs.timer(TimeDomain.EVENT_TIME);

                        @TimerId(timerName2)
                        final TimerSpec timerSpec2 = TimerSpecs.timer(TimeDomain.EVENT_TIME);

                        @StateId(countStateName)
                        final StateSpec<ValueState<Integer>> countStateSpec = StateSpecs.value();

                        @ProcessElement
                        public void processElement(
                            ProcessContext context,
                            @TimerId(timerName1) Timer t1,
                            @TimerId(timerName2) Timer t2,
                            @StateId(countStateName) ValueState<Integer> state) {

                          // Initialize the round counter and kick off the loop with t1 at start.
                          state.write(0);
                          t1.set(start);
                          // set the t2 timer after end, so that we test that
                          // timers are correctly ordered in this case
                          t2.set(end.plus(1));
                        }

                        @OnTimer(timerName1)
                        public void onTimer1(
                            OnTimerContext context,
                            @TimerId(timerName2) Timer t2,
                            @StateId(countStateName) ValueState<Integer> state) {

                          Integer current = state.read();
                          // Hand control to t2 at the very same timestamp.
                          t2.set(context.timestamp());

                          // Emit "t1:<round>:<millis since start>".
                          context.output(
                              "t1:"
                                  + current
                                  + ":"
                                  + context.timestamp().minus(start.getMillis()).getMillis());
                        }

                        @OnTimer(timerName2)
                        public void onTimer2(
                            OnTimerContext context,
                            @TimerId(timerName1) Timer t1,
                            @StateId(countStateName) ValueState<Integer> state) {
                          Integer current = state.read();
                          if (context.timestamp().isBefore(end)) {
                            // Still looping: bump the counter and schedule the next round.
                            state.write(current + 1);
                            t1.set(context.timestamp().plus(1));
                          } else {
                            // Final round: mark the loop as terminated.
                            state.write(-1);
                          }
                          // Emit "t2:<round>:<millis since start>".
                          context.output(
                              "t2:"
                                  + current
                                  + ":"
                                  + context.timestamp().minus(start.getMillis()).getMillis());
                        }
                      }));

      // Each round produces matching t1 and t2 records whose round number equals the
      // millisecond offset from start.
      List<String> expected =
          LongStream.rangeClosed(0, 100)
              .mapToObj(e -> (Long) e)
              .flatMap(e -> Arrays.asList("t1:" + e + ":" + e, "t2:" + e + ":" + e).stream())
              .collect(Collectors.toList());
      PAssert.that(result).containsInAnyOrder(expected);

      return PDone.in(input.getPipeline());
    }
  }
| } |
| |
  /** Tests validating state coder inference behaviors. */
  @RunWith(JUnit4.class)
  public static class TimerCoderInferenceTests extends SharedTestBase implements Serializable {
    /** A coder registered for the state's value class should be picked up automatically. */
    @Test
    @Category({
      ValidatesRunner.class,
      UsesStatefulParDo.class,
      DataflowPortabilityApiUnsupported.class
    })
    public void testValueStateCoderInference() {
      final String stateId = "foo";
      MyIntegerCoder myIntegerCoder = MyIntegerCoder.of();
      // Register the coder so ValueState<MyInteger> can infer it from the registry.
      pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder);

      DoFn<KV<String, Integer>, MyInteger> fn =
          new DoFn<KV<String, Integer>, MyInteger>() {

            @StateId(stateId)
            private final StateSpec<ValueState<MyInteger>> intState = StateSpecs.value();

            @ProcessElement
            public void processElement(
                ProcessContext c,
                @StateId(stateId) ValueState<MyInteger> state,
                OutputReceiver<MyInteger> r) {
              // Emit the running per-key count, then increment it.
              MyInteger currentValue = MoreObjects.firstNonNull(state.read(), new MyInteger(0));
              r.output(currentValue);
              state.write(new MyInteger(currentValue.getValue() + 1));
            }
          };

      PCollection<MyInteger> output =
          pipeline
              .apply(Create.of(KV.of("hello", 42), KV.of("hello", 97), KV.of("hello", 84)))
              .apply(ParDo.of(fn))
              .setCoder(myIntegerCoder);

      // Three same-key elements yield counts 0, 1, 2 (in some order).
      PAssert.that(output).containsInAnyOrder(new MyInteger(0), new MyInteger(1), new MyInteger(2));
      pipeline.run();
    }

    /** Without a registered coder, pipeline construction should fail with a clear message. */
    @Test
    @Category({ValidatesRunner.class, UsesStatefulParDo.class})
    public void testValueStateCoderInferenceFailure() throws Exception {
      final String stateId = "foo";
      MyIntegerCoder myIntegerCoder = MyIntegerCoder.of();

      DoFn<KV<String, Integer>, MyInteger> fn =
          new DoFn<KV<String, Integer>, MyInteger>() {

            @StateId(stateId)
            private final StateSpec<ValueState<MyInteger>> intState = StateSpecs.value();

            @ProcessElement
            public void processElement(
                @StateId(stateId) ValueState<MyInteger> state, OutputReceiver<MyInteger> r) {
              MyInteger currentValue = MoreObjects.firstNonNull(state.read(), new MyInteger(0));
              r.output(currentValue);
              state.write(new MyInteger(currentValue.getValue() + 1));
            }
          };

      // `thrown` is presumably an ExpectedException rule inherited from SharedTestBase — verify.
      thrown.expect(RuntimeException.class);
      thrown.expectMessage("Unable to infer a coder for ValueState and no Coder was specified.");

      pipeline
          .apply(Create.of(KV.of("hello", 42), KV.of("hello", 97), KV.of("hello", 84)))
          .apply(ParDo.of(fn))
          .setCoder(myIntegerCoder);

      pipeline.run();
    }

    /** The state coder should also be inferable from the input KV value coder. */
    @Test
    @Category({
      ValidatesRunner.class,
      UsesStatefulParDo.class,
      DataflowPortabilityApiUnsupported.class
    })
    public void testValueStateCoderInferenceFromInputCoder() {
      final String stateId = "foo";
      MyIntegerCoder myIntegerCoder = MyIntegerCoder.of();

      DoFn<KV<String, MyInteger>, MyInteger> fn =
          new DoFn<KV<String, MyInteger>, MyInteger>() {

            @StateId(stateId)
            private final StateSpec<ValueState<MyInteger>> intState = StateSpecs.value();

            @ProcessElement
            public void processElement(
                @StateId(stateId) ValueState<MyInteger> state, OutputReceiver<MyInteger> r) {
              MyInteger currentValue = MoreObjects.firstNonNull(state.read(), new MyInteger(0));
              r.output(currentValue);
              state.write(new MyInteger(currentValue.getValue() + 1));
            }
          };

      pipeline
          .apply(
              Create.of(
                      KV.of("hello", new MyInteger(42)),
                      KV.of("hello", new MyInteger(97)),
                      KV.of("hello", new MyInteger(84)))
                  .withCoder(KvCoder.of(StringUtf8Coder.of(), myIntegerCoder)))
          .apply(ParDo.of(fn))
          .setCoder(myIntegerCoder);

      // No PAssert needed: success is the pipeline constructing and running without a
      // coder-inference error.
      pipeline.run();
    }
  }
| |
| private static class FnWithSideInputs extends DoFn<String, String> { |
| private final PCollectionView<Integer> view; |
| |
| private FnWithSideInputs(PCollectionView<Integer> view) { |
| this.view = view; |
| } |
| |
| @ProcessElement |
| public void processElement(ProcessContext c, @Element String element) { |
| c.output(element + ":" + c.sideInput(view)); |
| } |
| } |
| |
  /** An opaque, stateless value type used to exercise coder plumbing for custom types. */
  private static class TestDummy {}
| |
| private static class TestDummyCoder extends AtomicCoder<TestDummy> { |
| private TestDummyCoder() {} |
| |
| private static final TestDummyCoder INSTANCE = new TestDummyCoder(); |
| |
| @JsonCreator |
| public static TestDummyCoder of() { |
| return INSTANCE; |
| } |
| |
| @Override |
| public void encode(TestDummy value, OutputStream outStream) throws IOException {} |
| |
| @Override |
| public TestDummy decode(InputStream inStream) throws IOException { |
| return new TestDummy(); |
| } |
| |
| @Override |
| public boolean isRegisterByteSizeObserverCheap(TestDummy value) { |
| return true; |
| } |
| |
| @Override |
| public void registerByteSizeObserver(TestDummy value, ElementByteSizeObserver observer) |
| throws Exception { |
| observer.update(0L); |
| } |
| |
| @Override |
| public void verifyDeterministic() {} |
| } |
| |
| private static class TaggedOutputDummyFn extends DoFn<Integer, Integer> { |
| private TupleTag<Integer> mainOutputTag; |
| private TupleTag<TestDummy> dummyOutputTag; |
| |
| public TaggedOutputDummyFn( |
| TupleTag<Integer> mainOutputTag, TupleTag<TestDummy> dummyOutputTag) { |
| this.mainOutputTag = mainOutputTag; |
| this.dummyOutputTag = dummyOutputTag; |
| } |
| |
| @ProcessElement |
| public void processElement(MultiOutputReceiver r) { |
| r.get(mainOutputTag).output(1); |
| r.get(dummyOutputTag).output(new TestDummy()); |
| } |
| } |
| |
| private static class MainOutputDummyFn extends DoFn<Integer, TestDummy> { |
| private TupleTag<TestDummy> mainOutputTag; |
| private TupleTag<Integer> intOutputTag; |
| |
| public MainOutputDummyFn(TupleTag<TestDummy> mainOutputTag, TupleTag<Integer> intOutputTag) { |
| this.mainOutputTag = mainOutputTag; |
| this.intOutputTag = intOutputTag; |
| } |
| |
| @ProcessElement |
| public void processElement(MultiOutputReceiver r) { |
| r.get(mainOutputTag).output(new TestDummy()); |
| r.get(intOutputTag).output(1); |
| } |
| } |
| |
| private static class MyInteger implements Comparable<MyInteger> { |
| private final int value; |
| |
| MyInteger(int value) { |
| this.value = value; |
| } |
| |
| public int getValue() { |
| return value; |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) { |
| return true; |
| } |
| |
| if (!(o instanceof MyInteger)) { |
| return false; |
| } |
| |
| MyInteger myInteger = (MyInteger) o; |
| |
| return value == myInteger.value; |
| } |
| |
| @Override |
| public int hashCode() { |
| return value; |
| } |
| |
| @Override |
| public int compareTo(MyInteger o) { |
| return Integer.compare(this.getValue(), o.getValue()); |
| } |
| |
| @Override |
| public String toString() { |
| return "MyInteger{" + "value=" + value + '}'; |
| } |
| } |
| |
| private static class MyIntegerCoder extends AtomicCoder<MyInteger> { |
| private static final MyIntegerCoder INSTANCE = new MyIntegerCoder(); |
| |
| private final VarIntCoder delegate = VarIntCoder.of(); |
| |
| public static MyIntegerCoder of() { |
| return INSTANCE; |
| } |
| |
| @Override |
| public void encode(MyInteger value, OutputStream outStream) throws IOException { |
| delegate.encode(value.getValue(), outStream); |
| } |
| |
| @Override |
| public MyInteger decode(InputStream inStream) throws IOException { |
| return new MyInteger(delegate.decode(inStream)); |
| } |
| } |
| |
| /** PAssert "matcher" for expected output. */ |
| static class HasExpectedOutput |
| implements SerializableFunction<Iterable<String>, Void>, Serializable { |
| private final List<Integer> inputs; |
| private final List<Integer> sideInputs; |
| private final String additionalOutput; |
| |
| public static HasExpectedOutput forInput(List<Integer> inputs) { |
| return new HasExpectedOutput(new ArrayList<>(inputs), new ArrayList<>(), ""); |
| } |
| |
| private HasExpectedOutput( |
| List<Integer> inputs, List<Integer> sideInputs, String additionalOutput) { |
| this.inputs = inputs; |
| this.sideInputs = sideInputs; |
| this.additionalOutput = additionalOutput; |
| } |
| |
| public HasExpectedOutput andSideInputs(Integer... sideInputValues) { |
| return new HasExpectedOutput(inputs, Arrays.asList(sideInputValues), additionalOutput); |
| } |
| |
| public HasExpectedOutput fromOutput(TupleTag<String> outputTag) { |
| return fromOutput(outputTag.getId()); |
| } |
| |
| public HasExpectedOutput fromOutput(String outputId) { |
| return new HasExpectedOutput(inputs, sideInputs, outputId); |
| } |
| |
| @Override |
| public Void apply(Iterable<String> outputs) { |
| List<String> processeds = new ArrayList<>(); |
| List<String> finisheds = new ArrayList<>(); |
| for (String output : outputs) { |
| if (output.contains("finished")) { |
| finisheds.add(output); |
| } else { |
| processeds.add(output); |
| } |
| } |
| |
| String sideInputsSuffix; |
| if (sideInputs.isEmpty()) { |
| sideInputsSuffix = ""; |
| } else { |
| sideInputsSuffix = ": " + sideInputs; |
| } |
| |
| String additionalOutputPrefix; |
| if (additionalOutput.isEmpty()) { |
| additionalOutputPrefix = ""; |
| } else { |
| additionalOutputPrefix = additionalOutput + ": "; |
| } |
| |
| List<String> expectedProcesseds = new ArrayList<>(); |
| for (Integer input : inputs) { |
| expectedProcesseds.add(additionalOutputPrefix + "processing: " + input + sideInputsSuffix); |
| } |
| String[] expectedProcessedsArray = |
| expectedProcesseds.toArray(new String[expectedProcesseds.size()]); |
| assertThat(processeds, containsInAnyOrder(expectedProcessedsArray)); |
| |
| for (String finished : finisheds) { |
| assertEquals(additionalOutputPrefix + "finished", finished); |
| } |
| |
| return null; |
| } |
| } |
| |
| private static class Checker implements SerializableFunction<Iterable<String>, Void> { |
| @Override |
| public Void apply(Iterable<String> input) { |
| boolean foundElement = false; |
| boolean foundFinish = false; |
| for (String str : input) { |
| if ("elem:1:1".equals(str)) { |
| if (foundElement) { |
| throw new AssertionError("Received duplicate element"); |
| } |
| foundElement = true; |
| } else if ("finish:3:3".equals(str)) { |
| foundFinish = true; |
| } else { |
| throw new AssertionError("Got unexpected value: " + str); |
| } |
| } |
| if (!foundElement) { |
| throw new AssertionError("Missing \"elem:1:1\""); |
| } |
| if (!foundFinish) { |
| throw new AssertionError("Missing \"finish:3:3\""); |
| } |
| |
| return null; |
| } |
| } |
| |
  /** A {@link PipelineOptions} subclass for testing passing to a {@link DoFn}. */
  public interface MyOptions extends PipelineOptions {
    /** Returns the test option; defaults to {@code "fake option"} when unset. */
    @Default.String("fake option")
    String getFakeOption();

    void setFakeOption(String value);
  }
| |
  /**
   * A {@link DoFn} that sets the same event-time timer twice with different relative offsets in
   * {@code @ProcessElement}, then emits a marker string each time the timer fires.
   */
  private static class TwoTimerDoFn extends DoFn<KV<String, String>, String> {
    @TimerId("timer")
    private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

    @ProcessElement
    public void process(ProcessContext c, @TimerId("timer") Timer timer) {
      // The same timer id is set twice; presumably the second setRelative replaces the
      // first firing time — NOTE(review): confirm against the runner's Timer semantics.
      timer.offset(Duration.standardMinutes(10)).setRelative();
      timer.offset(Duration.standardMinutes(30)).setRelative();
    }

    @OnTimer("timer")
    public void onTimer(OutputReceiver<String> r, @TimerId("timer") Timer timer) {
      r.output("It works");
    }
  }
| |
| /** Tests to validate ParDo timerFamily. */ |
| @RunWith(JUnit4.class) |
| public static class TimerFamilyTests extends SharedTestBase implements Serializable { |
| |
| @Test |
| @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTimerMap.class}) |
| public void testTimerFamilyEventTime() throws Exception { |
| final String timerFamilyId = "foo"; |
| |
| DoFn<KV<String, Integer>, String> fn = |
| new DoFn<KV<String, Integer>, String>() { |
| |
| @TimerFamily(timerFamilyId) |
| private final TimerSpec spec = TimerSpecs.timerMap(TimeDomain.EVENT_TIME); |
| |
| @ProcessElement |
| public void processElement( |
| @TimerFamily(timerFamilyId) TimerMap timers, OutputReceiver<String> r) { |
| timers.set("timer1", new Instant(1)); |
| timers.set("timer2", new Instant(2)); |
| r.output("process"); |
| } |
| |
| @OnTimerFamily(timerFamilyId) |
| public void onTimer( |
| @TimerId String timerId, |
| @Timestamp Instant ts, |
| @TimerFamily(timerFamilyId) TimerMap timerMap, |
| OutputReceiver<String> r) { |
| System.out.println("timer Id : " + timerId); |
| System.out.println("timerMap : " + timerMap.toString()); |
| r.output(timerId); |
| } |
| }; |
| |
| PCollection<String> output = |
| pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn)); |
| PAssert.that(output).containsInAnyOrder("process", "timer1", "timer2"); |
| pipeline.run(); |
| } |
| |
| @Test |
| @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTimerMap.class}) |
| public void testTimerWithMultipleTimerFamily() throws Exception { |
| final String timerFamilyId1 = "foo"; |
| final String timerFamilyId2 = "bar"; |
| |
| DoFn<KV<String, Integer>, String> fn = |
| new DoFn<KV<String, Integer>, String>() { |
| |
| @TimerFamily(timerFamilyId1) |
| private final TimerSpec spec1 = TimerSpecs.timerMap(TimeDomain.EVENT_TIME); |
| |
| @TimerFamily(timerFamilyId2) |
| private final TimerSpec spec2 = TimerSpecs.timerMap(TimeDomain.EVENT_TIME); |
| |
| @ProcessElement |
| public void processElement( |
| @TimerFamily(timerFamilyId1) TimerMap timerMap1, |
| @TimerFamily(timerFamilyId2) TimerMap timerMap2, |
| OutputReceiver<String> r) { |
| timerMap1.set("timer", new Instant(1)); |
| timerMap2.set("timer", new Instant(2)); |
| r.output("process"); |
| } |
| |
| @OnTimerFamily(timerFamilyId1) |
| public void onTimer1( |
| @TimerId String timerId, @Timestamp Instant ts, OutputReceiver<String> r) { |
| r.output(timerId); |
| } |
| |
| @OnTimerFamily(timerFamilyId2) |
| public void onTimer2( |
| @TimerId String timerId, @Timestamp Instant ts, OutputReceiver<String> r) { |
| r.output(timerId); |
| } |
| }; |
| |
| PCollection<String> output = |
| pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn)); |
| PAssert.that(output).containsInAnyOrder("process", "timer", "timer"); |
| pipeline.run(); |
| } |
| |
| @Test |
| @Category({ |
| NeedsRunner.class, |
| UsesTimersInParDo.class, |
| UsesTestStreamWithProcessingTime.class, |
| UsesTimerMap.class |
| }) |
| public void testTimerFamilyProcessingTime() throws Exception { |
| final String timerId = "foo"; |
| |
| DoFn<KV<String, Integer>, Integer> fn = |
| new DoFn<KV<String, Integer>, Integer>() { |
| |
| @TimerFamily(timerId) |
| private final TimerSpec spec = TimerSpecs.timerMap(TimeDomain.PROCESSING_TIME); |
| |
| @ProcessElement |
| public void processElement( |
| @TimerFamily(timerId) TimerMap timerMap, OutputReceiver<Integer> r) { |
| Timer timer = timerMap.get("timerId1"); |
| timer.offset(Duration.standardSeconds(1)).setRelative(); |
| r.output(3); |
| } |
| |
| @OnTimerFamily(timerId) |
| public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer> r) { |
| if (timeDomain.equals(TimeDomain.PROCESSING_TIME)) { |
| r.output(42); |
| } |
| } |
| }; |
| |
| TestStream<KV<String, Integer>> stream = |
| TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())) |
| .addElements(KV.of("hello", 37)) |
| .advanceProcessingTime(Duration.standardSeconds(2)) |
| .advanceWatermarkToInfinity(); |
| |
| PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn)); |
| PAssert.that(output).containsInAnyOrder(3, 42); |
| pipeline.run(); |
| } |
| } |
| } |