// blob: eeb5d6da8e701280ececbc8a610f109544b77e10 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.transforms;
import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesBoundedSplittableParDo;
import org.apache.beam.sdk.testing.UsesParDoLifecycle;
import org.apache.beam.sdk.testing.UsesSideInputs;
import org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Never;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Ordering;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.MutableDateTime;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
* Tests for <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} behavior.
*/
@RunWith(JUnit4.class)
public class SplittableDoFnTest implements Serializable {
/**
 * Splittable {@link DoFn} over strings whose restriction is an {@link OffsetRange} covering
 * {@code [0, element.length())}. For every offset it manages to claim it emits
 * {@code KV(element, offset)}.
 */
static class PairStringWithIndexToLengthBase extends DoFn<String, KV<String, Integer>> {
  /** Initial restriction: one offset per character of the element. */
  @GetInitialRestriction
  public OffsetRange getInitialRange(String element) {
    return new OffsetRange(0, element.length());
  }

  /** Splits {@code range} into two adjacent ranges that meet at {@code (from + to) / 2}. */
  @SplitRestriction
  public void splitRange(
      String element, OffsetRange range, OutputReceiver<OffsetRange> receiver) {
    long from = range.getFrom();
    long to = range.getTo();
    long middle = (from + to) / 2;
    receiver.output(new OffsetRange(from, middle));
    receiver.output(new OffsetRange(middle, to));
  }

  /**
   * Claims consecutive offsets starting at the restriction's lower bound and outputs one pair per
   * claimed offset.
   *
   * @return {@code resume()} right after an output whose per-call iteration counter is a multiple
   *     of 3 (this includes the very first claimed offset), otherwise {@code stop()} once a claim
   *     fails
   */
  @ProcessElement
  public ProcessContinuation process(
      ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
    long offset = tracker.currentRestriction().getFrom();
    long claimedSoFar = 0;
    while (tracker.tryClaim(offset)) {
      c.output(KV.of(c.element(), (int) offset));
      if (claimedSoFar % 3 == 0) {
        return resume();
      }
      ++offset;
      ++claimedSoFar;
    }
    return stop();
  }
}
/** {@link PairStringWithIndexToLengthBase} annotated as {@link BoundedPerElement}. */
@BoundedPerElement
static class PairStringWithIndexToLengthBounded extends PairStringWithIndexToLengthBase {}
/** {@link PairStringWithIndexToLengthBase} annotated as {@link UnboundedPerElement}. */
@UnboundedPerElement
static class PairStringWithIndexToLengthUnbounded extends PairStringWithIndexToLengthBase {}
/**
 * Picks the pair-with-index DoFn variant matching {@code bounded}.
 *
 * @param bounded {@link IsBounded#BOUNDED} selects the bounded variant; any other value selects
 *     the unbounded one
 * @return a new DoFn instance of the selected variant
 */
private static PairStringWithIndexToLengthBase pairStringWithIndexToLengthFn(IsBounded bounded) {
  if (bounded == IsBounded.BOUNDED) {
    return new PairStringWithIndexToLengthBounded();
  }
  return new PairStringWithIndexToLengthUnbounded();
}
// JUnit rule holding the pipeline used by the tests. It is transient, so it is not part of the
// serialized form of this Serializable test class.
@Rule public final transient TestPipeline p = TestPipeline.create();
/** Runs {@code testPairWithIndexBasic} with {@code IsBounded.BOUNDED}. */
@Test
@Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class})
public void testPairWithIndexBasicBounded() {
testPairWithIndexBasic(IsBounded.BOUNDED);
}
/** Runs {@code testPairWithIndexBasic} with {@code IsBounded.UNBOUNDED}. */
@Test
@Category({ValidatesRunner.class, UsesUnboundedSplittableParDo.class})
public void testPairWithIndexBasicUnbounded() {
testPairWithIndexBasic(IsBounded.UNBOUNDED);
}
/**
 * Applies the pair-with-index DoFn to the strings "a", "bb" and "ccccc" and asserts that each
 * string is paired with every index from 0 up to (but excluding) its length.
 *
 * @param bounded which DoFn variant to exercise
 */
private void testPairWithIndexBasic(IsBounded bounded) {
  PCollection<String> input = p.apply(Create.of("a", "bb", "ccccc"));
  PCollection<KV<String, Integer>> res =
      input
          .apply(ParDo.of(pairStringWithIndexToLengthFn(bounded)))
          .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));

  List<KV<String, Integer>> expected =
      Arrays.asList(
          KV.of("a", 0),
          KV.of("bb", 0),
          KV.of("bb", 1),
          KV.of("ccccc", 0),
          KV.of("ccccc", 1),
          KV.of("ccccc", 2),
          KV.of("ccccc", 3),
          KV.of("ccccc", 4));
  PAssert.that(res).containsInAnyOrder(expected);

  p.run();
}
/** Runs {@code testPairWithIndexWindowedTimestamped} with {@code IsBounded.BOUNDED}. */
@Test
@Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class})
public void testPairWithIndexWindowedTimestampedBounded() {
testPairWithIndexWindowedTimestamped(IsBounded.BOUNDED);
}
/** Runs {@code testPairWithIndexWindowedTimestamped} with {@code IsBounded.UNBOUNDED}. */
@Test
@Category({ValidatesRunner.class, UsesUnboundedSplittableParDo.class})
public void testPairWithIndexWindowedTimestampedUnbounded() {
testPairWithIndexWindowedTimestamped(IsBounded.UNBOUNDED);
}
/**
 * Verifies that a splittable DoFn propagates the windowing strategy, the windows and the
 * timestamps of its input elements.
 *
 * <p>Three timestamped strings are placed into 5-second sliding windows that advance every
 * second. The output must report the same window function, and for each of four inspected
 * windows it must contain exactly the pairs whose timestamp lies inside that window.
 *
 * @param bounded which DoFn variant to exercise
 */
private void testPairWithIndexWindowedTimestamped(IsBounded bounded) {
  // Drop the millisecond part of the current time before using it as the base timestamp.
  MutableDateTime truncatedNow = Instant.now().toMutableDateTime();
  truncatedNow.setMillisOfSecond(0);
  Instant now = truncatedNow.toInstant();
  Instant nowP1 = now.plus(Duration.standardSeconds(1));
  Instant nowP2 = now.plus(Duration.standardSeconds(2));

  SlidingWindows windowFn =
      SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1));
  PCollection<String> windowedInput =
      p.apply(
              Create.timestamped(
                  TimestampedValue.of("a", now),
                  TimestampedValue.of("bb", nowP1),
                  TimestampedValue.of("ccccc", nowP2)))
          .apply(Window.into(windowFn));
  PCollection<KV<String, Integer>> res =
      windowedInput
          .apply(ParDo.of(pairStringWithIndexToLengthFn(bounded)))
          .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
  assertEquals(windowFn, res.getWindowingStrategy().getWindowFn());

  PCollection<TimestampedValue<KV<String, Integer>>> timestamped = res.apply(Reify.timestamps());

  // Every pair the DoFn is expected to emit, regardless of window.
  List<TimestampedValue<KV<String, Integer>>> allOutputs =
      Arrays.asList(
          TimestampedValue.of(KV.of("a", 0), now),
          TimestampedValue.of(KV.of("bb", 0), nowP1),
          TimestampedValue.of(KV.of("bb", 1), nowP1),
          TimestampedValue.of(KV.of("ccccc", 0), nowP2),
          TimestampedValue.of(KV.of("ccccc", 1), nowP2),
          TimestampedValue.of(KV.of("ccccc", 2), nowP2),
          TimestampedValue.of(KV.of("ccccc", 3), nowP2),
          TimestampedValue.of(KV.of("ccccc", 4), nowP2));

  for (int secondsBack = 0; secondsBack < 4; ++secondsBack) {
    Instant windowStart = now.minus(Duration.standardSeconds(secondsBack));
    IntervalWindow window =
        new IntervalWindow(windowStart, windowStart.plus(Duration.standardSeconds(5)));

    // Keep only the outputs whose timestamp falls inside this window.
    List<TimestampedValue<KV<String, Integer>>> expected = new ArrayList<>();
    for (TimestampedValue<KV<String, Integer>> candidate : allOutputs) {
      Instant timestamp = candidate.getTimestamp();
      boolean beforeWindow = window.start().isAfter(timestamp);
      boolean afterWindow = timestamp.isAfter(window.maxTimestamp());
      if (beforeWindow || afterWindow) {
        continue;
      }
      expected.add(candidate);
    }
    assertFalse(expected.isEmpty());
    PAssert.that(timestamped).inWindow(window).containsInAnyOrder(expected);
  }

  p.run();
}
/**
 * Splittable {@link DoFn} that claims only the start offsets of fixed "blocks" and, for each
 * claimed block, outputs every integer from the block start up to (excluding) the next block
 * start. It returns {@code resume()} after {@code numClaimsPerCall} successful claims in one
 * call.
 */
private static class SDFWithMultipleOutputsPerBlockBase extends DoFn<String, Integer> {
  private static final int MAX_INDEX = 98765;
  private final int numClaimsPerCall;

  private SDFWithMultipleOutputsPerBlockBase(int numClaimsPerCall) {
    this.numClaimsPerCall = numClaimsPerCall;
  }

  /**
   * Finds the first position {@code i >= 1} with
   * {@code blockStarts[i - 1] < index <= blockStarts[i]}.
   *
   * @throws IllegalStateException if no position satisfies that condition
   */
  private static int snapToNextBlock(int index, int[] blockStarts) {
    for (int block = 1; block < blockStarts.length; ++block) {
      boolean pastPreviousStart = index > blockStarts[block - 1];
      if (pastPreviousStart && index <= blockStarts[block]) {
        return block;
      }
    }
    throw new IllegalStateException("Shouldn't get here");
  }

  /** Initial restriction is always {@code [0, MAX_INDEX)}, independent of the element. */
  @GetInitialRestriction
  public OffsetRange getInitialRange(String element) {
    return new OffsetRange(0, MAX_INDEX);
  }

  @ProcessElement
  public ProcessContinuation processElement(
      ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
    int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
    int block = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts);
    int claimsThisCall = 0;
    while (tracker.tryClaim((long) blockStarts[block])) {
      int blockEnd = blockStarts[block + 1];
      for (int index = blockStarts[block]; index < blockEnd; ++index) {
        c.output(index);
      }
      ++claimsThisCall;
      if (claimsThisCall == numClaimsPerCall) {
        return resume();
      }
      ++block;
    }
    return stop();
  }
}
/** {@link SDFWithMultipleOutputsPerBlockBase} annotated as {@link BoundedPerElement}. */
@BoundedPerElement
private static class SDFWithMultipleOutputsPerBlockBounded
extends SDFWithMultipleOutputsPerBlockBase {
// Forwards numClaimsPerCall unchanged to the base class.
SDFWithMultipleOutputsPerBlockBounded(int numClaimsPerCall) {
super(numClaimsPerCall);
}
}
/** {@link SDFWithMultipleOutputsPerBlockBase} annotated as {@link UnboundedPerElement}. */
@UnboundedPerElement
private static class SDFWithMultipleOutputsPerBlockUnbounded
extends SDFWithMultipleOutputsPerBlockBase {
// Forwards numClaimsPerCall unchanged to the base class.
SDFWithMultipleOutputsPerBlockUnbounded(int numClaimsPerCall) {
super(numClaimsPerCall);
}
}
/**
 * Creates the multiple-outputs-per-block DoFn variant matching {@code bounded}.
 *
 * @param bounded {@link IsBounded#BOUNDED} selects the bounded variant; any other value selects
 *     the unbounded one
 * @param numClaimsPerCall passed through to the DoFn's constructor
 */
private static SDFWithMultipleOutputsPerBlockBase sdfWithMultipleOutputsPerBlock(
    IsBounded bounded, int numClaimsPerCall) {
  if (bounded == IsBounded.BOUNDED) {
    return new SDFWithMultipleOutputsPerBlockBounded(numClaimsPerCall);
  }
  return new SDFWithMultipleOutputsPerBlockUnbounded(numClaimsPerCall);
}
/** Runs {@code testOutputAfterCheckpoint} with {@code IsBounded.BOUNDED}. */
@Test
@Category({
ValidatesRunner.class,
UsesBoundedSplittableParDo.class,
DataflowPortabilityApiUnsupported.class
})
public void testOutputAfterCheckpointBounded() {
testOutputAfterCheckpoint(IsBounded.BOUNDED);
}
/** Runs {@code testOutputAfterCheckpoint} with {@code IsBounded.UNBOUNDED}. */
@Test
@Category({ValidatesRunner.class, UsesUnboundedSplittableParDo.class})
public void testOutputAfterCheckpointUnbounded() {
testOutputAfterCheckpoint(IsBounded.UNBOUNDED);
}
/**
 * Feeds a single element through the multiple-outputs-per-block DoFn (3 claims per call) and
 * asserts that the total number of emitted values equals {@code MAX_INDEX}.
 *
 * @param bounded which DoFn variant to exercise
 */
private void testOutputAfterCheckpoint(IsBounded bounded) {
  PCollection<Integer> emitted =
      p.apply(Create.of("foo")).apply(ParDo.of(sdfWithMultipleOutputsPerBlock(bounded, 3)));
  PCollection<Integer> outputs =
      emitted.apply(
          Window.<Integer>configure().triggering(Never.ever()).discardingFiredPanes());

  PCollection<Long> outputCount = outputs.apply(Count.globally());
  long expectedCount = SDFWithMultipleOutputsPerBlockBase.MAX_INDEX;
  PAssert.thatSingleton(outputCount).isEqualTo(expectedCount);

  p.run();
}
/**
 * Splittable {@link DoFn} with the single-offset restriction {@code [0, 1)} that outputs
 * {@code "<side input>:<element>"} for each element.
 */
private static class SDFWithSideInputBase extends DoFn<Integer, String> {
  private final PCollectionView<String> sideInput;

  private SDFWithSideInputBase(PCollectionView<String> sideInput) {
    this.sideInput = sideInput;
  }

  /** The restriction is {@code [0, 1)} for every element. */
  @GetInitialRestriction
  public OffsetRange getInitialRestriction(Integer value) {
    return new OffsetRange(0, 1);
  }

  /** Claims the restriction's first offset (failing if the claim is refused), then outputs. */
  @ProcessElement
  public void process(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
    long firstOffset = tracker.currentRestriction().getFrom();
    boolean claimed = tracker.tryClaim(firstOffset);
    checkState(claimed);
    c.output(c.sideInput(sideInput) + ":" + c.element());
  }
}
/** {@link SDFWithSideInputBase} annotated as {@link BoundedPerElement}. */
@BoundedPerElement
private static class SDFWithSideInputBounded extends SDFWithSideInputBase {
// Forwards the side-input view unchanged to the base class.
private SDFWithSideInputBounded(PCollectionView<String> sideInput) {
super(sideInput);
}
}
/** {@link SDFWithSideInputBase} annotated as {@link UnboundedPerElement}. */
@UnboundedPerElement
private static class SDFWithSideInputUnbounded extends SDFWithSideInputBase {
// Forwards the side-input view unchanged to the base class.
private SDFWithSideInputUnbounded(PCollectionView<String> sideInput) {
super(sideInput);
}
}
/**
 * Creates the side-input DoFn variant matching {@code bounded}.
 *
 * @param bounded {@link IsBounded#BOUNDED} selects the bounded variant; any other value selects
 *     the unbounded one
 * @param sideInput view handed to the DoFn's constructor
 */
private static SDFWithSideInputBase sdfWithSideInput(
    IsBounded bounded, PCollectionView<String> sideInput) {
  if (bounded == IsBounded.BOUNDED) {
    return new SDFWithSideInputBounded(sideInput);
  }
  return new SDFWithSideInputUnbounded(sideInput);
}
/** Runs {@code testSideInput} with {@code IsBounded.BOUNDED}. */
@Test
@Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class, UsesSideInputs.class})
public void testSideInputBounded() {
testSideInput(IsBounded.BOUNDED);
}
/**
 * Runs {@code testSideInput} with {@code IsBounded.UNBOUNDED}.
 *
 * <p>Tagged with {@link UsesSideInputs} like its bounded counterpart, because the pipeline under
 * test reads a side input.
 */
@Test
@Category({ValidatesRunner.class, UsesUnboundedSplittableParDo.class, UsesSideInputs.class})
public void testSideInputUnbounded() {
  testSideInput(IsBounded.UNBOUNDED);
}
/**
 * Runs the side-input DoFn over the integers 0, 1 and 2 with the singleton side input "foo" and
 * asserts that the output is "foo:0", "foo:1" and "foo:2".
 *
 * @param bounded which DoFn variant to exercise
 */
private void testSideInput(IsBounded bounded) {
  PCollection<String> sideValues = p.apply("side input", Create.of("foo"));
  PCollectionView<String> sideInput = sideValues.apply(View.asSingleton());

  PCollection<Integer> input = p.apply("input", Create.of(0, 1, 2));
  PCollection<String> res =
      input.apply(ParDo.of(sdfWithSideInput(bounded, sideInput)).withSideInputs(sideInput));

  PAssert.that(res).containsInAnyOrder("foo:0", "foo:1", "foo:2");
  p.run();
}
/** Runs {@code testWindowedSideInput} with {@code IsBounded.BOUNDED}. */
@Test
@Category({
ValidatesRunner.class,
UsesBoundedSplittableParDo.class,
UsesSplittableParDoWithWindowedSideInputs.class
})
public void testWindowedSideInputBounded() {
testWindowedSideInput(IsBounded.BOUNDED);
}
/** Runs {@code testWindowedSideInput} with {@code IsBounded.UNBOUNDED}. */
@Test
@Category({
ValidatesRunner.class,
UsesUnboundedSplittableParDo.class,
UsesSplittableParDoWithWindowedSideInputs.class,
})
public void testWindowedSideInputUnbounded() {
testWindowedSideInput(IsBounded.UNBOUNDED);
}
/**
 * Checks side-input lookup when main input and side input are windowed differently.
 *
 * <p>The main input holds the integers 0..7 (timestamp equal to the value) in 2 ms fixed
 * windows. The side input holds "a" at timestamp 0 and "b" at timestamp 4 in 4 ms fixed windows.
 * Elements 0..3 are expected to see "a" and elements 4..7 to see "b".
 *
 * @param bounded which DoFn variant to exercise
 */
private void testWindowedSideInput(IsBounded bounded) {
  PCollection<Integer> timestampedMain =
      p.apply(
          "main",
          Create.timestamped(
              TimestampedValue.of(0, new Instant(0)),
              TimestampedValue.of(1, new Instant(1)),
              TimestampedValue.of(2, new Instant(2)),
              TimestampedValue.of(3, new Instant(3)),
              TimestampedValue.of(4, new Instant(4)),
              TimestampedValue.of(5, new Instant(5)),
              TimestampedValue.of(6, new Instant(6)),
              TimestampedValue.of(7, new Instant(7))));
  PCollection<Integer> mainInput =
      timestampedMain.apply("window 2", Window.into(FixedWindows.of(Duration.millis(2))));

  PCollection<String> timestampedSide =
      p.apply(
          "side",
          Create.timestamped(
              TimestampedValue.of("a", new Instant(0)),
              TimestampedValue.of("b", new Instant(4))));
  PCollectionView<String> sideInput =
      timestampedSide
          .apply("window 4", Window.into(FixedWindows.of(Duration.millis(4))))
          .apply("singleton", View.asSingleton());

  PCollection<String> res =
      mainInput.apply(ParDo.of(sdfWithSideInput(bounded, sideInput)).withSideInputs(sideInput));

  List<String> expected =
      Arrays.asList("a:0", "a:1", "a:2", "a:3", "b:4", "b:5", "b:6", "b:7");
  PAssert.that(res).containsInAnyOrder(expected);
  p.run();
}
/**
 * Splittable {@link DoFn} that claims only the start offsets of fixed "blocks" and, for each
 * claimed block, outputs {@code KV("<side input>:<element>", index)} for every index from the
 * block start up to (excluding) the next block start. It returns {@code resume()} after
 * {@code numClaimsPerCall} successful claims in one call.
 */
private static class SDFWithMultipleOutputsPerBlockAndSideInputBase
    extends DoFn<Integer, KV<String, Integer>> {
  private static final int MAX_INDEX = 98765;
  private final PCollectionView<String> sideInput;
  private final int numClaimsPerCall;

  SDFWithMultipleOutputsPerBlockAndSideInputBase(
      PCollectionView<String> sideInput, int numClaimsPerCall) {
    this.sideInput = sideInput;
    this.numClaimsPerCall = numClaimsPerCall;
  }

  /**
   * Finds the first position {@code i >= 1} with
   * {@code blockStarts[i - 1] < index <= blockStarts[i]}.
   *
   * @throws IllegalStateException if no position satisfies that condition
   */
  private static int snapToNextBlock(int index, int[] blockStarts) {
    for (int i = 1; i < blockStarts.length; ++i) {
      if (index > blockStarts[i - 1] && index <= blockStarts[i]) {
        return i;
      }
    }
    throw new IllegalStateException("Shouldn't get here");
  }

  /**
   * Claims block starts beginning with the first block at or after the restriction's lower bound
   * and emits one keyed output per index of every claimed block.
   *
   * @return {@code resume()} once {@code numClaimsPerCall} blocks were claimed in this call,
   *     otherwise {@code stop()} when a claim fails
   */
  @ProcessElement
  public ProcessContinuation processElement(
      ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
    int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
    int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts);
    for (int i = trueStart, numIterations = 1;
        tracker.tryClaim((long) blockStarts[i]);
        ++i, ++numIterations) {
      int blockStart = blockStarts[i];
      int blockEnd = blockStarts[i + 1];
      // The key does not depend on the index, so build it once per claimed block rather than
      // once per emitted value.
      String key = c.sideInput(sideInput) + ":" + c.element();
      for (int index = blockStart; index < blockEnd; ++index) {
        c.output(KV.of(key, index));
      }
      if (numIterations == numClaimsPerCall) {
        return resume();
      }
    }
    return stop();
  }

  /** Initial restriction is always {@code [0, MAX_INDEX)}, independent of the element. */
  @GetInitialRestriction
  public OffsetRange getInitialRange(Integer element) {
    return new OffsetRange(0, MAX_INDEX);
  }
}
/**
 * {@link SDFWithMultipleOutputsPerBlockAndSideInputBase} annotated as {@link BoundedPerElement}.
 */
@BoundedPerElement
private static class SDFWithMultipleOutputsPerBlockAndSideInputBounded
extends SDFWithMultipleOutputsPerBlockAndSideInputBase {
// Forwards both arguments unchanged to the base class.
private SDFWithMultipleOutputsPerBlockAndSideInputBounded(
PCollectionView<String> sideInput, int numClaimsPerCall) {
super(sideInput, numClaimsPerCall);
}
}
/**
 * {@link SDFWithMultipleOutputsPerBlockAndSideInputBase} annotated as
 * {@link UnboundedPerElement}.
 */
@UnboundedPerElement
private static class SDFWithMultipleOutputsPerBlockAndSideInputUnbounded
extends SDFWithMultipleOutputsPerBlockAndSideInputBase {
// Forwards both arguments unchanged to the base class.
private SDFWithMultipleOutputsPerBlockAndSideInputUnbounded(
PCollectionView<String> sideInput, int numClaimsPerCall) {
super(sideInput, numClaimsPerCall);
}
}
/**
 * Creates the multiple-outputs-per-block-with-side-input DoFn variant matching {@code bounded}.
 *
 * @param bounded {@link IsBounded#BOUNDED} selects the bounded variant; any other value selects
 *     the unbounded one
 * @param sideInput view handed to the DoFn's constructor
 * @param numClaimsPerCall passed through to the DoFn's constructor
 */
private static SDFWithMultipleOutputsPerBlockAndSideInputBase
    sdfWithMultipleOutputsPerBlockAndSideInput(
        IsBounded bounded, PCollectionView<String> sideInput, int numClaimsPerCall) {
  if (bounded == IsBounded.BOUNDED) {
    return new SDFWithMultipleOutputsPerBlockAndSideInputBounded(sideInput, numClaimsPerCall);
  }
  return new SDFWithMultipleOutputsPerBlockAndSideInputUnbounded(sideInput, numClaimsPerCall);
}
/** Runs {@code testWindowedSideInputWithCheckpoints} with {@code IsBounded.BOUNDED}. */
@Test
@Category({
ValidatesRunner.class,
UsesBoundedSplittableParDo.class,
UsesSplittableParDoWithWindowedSideInputs.class
})
public void testWindowedSideInputWithCheckpointsBounded() {
testWindowedSideInputWithCheckpoints(IsBounded.BOUNDED);
}
/** Runs {@code testWindowedSideInputWithCheckpoints} with {@code IsBounded.UNBOUNDED}. */
@Test
@Category({
ValidatesRunner.class,
UsesUnboundedSplittableParDo.class,
UsesSplittableParDoWithWindowedSideInputs.class,
})
public void testWindowedSideInputWithCheckpointsUnbounded() {
testWindowedSideInputWithCheckpoints(IsBounded.UNBOUNDED);
}
/**
 * Combines windowed side inputs with a DoFn that returns {@code resume()} after every 3 claims.
 *
 * <p>The main input holds the integers 0..3 (timestamp equal to the value) in 1 ms fixed
 * windows. The side input holds "a" at timestamp 0 and "b" at timestamp 2 in 2 ms fixed windows.
 * After grouping by key, the keys must be "a:0", "a:1", "b:2", "b:3" and every key must carry
 * each integer in {@code [0, MAX_INDEX)} exactly once.
 *
 * @param bounded which DoFn variant to exercise
 */
private void testWindowedSideInputWithCheckpoints(IsBounded bounded) {
  PCollection<Integer> timestampedMain =
      p.apply(
          "main",
          Create.timestamped(
              TimestampedValue.of(0, new Instant(0)),
              TimestampedValue.of(1, new Instant(1)),
              TimestampedValue.of(2, new Instant(2)),
              TimestampedValue.of(3, new Instant(3))));
  PCollection<Integer> mainInput =
      timestampedMain.apply("window 1", Window.into(FixedWindows.of(Duration.millis(1))));

  PCollection<String> timestampedSide =
      p.apply(
          "side",
          Create.timestamped(
              TimestampedValue.of("a", new Instant(0)),
              TimestampedValue.of("b", new Instant(2))));
  PCollectionView<String> sideInput =
      timestampedSide
          .apply("window 2", Window.into(FixedWindows.of(Duration.millis(2))))
          .apply("singleton", View.asSingleton());

  PCollection<KV<String, Integer>> res =
      mainInput.apply(
          ParDo.of(
                  sdfWithMultipleOutputsPerBlockAndSideInput(
                      bounded, sideInput, 3 /* numClaimsPerCall */))
              .withSideInputs(sideInput));
  PCollection<KV<String, Iterable<Integer>>> grouped = res.apply(GroupByKey.create());

  PCollection<String> keys = grouped.apply(Keys.create());
  PAssert.that(keys).containsInAnyOrder("a:0", "a:1", "b:2", "b:3");

  PAssert.that(grouped)
      .satisfies(
          groups -> {
            // Every key must have received the full run 0 .. MAX_INDEX - 1.
            int upperBound = SDFWithMultipleOutputsPerBlockAndSideInputBase.MAX_INDEX;
            List<Integer> expected = new ArrayList<>();
            for (int value = 0; value < upperBound; ++value) {
              expected.add(value);
            }
            for (KV<String, Iterable<Integer>> group : groups) {
              List<Integer> sortedValues =
                  Ordering.<Integer>natural().sortedCopy(group.getValue());
              assertEquals(expected, sortedValues);
            }
            return null;
          });
  p.run();
  // TODO: also test coverage when some of the windows of the side input are not ready.
}
/**
 * Splittable {@link DoFn} with the single-offset restriction {@code [0, 1)} that writes
 * {@code "main:<element>"} to the main output and {@code "additional:<element>"} to the tagged
 * additional output.
 */
private static class SDFWithAdditionalOutputBase extends DoFn<Integer, String> {
  private final TupleTag<String> additionalOutput;

  private SDFWithAdditionalOutputBase(TupleTag<String> additionalOutput) {
    this.additionalOutput = additionalOutput;
  }

  /** The restriction is {@code [0, 1)} for every element. */
  @GetInitialRestriction
  public OffsetRange getInitialRestriction(Integer value) {
    return new OffsetRange(0, 1);
  }

  /** Claims the restriction's first offset (failing if refused), then writes both outputs. */
  @ProcessElement
  public void process(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
    long firstOffset = tracker.currentRestriction().getFrom();
    checkState(tracker.tryClaim(firstOffset));
    Integer element = c.element();
    c.output("main:" + element);
    c.output(additionalOutput, "additional:" + element);
  }
}
/** {@link SDFWithAdditionalOutputBase} annotated as {@link BoundedPerElement}. */
@BoundedPerElement
private static class SDFWithAdditionalOutputBounded extends SDFWithAdditionalOutputBase {
// Forwards the additional-output tag unchanged to the base class.
private SDFWithAdditionalOutputBounded(TupleTag<String> additionalOutput) {
super(additionalOutput);
}
}
/** {@link SDFWithAdditionalOutputBase} annotated as {@link UnboundedPerElement}. */
@UnboundedPerElement
private static class SDFWithAdditionalOutputUnbounded extends SDFWithAdditionalOutputBase {
// Forwards the additional-output tag unchanged to the base class.
private SDFWithAdditionalOutputUnbounded(TupleTag<String> additionalOutput) {
super(additionalOutput);
}
}
/**
 * Creates the additional-output DoFn variant matching {@code bounded}.
 *
 * @param bounded {@link IsBounded#BOUNDED} selects the bounded variant; any other value selects
 *     the unbounded one
 * @param additionalOutput tag handed to the DoFn's constructor
 */
private static SDFWithAdditionalOutputBase sdfWithAdditionalOutput(
    IsBounded bounded, TupleTag<String> additionalOutput) {
  if (bounded == IsBounded.BOUNDED) {
    return new SDFWithAdditionalOutputBounded(additionalOutput);
  }
  return new SDFWithAdditionalOutputUnbounded(additionalOutput);
}
/** Runs {@code testAdditionalOutput} with {@code IsBounded.BOUNDED}. */
@Test
@Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class})
public void testAdditionalOutputBounded() {
testAdditionalOutput(IsBounded.BOUNDED);
}
/** Runs {@code testAdditionalOutput} with {@code IsBounded.UNBOUNDED}. */
@Test
@Category({ValidatesRunner.class, UsesUnboundedSplittableParDo.class})
public void testAdditionalOutputUnbounded() {
testAdditionalOutput(IsBounded.UNBOUNDED);
}
/**
 * Runs the additional-output DoFn over the integers 0, 1 and 2 and asserts the contents of both
 * the main output and the tagged additional output.
 *
 * @param bounded which DoFn variant to exercise
 */
private void testAdditionalOutput(IsBounded bounded) {
  TupleTag<String> mainOutputTag = new TupleTag<String>("main") {};
  TupleTag<String> additionalOutputTag = new TupleTag<String>("additional") {};

  PCollection<Integer> input = p.apply("input", Create.of(0, 1, 2));
  PCollectionTuple res =
      input.apply(
          ParDo.of(sdfWithAdditionalOutput(bounded, additionalOutputTag))
              .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)));

  PCollection<String> mainOutput = res.get(mainOutputTag);
  PAssert.that(mainOutput).containsInAnyOrder("main:0", "main:1", "main:2");

  PCollection<String> additionalOutput = res.get(additionalOutputTag);
  PAssert.that(additionalOutput)
      .containsInAnyOrder("additional:0", "additional:1", "additional:2");

  p.run();
}
/**
 * Checks how a splittable DoFn treats late data: the DoFn itself must pass every element
 * through, while its output must keep the input's windowing strategy so that a following
 * {@code GroupByKey} is the step that drops the late element.
 *
 * <p>Currently disabled, see the issue referenced in {@code @Ignore}.
 */
// NOTE(review): the category lists UsesBoundedSplittableParDo, but the DoFn applied below is the
// UNBOUNDED variant - confirm whether UsesUnboundedSplittableParDo was intended.
@Test(timeout = 15000L)
@Ignore("https://issues.apache.org/jira/browse/BEAM-6354")
@Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class, UsesTestStream.class})
public void testLateData() {
  Instant base = Instant.now();
  // "aa" arrives on time; "bb" is stamped one hour before base and is added only after the
  // watermark has already advanced to base + 5s.
  TestStream<String> stream =
      TestStream.create(StringUtf8Coder.of())
          .advanceWatermarkTo(base)
          .addElements("aa")
          .advanceWatermarkTo(base.plus(Duration.standardSeconds(5)))
          .addElements(TimestampedValue.of("bb", base.minus(Duration.standardHours(1))))
          .advanceProcessingTime(Duration.standardHours(1))
          .advanceWatermarkToInfinity();
  PCollection<String> input =
      p.apply(stream)
          .apply(
              Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
                  .withAllowedLateness(Duration.standardMinutes(1))
                  .discardingFiredPanes());
  PCollection<KV<String, Integer>> afterSDF =
      input
          .apply(ParDo.of(pairStringWithIndexToLengthFn(IsBounded.UNBOUNDED)))
          .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
  PCollection<String> nonLate = afterSDF.apply(GroupByKey.create()).apply(Keys.create());
  // The splittable DoFn itself should not drop any data and act as pass-through.
  PAssert.that(afterSDF)
      .containsInAnyOrder(
          Arrays.asList(KV.of("aa", 0), KV.of("aa", 1), KV.of("bb", 0), KV.of("bb", 1)));
  // But it should preserve the windowing strategy of the data, including allowed lateness:
  // the follow-up GBK should drop the late data.
  // The input's strategy is the expected value and the DoFn output's strategy the actual one.
  assertEquals(input.getWindowingStrategy(), afterSDF.getWindowingStrategy());
  PAssert.that(nonLate).containsInAnyOrder("aa");
  p.run();
}
/**
 * Splittable {@link DoFn} that tracks a small state machine in a transient field and asserts, in
 * every annotated method, that it is invoked in the expected lifecycle state.
 */
private static class SDFWithLifecycleBase extends DoFn<String, String> {
  private enum State {
    OUTSIDE_BUNDLE,
    INSIDE_BUNDLE,
    TORN_DOWN
  }

  private transient State state;

  /** Asserts that the tracked state equals {@code expected} ({@code null} means "not set up"). */
  private void expectState(State expected) {
    assertEquals(expected, state);
  }

  /** Must be the first call on an instance: no state yet; moves to OUTSIDE_BUNDLE. */
  @Setup
  public void setUp() {
    expectState(null);
    state = State.OUTSIDE_BUNDLE;
  }

  /** Expected outside a bundle; leaves the state unchanged. */
  @GetInitialRestriction
  public OffsetRange getInitialRestriction(String value) {
    expectState(State.OUTSIDE_BUNDLE);
    return new OffsetRange(0, 1);
  }

  /** Expected outside a bundle; outputs the range unsplit and leaves the state unchanged. */
  @SplitRestriction
  public void splitRestriction(
      String value, OffsetRange range, OutputReceiver<OffsetRange> receiver) {
    expectState(State.OUTSIDE_BUNDLE);
    receiver.output(range);
  }

  /** Moves OUTSIDE_BUNDLE to INSIDE_BUNDLE. */
  @StartBundle
  public void startBundle() {
    expectState(State.OUTSIDE_BUNDLE);
    state = State.INSIDE_BUNDLE;
  }

  /** Expected inside a bundle; claims offset 0 and echoes the element. */
  @ProcessElement
  public void processElement(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
    expectState(State.INSIDE_BUNDLE);
    assertTrue(tracker.tryClaim(0L));
    c.output(c.element());
  }

  /** Moves INSIDE_BUNDLE back to OUTSIDE_BUNDLE. */
  @FinishBundle
  public void finishBundle() {
    expectState(State.INSIDE_BUNDLE);
    state = State.OUTSIDE_BUNDLE;
  }

  /** Moves OUTSIDE_BUNDLE to TORN_DOWN. */
  @Teardown
  public void tearDown() {
    expectState(State.OUTSIDE_BUNDLE);
    state = State.TORN_DOWN;
  }
}
/** {@link SDFWithLifecycleBase} annotated as {@link BoundedPerElement}. */
@BoundedPerElement
private static class SDFWithLifecycleBounded extends SDFWithLifecycleBase {}
/** {@link SDFWithLifecycleBase} annotated as {@link UnboundedPerElement}. */
@UnboundedPerElement
private static class SDFWithLifecycleUnbounded extends SDFWithLifecycleBase {}
/**
 * Creates the lifecycle-checking DoFn variant matching {@code bounded}.
 *
 * @param bounded {@link IsBounded#BOUNDED} selects the bounded variant; any other value selects
 *     the unbounded one
 */
private static SDFWithLifecycleBase sdfWithLifecycle(IsBounded bounded) {
  if (bounded == IsBounded.BOUNDED) {
    return new SDFWithLifecycleBounded();
  }
  return new SDFWithLifecycleUnbounded();
}
/** Runs {@code testLifecycleMethods} with {@code IsBounded.BOUNDED}. */
@Test
@Category({ValidatesRunner.class, UsesParDoLifecycle.class, UsesBoundedSplittableParDo.class})
public void testLifecycleMethodsBounded() {
testLifecycleMethods(IsBounded.BOUNDED);
}
/** Runs {@code testLifecycleMethods} with {@code IsBounded.UNBOUNDED}. */
@Test
@Category({ValidatesRunner.class, UsesParDoLifecycle.class, UsesUnboundedSplittableParDo.class})
public void testLifecycleMethodsUnbounded() {
testLifecycleMethods(IsBounded.UNBOUNDED);
}
/**
 * Runs the lifecycle-checking DoFn over "a", "b" and "c" and asserts that all three elements are
 * echoed to the output.
 *
 * @param bounded which DoFn variant to exercise
 */
private void testLifecycleMethods(IsBounded bounded) {
  PCollection<String> input = p.apply(Create.of("a", "b", "c"));
  PCollection<String> res = input.apply(ParDo.of(sdfWithLifecycle(bounded)));

  List<String> expected = Arrays.asList("a", "b", "c");
  PAssert.that(res).containsInAnyOrder(expected);
  p.run();
}
/**
 * Asserts the boundedness reported for the output of splittable DoFns that carry no explicit
 * boundedness annotation: a {@code void} process method yields a BOUNDED output, a process
 * method returning {@code ProcessContinuation} yields an UNBOUNDED one.
 */
@Test
@Category(NeedsRunner.class)
public void testBoundedness() {
  // A locally created TestPipeline is used because the assertions are made without ever running
  // the pipeline.
  Pipeline pipeline = TestPipeline.create();
  PCollection<String> foo = pipeline.apply(Create.of("foo"));

  DoFn<String, String> voidReturningFn =
      new DoFn<String, String>() {
        @ProcessElement
        public void process(
            @Element String element, RestrictionTracker<OffsetRange, Long> tracker) {
          // Doesn't matter
        }

        @GetInitialRestriction
        public OffsetRange getInitialRestriction(String element) {
          return new OffsetRange(0, 1);
        }
      };
  PCollection<String> fromVoidFn = foo.apply(ParDo.of(voidReturningFn));
  assertEquals(PCollection.IsBounded.BOUNDED, fromVoidFn.isBounded());

  DoFn<String, String> continuationReturningFn =
      new DoFn<String, String>() {
        @ProcessElement
        public ProcessContinuation process(
            @Element String element, RestrictionTracker<OffsetRange, Long> tracker) {
          return stop();
        }

        @GetInitialRestriction
        public OffsetRange getInitialRestriction(String element) {
          return new OffsetRange(0, 1);
        }
      };
  PCollection<String> fromContinuationFn = foo.apply(ParDo.of(continuationReturningFn));
  assertEquals(PCollection.IsBounded.UNBOUNDED, fromContinuationFn.isBounded());
}
// TODO (https://issues.apache.org/jira/browse/BEAM-988): Test that Splittable DoFn
// emits output immediately (i.e. has a pass-through trigger) regardless of input's
// windowing/triggering strategy.
}