/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.direct;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.singletonMap;
import static org.apache.beam.runners.direct.DirectGraphs.getProducer;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.UnboundedReadDeduplicator.NeverDeduplicator;
import org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory.UnboundedSourceShard;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ContiguousSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.DiscreteDomain;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.LinkedListMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Range;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link UnboundedReadEvaluatorFactory}. */
@RunWith(JUnit4.class)
public class UnboundedReadEvaluatorFactoryTest {
  private PCollection<Long> longs;
  private UnboundedReadEvaluatorFactory factory;
  private EvaluationContext context;
  private UncommittedBundle<Long> output;

  private BundleFactory bundleFactory = ImmutableListBundleFactory.create();

  private UnboundedSource<Long, ?> source;
  private DirectGraph graph;

  @Rule public ExpectedException thrown = ExpectedException.none();
  @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
  private PipelineOptions options;

  @Before
  public void setup() {
    source = CountingSource.unboundedWithTimestampFn(new LongToInstantFn());
    longs = p.apply(Read.from(source));
    options = PipelineOptionsFactory.create();

    context = mock(EvaluationContext.class);
    factory = new UnboundedReadEvaluatorFactory(context, options);
    output = bundleFactory.createBundle(longs);
    graph = DirectGraphs.getGraph(p);
    when(context.createBundle(longs)).thenReturn(output);
  }

  @Test
  public void generatesInitialSplits() throws Exception {
    when(context.createRootBundle()).thenAnswer(invocation -> bundleFactory.createRootBundle());

    int numSplits = 5;
    Collection<CommittedBundle<?>> initialInputs =
        new UnboundedReadEvaluatorFactory.InputProvider(context, options)
            .getInitialInputs(graph.getProducer(longs), numSplits);
    // CountingSource.unbounded has very good splitting behavior
    assertThat(initialInputs, hasSize(numSplits));

    int readPerSplit = 100;
    int totalSize = numSplits * readPerSplit;
    Set<Long> expectedOutputs =
        ContiguousSet.create(Range.closedOpen(0L, (long) totalSize), DiscreteDomain.longs());

    Collection<Long> readItems = new ArrayList<>(totalSize);
    for (CommittedBundle<?> initialInput : initialInputs) {
      CommittedBundle<UnboundedSourceShard<Long, ?>> shardBundle =
          (CommittedBundle<UnboundedSourceShard<Long, ?>>) initialInput;
      WindowedValue<UnboundedSourceShard<Long, ?>> shard =
          Iterables.getOnlyElement(shardBundle.getElements());
      assertThat(shard.getTimestamp(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
      assertThat(shard.getWindows(), Matchers.contains(GlobalWindow.INSTANCE));
      UnboundedSource<Long, ?> shardSource = shard.getValue().getSource();
      readItems.addAll(
          SourceTestUtils.readNItemsFromUnstartedReader(
              shardSource.createReader(
                  PipelineOptionsFactory.create(), null /* No starting checkpoint */),
              readPerSplit));
    }
    assertThat(readItems, containsInAnyOrder(expectedOutputs.toArray(new Long[0])));
  }

  @Test
  public void unboundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception {
    when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());

    Collection<CommittedBundle<?>> initialInputs =
        new UnboundedReadEvaluatorFactory.InputProvider(context, options)
            .getInitialInputs(graph.getProducer(longs), 1);

    CommittedBundle<?> inputShards = Iterables.getOnlyElement(initialInputs);
    UnboundedSourceShard<Long, ?> inputShard =
        (UnboundedSourceShard<Long, ?>)
            Iterables.getOnlyElement(inputShards.getElements()).getValue();
    TransformEvaluator<? super UnboundedSourceShard<Long, ?>> evaluator =
        factory.forApplication(graph.getProducer(longs), inputShards);

    evaluator.processElement((WindowedValue) Iterables.getOnlyElement(inputShards.getElements()));
    TransformResult<? super UnboundedSourceShard<Long, ?>> result = evaluator.finishBundle();

    WindowedValue<? super UnboundedSourceShard<Long, ?>> residual =
        Iterables.getOnlyElement(result.getUnprocessedElements());
    assertThat(residual.getTimestamp(), Matchers.lessThan(DateTime.now().toInstant()));
    UnboundedSourceShard<Long, ?> residualShard =
        (UnboundedSourceShard<Long, ?>) residual.getValue();
    assertThat(residualShard.getSource(), equalTo(inputShard.getSource()));
    assertThat(residualShard.getCheckpoint(), not(nullValue()));
    assertThat(
        output.commit(Instant.now()).getElements(),
        containsInAnyOrder(
            tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), tgw(5L), tgw(3L),
            tgw(0L)));
  }

  @Test
  public void unboundedSourceWithDuplicatesMultipleCalls() throws Exception {
    Long[] outputs = new Long[20];
    for (long i = 0L; i < 20L; i++) {
      outputs[(int) i] = i % 5L;
    }
    TestUnboundedSource<Long> source = new TestUnboundedSource<>(BigEndianLongCoder.of(), outputs);
    source.dedupes = true;

    PCollection<Long> pcollection = p.apply(Read.from(source));
    AppliedPTransform<?, ?, ?> sourceTransform = getProducer(pcollection);

    when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
    Collection<CommittedBundle<?>> initialInputs =
        new UnboundedReadEvaluatorFactory.InputProvider(context, options)
            .getInitialInputs(sourceTransform, 1);

    UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
    when(context.createBundle(pcollection)).thenReturn(output);
    CommittedBundle<?> inputBundle = Iterables.getOnlyElement(initialInputs);
    TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator =
        factory.forApplication(sourceTransform, inputBundle);

    for (WindowedValue<?> value : inputBundle.getElements()) {
      evaluator.processElement(
          (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>) value);
    }
    TransformResult<UnboundedSourceShard<Long, TestCheckpointMark>> result =
        evaluator.finishBundle();
    assertThat(
        output.commit(Instant.now()).getElements(),
        containsInAnyOrder(tgw(1L), tgw(2L), tgw(4L), tgw(3L), tgw(0L)));

    UncommittedBundle<Long> secondOutput = bundleFactory.createBundle(longs);
    when(context.createBundle(longs)).thenReturn(secondOutput);
    TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> secondEvaluator =
        factory.forApplication(sourceTransform, inputBundle);
    WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> residual =
        (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>)
            Iterables.getOnlyElement(result.getUnprocessedElements());
    secondEvaluator.processElement(residual);
    secondEvaluator.finishBundle();
    assertThat(secondOutput.commit(Instant.now()).getElements(), Matchers.emptyIterable());
  }

  @Test
  public void noElementsAvailableReaderIncludedInResidual() throws Exception {
    // Read with a very slow rate so by the second read there are no more elements
    PCollection<Long> pcollection =
        p.apply(Read.from(new TestUnboundedSource<>(VarLongCoder.of(), 1L)));
    AppliedPTransform<?, ?, ?> sourceTransform = getProducer(pcollection);

    when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
    Collection<CommittedBundle<?>> initialInputs =
        new UnboundedReadEvaluatorFactory.InputProvider(context, options)
            .getInitialInputs(sourceTransform, 1);

    // Process the initial shard. This might produce some output, and will produce a residual shard
    // which should produce no output when read from within the following day.
    when(context.createBundle(pcollection)).thenReturn(bundleFactory.createBundle(pcollection));
    CommittedBundle<?> inputBundle = Iterables.getOnlyElement(initialInputs);
    TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator =
        factory.forApplication(sourceTransform, inputBundle);
    for (WindowedValue<?> value : inputBundle.getElements()) {
      evaluator.processElement(
          (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>) value);
    }
    TransformResult<UnboundedSourceShard<Long, TestCheckpointMark>> result =
        evaluator.finishBundle();

    // Read from the residual of the first read. This should not produce any output, but should
    // include a residual shard in the result.
    UncommittedBundle<Long> secondOutput = bundleFactory.createBundle(longs);
    when(context.createBundle(longs)).thenReturn(secondOutput);
    TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> secondEvaluator =
        factory.forApplication(sourceTransform, inputBundle);
    WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> residual =
        (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>)
            Iterables.getOnlyElement(result.getUnprocessedElements());
    secondEvaluator.processElement(residual);

    TransformResult<UnboundedSourceShard<Long, TestCheckpointMark>> secondResult =
        secondEvaluator.finishBundle();

    // Sanity check that nothing was output (The test would have to run for more than a day to do
    // so correctly.)
    assertThat(secondOutput.commit(Instant.now()).getElements(), Matchers.emptyIterable());

    // Test that even though the reader produced no outputs, there is still a residual shard with
    // the updated watermark.
    WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> unprocessed =
        (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>)
            Iterables.getOnlyElement(secondResult.getUnprocessedElements());
    assertThat(unprocessed.getTimestamp(), Matchers.greaterThan(residual.getTimestamp()));
    assertThat(unprocessed.getValue().getExistingReader(), not(nullValue()));
  }

  @Test
  public void evaluatorReusesReaderAndClosesAtTheEnd() throws Exception {
    int numElements = 1000;
    ContiguousSet<Long> elems =
        ContiguousSet.create(Range.openClosed(0L, (long) numElements), DiscreteDomain.longs());
    TestUnboundedSource<Long> source =
        new TestUnboundedSource<>(BigEndianLongCoder.of(), elems.toArray(new Long[0]));
    source.advanceWatermarkToInfinity = true;

    PCollection<Long> pcollection = p.apply(Read.from(source));
    DirectGraph graph = DirectGraphs.getGraph(p);
    AppliedPTransform<?, ?, ?> sourceTransform = graph.getProducer(pcollection);

    when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
    UncommittedBundle<Long> output = mock(UncommittedBundle.class);
    when(context.createBundle(pcollection)).thenReturn(output);

    WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> shard =
        WindowedValue.valueInGlobalWindow(
            UnboundedSourceShard.unstarted(source, NeverDeduplicator.create()));
    CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> inputBundle =
        bundleFactory
            .<UnboundedSourceShard<Long, TestCheckpointMark>>createRootBundle()
            .add(shard)
            .commit(Instant.now());
    UnboundedReadEvaluatorFactory factory =
        new UnboundedReadEvaluatorFactory(context, options, 1.0 /* Always reuse */);
    new UnboundedReadEvaluatorFactory.InputProvider(context, options)
        .getInitialInputs(sourceTransform, 1);

    CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> residual = inputBundle;

    do {
      TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator =
          factory.forApplication(sourceTransform, residual);
      evaluator.processElement(Iterables.getOnlyElement(residual.getElements()));
      TransformResult<UnboundedSourceShard<Long, TestCheckpointMark>> result =
          evaluator.finishBundle();
      residual =
          inputBundle.withElements(
              (Iterable<WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>>)
                  result.getUnprocessedElements());
    } while (!Iterables.isEmpty(residual.getElements()));

    verify(output, times(numElements)).add(any());
    assertThat(TestUnboundedSource.readerCreatedCount, equalTo(1));
    assertThat(TestUnboundedSource.readerClosedCount, equalTo(1));
  }

  @Test
  public void evaluatorClosesReaderAndResumesFromCheckpoint() throws Exception {
    ContiguousSet<Long> elems = ContiguousSet.create(Range.closed(0L, 20L), DiscreteDomain.longs());
    TestUnboundedSource<Long> source =
        new TestUnboundedSource<>(BigEndianLongCoder.of(), elems.toArray(new Long[0]));

    PCollection<Long> pcollection = p.apply(Read.from(source));
    AppliedPTransform<?, ?, ?> sourceTransform = DirectGraphs.getGraph(p).getProducer(pcollection);

    when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
    UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
    when(context.createBundle(pcollection)).thenReturn(output);

    WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> shard =
        WindowedValue.valueInGlobalWindow(
            UnboundedSourceShard.unstarted(source, NeverDeduplicator.create()));
    CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> inputBundle =
        bundleFactory
            .<UnboundedSourceShard<Long, TestCheckpointMark>>createRootBundle()
            .add(shard)
            .commit(Instant.now());
    UnboundedReadEvaluatorFactory factory =
        new UnboundedReadEvaluatorFactory(context, options, 0.0 /* never reuse */);
    TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator =
        factory.forApplication(sourceTransform, inputBundle);
    evaluator.processElement(shard);
    TransformResult<UnboundedSourceShard<Long, TestCheckpointMark>> result =
        evaluator.finishBundle();

    CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> residual =
        inputBundle.withElements(
            (Iterable<WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>>)
                result.getUnprocessedElements());

    TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> secondEvaluator =
        factory.forApplication(sourceTransform, residual);
    secondEvaluator.processElement(Iterables.getOnlyElement(residual.getElements()));
    secondEvaluator.finishBundle();

    assertThat(TestUnboundedSource.readerClosedCount, equalTo(2));
    assertThat(
        Iterables.getOnlyElement(residual.getElements()).getValue().getCheckpoint().isFinalized(),
        is(true));
  }

  @Test
  public void evaluatorThrowsInCloseRethrows() throws Exception {
    ContiguousSet<Long> elems = ContiguousSet.create(Range.closed(0L, 20L), DiscreteDomain.longs());
    TestUnboundedSource<Long> source =
        new TestUnboundedSource<>(BigEndianLongCoder.of(), elems.toArray(new Long[0]))
            .throwsOnClose();

    PCollection<Long> pcollection = p.apply(Read.from(source));
    AppliedPTransform<?, ?, ?> sourceTransform = DirectGraphs.getGraph(p).getProducer(pcollection);

    when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
    UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
    when(context.createBundle(pcollection)).thenReturn(output);

    WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> shard =
        WindowedValue.valueInGlobalWindow(
            UnboundedSourceShard.unstarted(source, NeverDeduplicator.create()));
    CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> inputBundle =
        bundleFactory
            .<UnboundedSourceShard<Long, TestCheckpointMark>>createRootBundle()
            .add(shard)
            .commit(Instant.now());
    UnboundedReadEvaluatorFactory factory =
        new UnboundedReadEvaluatorFactory(context, options, 0.0 /* never reuse */);
    TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator =
        factory.forApplication(sourceTransform, inputBundle);
    thrown.expect(IOException.class);
    thrown.expectMessage("throws on close");
    evaluator.processElement(shard);
  }

  @Test // before this was throwing a NPE
  public void emptySource() throws Exception {
    TestUnboundedSource.readerClosedCount = 0;
    final TestUnboundedSource<String> source = new TestUnboundedSource<>(StringUtf8Coder.of());
    source.advanceWatermarkToInfinity = true;
    processElement(source);
    assertEquals(1, TestUnboundedSource.readerClosedCount);
    TestUnboundedSource.readerClosedCount = 0; // reset
  }

  @Test(expected = IOException.class)
  public void sourceThrowingException() throws Exception {
    final TestUnboundedSource<String> source = new TestUnboundedSource<>(StringUtf8Coder.of());
    source.advanceWatermarkToInfinity = true;
    source.throwOnClose = true;
    processElement(source);
  }

  private void processElement(final TestUnboundedSource<String> source) throws Exception {
    final EvaluationContext context =
        EvaluationContext.create(
            MockClock.fromInstant(Instant.now()),
            CloningBundleFactory.create(),
            DirectGraph.create(
                emptyMap(), emptyMap(), LinkedListMultimap.create(), emptySet(), emptyMap()),
            emptySet(),
            Executors.newCachedThreadPool());
    final UnboundedReadEvaluatorFactory factory =
        new UnboundedReadEvaluatorFactory(context, options);

    final Read.Unbounded<String> unbounded = Read.from(source);
    final Pipeline pipeline = Pipeline.create(options);
    final PCollection<String> pCollection = pipeline.apply(unbounded);
    final AppliedPTransform<PBegin, PCollection<String>, Read.Unbounded<String>> application =
        AppliedPTransform.of(
            "test",
            new HashMap<>(),
            singletonMap(new TupleTag(), pCollection),
            unbounded,
            pipeline);
    final TransformEvaluator<UnboundedSourceShard<String, TestCheckpointMark>> evaluator =
        factory.forApplication(application, null);
    final UnboundedSource.UnboundedReader<String> reader = source.createReader(options, null);
    final UnboundedSourceShard<String, TestCheckpointMark> shard =
        UnboundedSourceShard.of(source, new NeverDeduplicator(), reader, null);
    final WindowedValue<UnboundedSourceShard<String, TestCheckpointMark>> value =
        WindowedValue.of(
            shard, BoundedWindow.TIMESTAMP_MAX_VALUE, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
    TestUnboundedSource.readerClosedCount = 0;
    evaluator.processElement(value);
  }

  /**
   * A terse alias for producing timestamped longs in the {@link GlobalWindow}, where the timestamp
   * is the epoch offset by the value of the element.
   */
  private static WindowedValue<Long> tgw(Long elem) {
    return WindowedValue.timestampedValueInGlobalWindow(elem, new Instant(elem));
  }

  private static class LongToInstantFn implements SerializableFunction<Long, Instant> {
    @Override
    public Instant apply(Long input) {
      return new Instant(input);
    }
  }

  private static class TestUnboundedSource<T> extends UnboundedSource<T, TestCheckpointMark> {
    private static int getWatermarkCalls = 0;

    static int readerCreatedCount;
    static int readerClosedCount;
    static int readerAdvancedCount;
    private final Coder<T> coder;
    private final List<T> elems;
    private boolean dedupes = false;
    private boolean advanceWatermarkToInfinity = false; // After reaching end of input.
    private boolean throwOnClose;

    public TestUnboundedSource(Coder<T> coder, T... elems) {
      this(coder, false, Arrays.asList(elems));
    }

    private TestUnboundedSource(Coder<T> coder, boolean throwOnClose, List<T> elems) {
      readerCreatedCount = 0;
      readerClosedCount = 0;
      readerAdvancedCount = 0;
      this.coder = coder;
      this.elems = elems;
      this.throwOnClose = throwOnClose;
    }

    @Override
    public List<? extends UnboundedSource<T, TestCheckpointMark>> split(
        int desiredNumSplits, PipelineOptions options) throws Exception {
      return ImmutableList.of(this);
    }

    @Override
    public UnboundedSource.UnboundedReader<T> createReader(
        PipelineOptions options, @Nullable TestCheckpointMark checkpointMark) {
      checkState(
          checkpointMark == null || checkpointMark.decoded,
          "Cannot resume from a checkpoint that has not been decoded");
      readerCreatedCount++;
      return new TestUnboundedReader(elems, checkpointMark == null ? -1 : checkpointMark.index);
    }

    @Override
    @Nullable
    public Coder<TestCheckpointMark> getCheckpointMarkCoder() {
      return new TestCheckpointMark.Coder();
    }

    @Override
    public boolean requiresDeduping() {
      return dedupes;
    }

    @Override
    public Coder<T> getOutputCoder() {
      return coder;
    }

    public TestUnboundedSource<T> throwsOnClose() {
      return new TestUnboundedSource<>(coder, true, elems);
    }

    private class TestUnboundedReader extends UnboundedReader<T> {
      private final List<T> elems;
      private int index;
      private boolean closed = false;

      public TestUnboundedReader(List<T> elems, int startIndex) {
        this.elems = elems;
        this.index = startIndex;
      }

      @Override
      public boolean start() throws IOException {
        return advance();
      }

      @Override
      public boolean advance() throws IOException {
        readerAdvancedCount++;
        if (index + 1 < elems.size()) {
          index++;
          return true;
        }
        return false;
      }

      @Override
      public Instant getWatermark() {
        getWatermarkCalls++;
        if (index + 1 == elems.size() && TestUnboundedSource.this.advanceWatermarkToInfinity) {
          return BoundedWindow.TIMESTAMP_MAX_VALUE;
        } else {
          return new Instant(index + getWatermarkCalls);
        }
      }

      @Override
      public CheckpointMark getCheckpointMark() {
        return new TestCheckpointMark(index);
      }

      @Override
      public UnboundedSource<T, ?> getCurrentSource() {
        return TestUnboundedSource.this;
      }

      @Override
      public T getCurrent() throws NoSuchElementException {
        return elems.get(index);
      }

      @Override
      public Instant getCurrentTimestamp() throws NoSuchElementException {
        return new Instant(index);
      }

      @Override
      public byte[] getCurrentRecordId() {
        try {
          return CoderUtils.encodeToByteArray(coder, getCurrent());
        } catch (CoderException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public void close() throws IOException {
        try {
          readerClosedCount++;
          // Enforce the AutoCloseable contract. Close is not idempotent.
          assertThat(closed, is(false));
          if (throwOnClose) {
            throw new IOException(String.format("%s throws on close", TestUnboundedSource.this));
          }
        } finally {
          closed = true;
        }
      }
    }
  }

  private static class TestCheckpointMark implements CheckpointMark {
    final int index;
    private boolean finalized = false;
    private boolean decoded = false;

    private TestCheckpointMark(int index) {
      this.index = index;
    }

    @Override
    public void finalizeCheckpoint() throws IOException {
      checkState(
          !finalized, "%s was finalized more than once", TestCheckpointMark.class.getSimpleName());
      checkState(
          !decoded,
          "%s was finalized after being decoded",
          TestCheckpointMark.class.getSimpleName());
      finalized = true;
    }

    boolean isFinalized() {
      return finalized;
    }

    public static class Coder extends AtomicCoder<TestCheckpointMark> {
      @Override
      public void encode(TestCheckpointMark value, OutputStream outStream) throws IOException {
        VarInt.encode(value.index, outStream);
      }

      @Override
      public TestCheckpointMark decode(InputStream inStream) throws IOException {
        TestCheckpointMark decoded = new TestCheckpointMark(VarInt.decodeInt(inStream));
        decoded.decoded = true;
        return decoded;
      }
    }
  }
}
