/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.transforms;

import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesBoundedSplittableParDo;
import org.apache.beam.sdk.testing.UsesParDoLifecycle;
import org.apache.beam.sdk.testing.UsesSideInputs;
import org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Never;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Ordering;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.MutableDateTime;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests for <a href="https://s.apache.org/splittable-do-fn>splittable</a> {@link DoFn} behavior.
 */
@RunWith(JUnit4.class)
public class SplittableDoFnTest implements Serializable {

  static class PairStringWithIndexToLengthBase extends DoFn<String, KV<String, Integer>> {
    @ProcessElement
    public ProcessContinuation process(
        ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
      for (long i = tracker.currentRestriction().getFrom(), numIterations = 0;
          tracker.tryClaim(i);
          ++i, ++numIterations) {
        c.output(KV.of(c.element(), (int) i));
        if (numIterations % 3 == 0) {
          return resume();
        }
      }
      return stop();
    }

    @GetInitialRestriction
    public OffsetRange getInitialRange(String element) {
      return new OffsetRange(0, element.length());
    }

    @SplitRestriction
    public void splitRange(
        String element, OffsetRange range, OutputReceiver<OffsetRange> receiver) {
      receiver.output(new OffsetRange(range.getFrom(), (range.getFrom() + range.getTo()) / 2));
      receiver.output(new OffsetRange((range.getFrom() + range.getTo()) / 2, range.getTo()));
    }
  }

  @BoundedPerElement
  static class PairStringWithIndexToLengthBounded extends PairStringWithIndexToLengthBase {}

  @UnboundedPerElement
  static class PairStringWithIndexToLengthUnbounded extends PairStringWithIndexToLengthBase {}

  private static PairStringWithIndexToLengthBase pairStringWithIndexToLengthFn(IsBounded bounded) {
    return (bounded == IsBounded.BOUNDED)
        ? new PairStringWithIndexToLengthBounded()
        : new PairStringWithIndexToLengthUnbounded();
  }

  @Rule public final transient TestPipeline p = TestPipeline.create();

  @Test
  @Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class})
  public void testPairWithIndexBasicBounded() {
    testPairWithIndexBasic(IsBounded.BOUNDED);
  }

  @Test
  @Category({ValidatesRunner.class, UsesUnboundedSplittableParDo.class})
  public void testPairWithIndexBasicUnbounded() {
    testPairWithIndexBasic(IsBounded.UNBOUNDED);
  }

  private void testPairWithIndexBasic(IsBounded bounded) {
    PCollection<KV<String, Integer>> res =
        p.apply(Create.of("a", "bb", "ccccc"))
            .apply(ParDo.of(pairStringWithIndexToLengthFn(bounded)))
            .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));

    PAssert.that(res)
        .containsInAnyOrder(
            Arrays.asList(
                KV.of("a", 0),
                KV.of("bb", 0),
                KV.of("bb", 1),
                KV.of("ccccc", 0),
                KV.of("ccccc", 1),
                KV.of("ccccc", 2),
                KV.of("ccccc", 3),
                KV.of("ccccc", 4)));

    p.run();
  }

  @Test
  @Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class})
  public void testPairWithIndexWindowedTimestampedBounded() {
    testPairWithIndexWindowedTimestamped(IsBounded.BOUNDED);
  }

  @Test
  @Category({ValidatesRunner.class, UsesUnboundedSplittableParDo.class})
  public void testPairWithIndexWindowedTimestampedUnbounded() {
    testPairWithIndexWindowedTimestamped(IsBounded.UNBOUNDED);
  }

  private void testPairWithIndexWindowedTimestamped(IsBounded bounded) {
    // Tests that Splittable DoFn correctly propagates windowing strategy, windows and timestamps
    // of elements in the input collection.

    MutableDateTime mutableNow = Instant.now().toMutableDateTime();
    mutableNow.setMillisOfSecond(0);
    Instant now = mutableNow.toInstant();
    Instant nowP1 = now.plus(Duration.standardSeconds(1));
    Instant nowP2 = now.plus(Duration.standardSeconds(2));

    SlidingWindows windowFn =
        SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1));
    PCollection<KV<String, Integer>> res =
        p.apply(
                Create.timestamped(
                    TimestampedValue.of("a", now),
                    TimestampedValue.of("bb", nowP1),
                    TimestampedValue.of("ccccc", nowP2)))
            .apply(Window.into(windowFn))
            .apply(ParDo.of(pairStringWithIndexToLengthFn(bounded)))
            .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));

    assertEquals(windowFn, res.getWindowingStrategy().getWindowFn());

    PCollection<TimestampedValue<KV<String, Integer>>> timestamped = res.apply(Reify.timestamps());

    for (int i = 0; i < 4; ++i) {
      Instant base = now.minus(Duration.standardSeconds(i));
      IntervalWindow window = new IntervalWindow(base, base.plus(Duration.standardSeconds(5)));

      List<TimestampedValue<KV<String, Integer>>> expectedUnfiltered =
          Arrays.asList(
              TimestampedValue.of(KV.of("a", 0), now),
              TimestampedValue.of(KV.of("bb", 0), nowP1),
              TimestampedValue.of(KV.of("bb", 1), nowP1),
              TimestampedValue.of(KV.of("ccccc", 0), nowP2),
              TimestampedValue.of(KV.of("ccccc", 1), nowP2),
              TimestampedValue.of(KV.of("ccccc", 2), nowP2),
              TimestampedValue.of(KV.of("ccccc", 3), nowP2),
              TimestampedValue.of(KV.of("ccccc", 4), nowP2));

      List<TimestampedValue<KV<String, Integer>>> expected = new ArrayList<>();
      for (TimestampedValue<KV<String, Integer>> tv : expectedUnfiltered) {
        if (!window.start().isAfter(tv.getTimestamp())
            && !tv.getTimestamp().isAfter(window.maxTimestamp())) {
          expected.add(tv);
        }
      }
      assertFalse(expected.isEmpty());

      PAssert.that(timestamped).inWindow(window).containsInAnyOrder(expected);
    }
    p.run();
  }

  private static class SDFWithMultipleOutputsPerBlockBase extends DoFn<String, Integer> {
    private static final int MAX_INDEX = 98765;

    private final int numClaimsPerCall;

    private SDFWithMultipleOutputsPerBlockBase(int numClaimsPerCall) {
      this.numClaimsPerCall = numClaimsPerCall;
    }

    private static int snapToNextBlock(int index, int[] blockStarts) {
      for (int i = 1; i < blockStarts.length; ++i) {
        if (index > blockStarts[i - 1] && index <= blockStarts[i]) {
          return i;
        }
      }
      throw new IllegalStateException("Shouldn't get here");
    }

    @ProcessElement
    public ProcessContinuation processElement(
        ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
      int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
      int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts);
      for (int i = trueStart, numIterations = 1;
          tracker.tryClaim((long) blockStarts[i]);
          ++i, ++numIterations) {
        for (int index = blockStarts[i]; index < blockStarts[i + 1]; ++index) {
          c.output(index);
        }
        if (numIterations == numClaimsPerCall) {
          return resume();
        }
      }
      return stop();
    }

    @GetInitialRestriction
    public OffsetRange getInitialRange(String element) {
      return new OffsetRange(0, MAX_INDEX);
    }
  }

  @BoundedPerElement
  private static class SDFWithMultipleOutputsPerBlockBounded
      extends SDFWithMultipleOutputsPerBlockBase {
    SDFWithMultipleOutputsPerBlockBounded(int numClaimsPerCall) {
      super(numClaimsPerCall);
    }
  }

  @UnboundedPerElement
  private static class SDFWithMultipleOutputsPerBlockUnbounded
      extends SDFWithMultipleOutputsPerBlockBase {
    SDFWithMultipleOutputsPerBlockUnbounded(int numClaimsPerCall) {
      super(numClaimsPerCall);
    }
  }

  private static SDFWithMultipleOutputsPerBlockBase sdfWithMultipleOutputsPerBlock(
      IsBounded bounded, int numClaimsPerCall) {
    return (bounded == IsBounded.BOUNDED)
        ? new SDFWithMultipleOutputsPerBlockBounded(numClaimsPerCall)
        : new SDFWithMultipleOutputsPerBlockUnbounded(numClaimsPerCall);
  }

  @Test
  @Category({
    ValidatesRunner.class,
    UsesBoundedSplittableParDo.class,
    DataflowPortabilityApiUnsupported.class
  })
  public void testOutputAfterCheckpointBounded() {
    testOutputAfterCheckpoint(IsBounded.BOUNDED);
  }

  @Test
  @Category({ValidatesRunner.class, UsesUnboundedSplittableParDo.class})
  public void testOutputAfterCheckpointUnbounded() {
    testOutputAfterCheckpoint(IsBounded.UNBOUNDED);
  }

  private void testOutputAfterCheckpoint(IsBounded bounded) {
    PCollection<Integer> outputs =
        p.apply(Create.of("foo"))
            .apply(ParDo.of(sdfWithMultipleOutputsPerBlock(bounded, 3)))
            .apply(Window.<Integer>configure().triggering(Never.ever()).discardingFiredPanes());
    PAssert.thatSingleton(outputs.apply(Count.globally()))
        .isEqualTo((long) SDFWithMultipleOutputsPerBlockBase.MAX_INDEX);
    p.run();
  }

  private static class SDFWithSideInputBase extends DoFn<Integer, String> {
    private final PCollectionView<String> sideInput;

    private SDFWithSideInputBase(PCollectionView<String> sideInput) {
      this.sideInput = sideInput;
    }

    @ProcessElement
    public void process(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
      checkState(tracker.tryClaim(tracker.currentRestriction().getFrom()));
      String side = c.sideInput(sideInput);
      c.output(side + ":" + c.element());
    }

    @GetInitialRestriction
    public OffsetRange getInitialRestriction(Integer value) {
      return new OffsetRange(0, 1);
    }
  }

  @BoundedPerElement
  private static class SDFWithSideInputBounded extends SDFWithSideInputBase {
    private SDFWithSideInputBounded(PCollectionView<String> sideInput) {
      super(sideInput);
    }
  }

  @UnboundedPerElement
  private static class SDFWithSideInputUnbounded extends SDFWithSideInputBase {
    private SDFWithSideInputUnbounded(PCollectionView<String> sideInput) {
      super(sideInput);
    }
  }

  private static SDFWithSideInputBase sdfWithSideInput(
      IsBounded bounded, PCollectionView<String> sideInput) {
    return (bounded == IsBounded.BOUNDED)
        ? new SDFWithSideInputBounded(sideInput)
        : new SDFWithSideInputUnbounded(sideInput);
  }

  @Test
  @Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class, UsesSideInputs.class})
  public void testSideInputBounded() {
    testSideInput(IsBounded.BOUNDED);
  }

  @Test
  @Category({ValidatesRunner.class, UsesUnboundedSplittableParDo.class})
  public void testSideInputUnbounded() {
    testSideInput(IsBounded.UNBOUNDED);
  }

  private void testSideInput(IsBounded bounded) {
    PCollectionView<String> sideInput =
        p.apply("side input", Create.of("foo")).apply(View.asSingleton());

    PCollection<String> res =
        p.apply("input", Create.of(0, 1, 2))
            .apply(ParDo.of(sdfWithSideInput(bounded, sideInput)).withSideInputs(sideInput));

    PAssert.that(res).containsInAnyOrder(Arrays.asList("foo:0", "foo:1", "foo:2"));

    p.run();
  }

  @Test
  @Category({
    ValidatesRunner.class,
    UsesBoundedSplittableParDo.class,
    UsesSplittableParDoWithWindowedSideInputs.class
  })
  public void testWindowedSideInputBounded() {
    testWindowedSideInput(IsBounded.BOUNDED);
  }

  @Test
  @Category({
    ValidatesRunner.class,
    UsesUnboundedSplittableParDo.class,
    UsesSplittableParDoWithWindowedSideInputs.class,
  })
  public void testWindowedSideInputUnbounded() {
    testWindowedSideInput(IsBounded.UNBOUNDED);
  }

  private void testWindowedSideInput(IsBounded bounded) {
    PCollection<Integer> mainInput =
        p.apply(
                "main",
                Create.timestamped(
                    TimestampedValue.of(0, new Instant(0)),
                    TimestampedValue.of(1, new Instant(1)),
                    TimestampedValue.of(2, new Instant(2)),
                    TimestampedValue.of(3, new Instant(3)),
                    TimestampedValue.of(4, new Instant(4)),
                    TimestampedValue.of(5, new Instant(5)),
                    TimestampedValue.of(6, new Instant(6)),
                    TimestampedValue.of(7, new Instant(7))))
            .apply("window 2", Window.into(FixedWindows.of(Duration.millis(2))));

    PCollectionView<String> sideInput =
        p.apply(
                "side",
                Create.timestamped(
                    TimestampedValue.of("a", new Instant(0)),
                    TimestampedValue.of("b", new Instant(4))))
            .apply("window 4", Window.into(FixedWindows.of(Duration.millis(4))))
            .apply("singleton", View.asSingleton());

    PCollection<String> res =
        mainInput.apply(ParDo.of(sdfWithSideInput(bounded, sideInput)).withSideInputs(sideInput));

    PAssert.that(res).containsInAnyOrder("a:0", "a:1", "a:2", "a:3", "b:4", "b:5", "b:6", "b:7");

    p.run();
  }

  private static class SDFWithMultipleOutputsPerBlockAndSideInputBase
      extends DoFn<Integer, KV<String, Integer>> {
    private static final int MAX_INDEX = 98765;
    private final PCollectionView<String> sideInput;
    private final int numClaimsPerCall;

    SDFWithMultipleOutputsPerBlockAndSideInputBase(
        PCollectionView<String> sideInput, int numClaimsPerCall) {
      this.sideInput = sideInput;
      this.numClaimsPerCall = numClaimsPerCall;
    }

    private static int snapToNextBlock(int index, int[] blockStarts) {
      for (int i = 1; i < blockStarts.length; ++i) {
        if (index > blockStarts[i - 1] && index <= blockStarts[i]) {
          return i;
        }
      }
      throw new IllegalStateException("Shouldn't get here");
    }

    @ProcessElement
    public ProcessContinuation processElement(
        ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
      int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
      int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts);
      for (int i = trueStart, numIterations = 1;
          tracker.tryClaim((long) blockStarts[i]);
          ++i, ++numIterations) {
        for (int index = blockStarts[i]; index < blockStarts[i + 1]; ++index) {
          c.output(KV.of(c.sideInput(sideInput) + ":" + c.element(), index));
        }
        if (numIterations == numClaimsPerCall) {
          return resume();
        }
      }
      return stop();
    }

    @GetInitialRestriction
    public OffsetRange getInitialRange(Integer element) {
      return new OffsetRange(0, MAX_INDEX);
    }
  }

  @BoundedPerElement
  private static class SDFWithMultipleOutputsPerBlockAndSideInputBounded
      extends SDFWithMultipleOutputsPerBlockAndSideInputBase {
    private SDFWithMultipleOutputsPerBlockAndSideInputBounded(
        PCollectionView<String> sideInput, int numClaimsPerCall) {
      super(sideInput, numClaimsPerCall);
    }
  }

  @UnboundedPerElement
  private static class SDFWithMultipleOutputsPerBlockAndSideInputUnbounded
      extends SDFWithMultipleOutputsPerBlockAndSideInputBase {
    private SDFWithMultipleOutputsPerBlockAndSideInputUnbounded(
        PCollectionView<String> sideInput, int numClaimsPerCall) {
      super(sideInput, numClaimsPerCall);
    }
  }

  private static SDFWithMultipleOutputsPerBlockAndSideInputBase
      sdfWithMultipleOutputsPerBlockAndSideInput(
          IsBounded bounded, PCollectionView<String> sideInput, int numClaimsPerCall) {
    return (bounded == IsBounded.BOUNDED)
        ? new SDFWithMultipleOutputsPerBlockAndSideInputBounded(sideInput, numClaimsPerCall)
        : new SDFWithMultipleOutputsPerBlockAndSideInputUnbounded(sideInput, numClaimsPerCall);
  }

  @Test
  @Category({
    ValidatesRunner.class,
    UsesBoundedSplittableParDo.class,
    UsesSplittableParDoWithWindowedSideInputs.class
  })
  public void testWindowedSideInputWithCheckpointsBounded() {
    testWindowedSideInputWithCheckpoints(IsBounded.BOUNDED);
  }

  @Test
  @Category({
    ValidatesRunner.class,
    UsesUnboundedSplittableParDo.class,
    UsesSplittableParDoWithWindowedSideInputs.class,
  })
  public void testWindowedSideInputWithCheckpointsUnbounded() {
    testWindowedSideInputWithCheckpoints(IsBounded.UNBOUNDED);
  }

  private void testWindowedSideInputWithCheckpoints(IsBounded bounded) {
    PCollection<Integer> mainInput =
        p.apply(
                "main",
                Create.timestamped(
                    TimestampedValue.of(0, new Instant(0)),
                    TimestampedValue.of(1, new Instant(1)),
                    TimestampedValue.of(2, new Instant(2)),
                    TimestampedValue.of(3, new Instant(3))))
            .apply("window 1", Window.into(FixedWindows.of(Duration.millis(1))));

    PCollectionView<String> sideInput =
        p.apply(
                "side",
                Create.timestamped(
                    TimestampedValue.of("a", new Instant(0)),
                    TimestampedValue.of("b", new Instant(2))))
            .apply("window 2", Window.into(FixedWindows.of(Duration.millis(2))))
            .apply("singleton", View.asSingleton());

    PCollection<KV<String, Integer>> res =
        mainInput.apply(
            ParDo.of(
                    sdfWithMultipleOutputsPerBlockAndSideInput(
                        bounded, sideInput, 3 /* numClaimsPerCall */))
                .withSideInputs(sideInput));
    PCollection<KV<String, Iterable<Integer>>> grouped = res.apply(GroupByKey.create());

    PAssert.that(grouped.apply(Keys.create())).containsInAnyOrder("a:0", "a:1", "b:2", "b:3");
    PAssert.that(grouped)
        .satisfies(
            input -> {
              List<Integer> expected = new ArrayList<>();
              for (int i = 0; i < SDFWithMultipleOutputsPerBlockAndSideInputBase.MAX_INDEX; ++i) {
                expected.add(i);
              }
              for (KV<String, Iterable<Integer>> kv : input) {
                assertEquals(expected, Ordering.<Integer>natural().sortedCopy(kv.getValue()));
              }
              return null;
            });
    p.run();

    // TODO: also test coverage when some of the windows of the side input are not ready.
  }

  private static class SDFWithAdditionalOutputBase extends DoFn<Integer, String> {
    private final TupleTag<String> additionalOutput;

    private SDFWithAdditionalOutputBase(TupleTag<String> additionalOutput) {
      this.additionalOutput = additionalOutput;
    }

    @ProcessElement
    public void process(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
      checkState(tracker.tryClaim(tracker.currentRestriction().getFrom()));
      c.output("main:" + c.element());
      c.output(additionalOutput, "additional:" + c.element());
    }

    @GetInitialRestriction
    public OffsetRange getInitialRestriction(Integer value) {
      return new OffsetRange(0, 1);
    }
  }

  @BoundedPerElement
  private static class SDFWithAdditionalOutputBounded extends SDFWithAdditionalOutputBase {
    private SDFWithAdditionalOutputBounded(TupleTag<String> additionalOutput) {
      super(additionalOutput);
    }
  }

  @UnboundedPerElement
  private static class SDFWithAdditionalOutputUnbounded extends SDFWithAdditionalOutputBase {
    private SDFWithAdditionalOutputUnbounded(TupleTag<String> additionalOutput) {
      super(additionalOutput);
    }
  }

  private static SDFWithAdditionalOutputBase sdfWithAdditionalOutput(
      IsBounded bounded, TupleTag<String> additionalOutput) {
    return (bounded == IsBounded.BOUNDED)
        ? new SDFWithAdditionalOutputBounded(additionalOutput)
        : new SDFWithAdditionalOutputUnbounded(additionalOutput);
  }

  @Test
  @Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class})
  public void testAdditionalOutputBounded() {
    testAdditionalOutput(IsBounded.BOUNDED);
  }

  @Test
  @Category({ValidatesRunner.class, UsesUnboundedSplittableParDo.class})
  public void testAdditionalOutputUnbounded() {
    testAdditionalOutput(IsBounded.UNBOUNDED);
  }

  private void testAdditionalOutput(IsBounded bounded) {
    TupleTag<String> mainOutputTag = new TupleTag<String>("main") {};
    TupleTag<String> additionalOutputTag = new TupleTag<String>("additional") {};

    PCollectionTuple res =
        p.apply("input", Create.of(0, 1, 2))
            .apply(
                ParDo.of(sdfWithAdditionalOutput(bounded, additionalOutputTag))
                    .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)));

    PAssert.that(res.get(mainOutputTag))
        .containsInAnyOrder(Arrays.asList("main:0", "main:1", "main:2"));
    PAssert.that(res.get(additionalOutputTag))
        .containsInAnyOrder(Arrays.asList("additional:0", "additional:1", "additional:2"));

    p.run();
  }

  @Test(timeout = 15000L)
  @Ignore("https://issues.apache.org/jira/browse/BEAM-6354")
  @Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class, UsesTestStream.class})
  public void testLateData() {

    Instant base = Instant.now();

    TestStream<String> stream =
        TestStream.create(StringUtf8Coder.of())
            .advanceWatermarkTo(base)
            .addElements("aa")
            .advanceWatermarkTo(base.plus(Duration.standardSeconds(5)))
            .addElements(TimestampedValue.of("bb", base.minus(Duration.standardHours(1))))
            .advanceProcessingTime(Duration.standardHours(1))
            .advanceWatermarkToInfinity();

    PCollection<String> input =
        p.apply(stream)
            .apply(
                Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
                    .withAllowedLateness(Duration.standardMinutes(1))
                    .discardingFiredPanes());

    PCollection<KV<String, Integer>> afterSDF =
        input
            .apply(ParDo.of(pairStringWithIndexToLengthFn(IsBounded.UNBOUNDED)))
            .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));

    PCollection<String> nonLate = afterSDF.apply(GroupByKey.create()).apply(Keys.create());

    // The splittable DoFn itself should not drop any data and act as pass-through.
    PAssert.that(afterSDF)
        .containsInAnyOrder(
            Arrays.asList(KV.of("aa", 0), KV.of("aa", 1), KV.of("bb", 0), KV.of("bb", 1)));

    // But it should preserve the windowing strategy of the data, including allowed lateness:
    // the follow-up GBK should drop the late data.
    assertEquals(afterSDF.getWindowingStrategy(), input.getWindowingStrategy());
    PAssert.that(nonLate).containsInAnyOrder("aa");

    p.run();
  }

  private static class SDFWithLifecycleBase extends DoFn<String, String> {
    private enum State {
      OUTSIDE_BUNDLE,
      INSIDE_BUNDLE,
      TORN_DOWN
    }

    private transient State state;

    @GetInitialRestriction
    public OffsetRange getInitialRestriction(String value) {
      assertEquals(State.OUTSIDE_BUNDLE, state);
      return new OffsetRange(0, 1);
    }

    @SplitRestriction
    public void splitRestriction(
        String value, OffsetRange range, OutputReceiver<OffsetRange> receiver) {
      assertEquals(State.OUTSIDE_BUNDLE, state);
      receiver.output(range);
    }

    @Setup
    public void setUp() {
      assertEquals(null, state);
      state = State.OUTSIDE_BUNDLE;
    }

    @StartBundle
    public void startBundle() {
      assertEquals(State.OUTSIDE_BUNDLE, state);
      state = State.INSIDE_BUNDLE;
    }

    @ProcessElement
    public void processElement(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
      assertEquals(State.INSIDE_BUNDLE, state);
      assertTrue(tracker.tryClaim(0L));
      c.output(c.element());
    }

    @FinishBundle
    public void finishBundle() {
      assertEquals(State.INSIDE_BUNDLE, state);
      state = State.OUTSIDE_BUNDLE;
    }

    @Teardown
    public void tearDown() {
      assertEquals(State.OUTSIDE_BUNDLE, state);
      state = State.TORN_DOWN;
    }
  }

  @BoundedPerElement
  private static class SDFWithLifecycleBounded extends SDFWithLifecycleBase {}

  @UnboundedPerElement
  private static class SDFWithLifecycleUnbounded extends SDFWithLifecycleBase {}

  private static SDFWithLifecycleBase sdfWithLifecycle(IsBounded bounded) {
    return (bounded == IsBounded.BOUNDED)
        ? new SDFWithLifecycleBounded()
        : new SDFWithLifecycleUnbounded();
  }

  @Test
  @Category({ValidatesRunner.class, UsesParDoLifecycle.class, UsesBoundedSplittableParDo.class})
  public void testLifecycleMethodsBounded() {
    testLifecycleMethods(IsBounded.BOUNDED);
  }

  @Test
  @Category({ValidatesRunner.class, UsesParDoLifecycle.class, UsesUnboundedSplittableParDo.class})
  public void testLifecycleMethodsUnbounded() {
    testLifecycleMethods(IsBounded.UNBOUNDED);
  }

  private void testLifecycleMethods(IsBounded bounded) {
    PCollection<String> res =
        p.apply(Create.of("a", "b", "c")).apply(ParDo.of(sdfWithLifecycle(bounded)));
    PAssert.that(res).containsInAnyOrder("a", "b", "c");
    p.run();
  }

  @Test
  @Category(NeedsRunner.class)
  public void testBoundedness() {
    // use TestPipeline.create() because we assert without p.run();
    Pipeline p = TestPipeline.create();
    PCollection<String> foo = p.apply(Create.of("foo"));
    {
      PCollection<String> res =
          foo.apply(
              ParDo.of(
                  new DoFn<String, String>() {
                    @ProcessElement
                    public void process(
                        @Element String element, RestrictionTracker<OffsetRange, Long> tracker) {
                      // Doesn't matter
                    }

                    @GetInitialRestriction
                    public OffsetRange getInitialRestriction(String element) {
                      return new OffsetRange(0, 1);
                    }
                  }));
      assertEquals(PCollection.IsBounded.BOUNDED, res.isBounded());
    }
    {
      PCollection<String> res =
          foo.apply(
              ParDo.of(
                  new DoFn<String, String>() {
                    @ProcessElement
                    public ProcessContinuation process(
                        @Element String element, RestrictionTracker<OffsetRange, Long> tracker) {
                      return stop();
                    }

                    @GetInitialRestriction
                    public OffsetRange getInitialRestriction(String element) {
                      return new OffsetRange(0, 1);
                    }
                  }));
      assertEquals(PCollection.IsBounded.UNBOUNDED, res.isBounded());
    }
  }

  // TODO (https://issues.apache.org/jira/browse/BEAM-988): Test that Splittable DoFn
  // emits output immediately (i.e. has a pass-through trigger) regardless of input's
  // windowing/triggering strategy.
}
