/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.testing;

import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.joda.time.Instant;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Helper functions and test harnesses for checking correctness of {@link Source} implementations.
 *
 * <p>Contains a few lightweight utilities (e.g. reading items from a source or a reader, such as
 * {@link #readFromSource} and {@link #readFromUnstartedReader}), as well as heavyweight property
 * testing and stress testing harnesses that help achieve a large amount of test coverage with
 * little code. The most notable ones are:
 *
 * <ul>
 *   <li>{@link #assertSourcesEqualReferenceSource} helps testing that the data read by the union of
 *       sources produced by {@link BoundedSource#split} is the same as data read by the original
 *       source.
 *   <li>If your source implements dynamic work rebalancing, use the {@code assertSplitAtFraction}
 *       family of functions - they test the behavior of {@link
 *       BoundedSource.BoundedReader#splitAtFraction}, in particular, that various consistency
 *       properties are respected and the total set of data read by the source is preserved when
 *       splits happen. Use {@link #assertSplitAtFractionBehavior} to test individual cases of
 *       {@code splitAtFraction} and use {@link #assertSplitAtFractionExhaustive} as a heavyweight
 *       stress test including concurrency. We strongly recommend using both.
 * </ul>
 *
 * For example usages, see the unit tests of classes such as {@code AvroSource} or {@code
 * TextSource}.
 *
 * <p>Like {@link PAssert}, requires JUnit and Hamcrest to be present in the classpath.
 */
public class SourceTestUtils {
  private static final Logger LOG = LoggerFactory.getLogger(SourceTestUtils.class);

  /**
   * A wrapper around a value of type {@code T} that compares according to the structural value
   * provided by a {@code Coder<T>}, but prints both the original and the structural value, to
   * help get good error messages from JUnit equality assertion failures and such.
   *
   * <p>{@link #equals} and {@link #hashCode} look only at the structural value; the original
   * value is carried along purely so that {@link #toString} can show it.
   */
  private static class ReadableStructuralValue<T> {
    private final T originalValue;
    private final Object structuralValue;

    public ReadableStructuralValue(T originalValue, Object structuralValue) {
      this.originalValue = originalValue;
      this.structuralValue = structuralValue;
    }

    @Override
    public int hashCode() {
      return Objects.hashCode(structuralValue);
    }

    @Override
    public boolean equals(Object obj) {
      // instanceof is false for null, so no separate null check is required.
      if (!(obj instanceof ReadableStructuralValue)) {
        return false;
      }
      // Wildcard cast instead of a raw type: only the (untyped) structural value is compared.
      ReadableStructuralValue<?> other = (ReadableStructuralValue<?>) obj;
      return Objects.equals(structuralValue, other.structuralValue);
    }

    @Override
    public String toString() {
      return String.format("[%s (structural %s)]", originalValue, structuralValue);
    }
  }

  /**
   * Pairs every element of {@code list} with the structural value that {@code coder} produces
   * for it, preserving the order of the input list.
   *
   * <p>The testing utilities in this class rely on standard assertions and matchers to compare
   * elements read by sources. Elements themselves may not implement {@code equals}/{@code
   * hashCode} properly, but every source has a {@link Coder}, and every {@code Coder} can produce
   * a {@link Coder#structuralValue} whose {@code equals}/{@code hashCode} is consistent with
   * equality of the encoded format. The returned wrappers therefore compare by that structural
   * value.
   *
   * @param coder the coder used to derive the structural value of each element
   * @param list the elements to wrap
   * @return one wrapper per input element, in input order
   */
  public static <T> List<ReadableStructuralValue<T>> createStructuralValues(
      Coder<T> coder, List<T> list) throws Exception {
    List<ReadableStructuralValue<T>> wrapped = new ArrayList<>(list.size());
    for (T original : list) {
      Object structural = coder.structuralValue(original);
      wrapped.add(new ReadableStructuralValue<>(original, structural));
    }
    return wrapped;
  }

  /**
   * Creates a reader for the given {@link BoundedSource}, reads every element from it and closes
   * the reader again.
   *
   * @return the elements in the order the reader produced them
   */
  public static <T> List<T> readFromSource(BoundedSource<T> source, PipelineOptions options)
      throws IOException {
    try (BoundedReader<T> freshReader = source.createReader(options)) {
      List<T> everything = readFromUnstartedReader(freshReader);
      return everything;
    }
  }

  /**
   * Splits {@code source} using {@code desiredBundleSizeBytes} and reads every resulting split
   * in the order {@link BoundedSource#split} returned them.
   *
   * @return the concatenation of the elements read from each split
   */
  public static <T> List<T> readFromSplitsOfSource(
      BoundedSource<T> source, long desiredBundleSizeBytes, PipelineOptions options)
      throws Exception {
    List<? extends BoundedSource<T>> bundles = source.split(desiredBundleSizeBytes, options);
    List<T> combined = Lists.newArrayList();
    for (BoundedSource<T> bundle : bundles) {
      List<T> bundleItems = readFromSource(bundle, options);
      combined.addAll(bundleItems);
    }
    return combined;
  }

  /**
   * Reads every element from a {@link Source.Reader} on which {@link Source.Reader#start} has not
   * been called yet; the reader is started as part of this call.
   */
  public static <T> List<T> readFromUnstartedReader(Source.Reader<T> reader) throws IOException {
    final boolean alreadyStarted = false;
    return readRemainingFromReader(reader, alreadyStarted);
  }

  /**
   * Reads every remaining element from a {@link Source.Reader} on which {@link
   * Source.Reader#start} has already been called. The reader's current element is not included;
   * reading resumes with the next {@link Source.Reader#advance}.
   */
  public static <T> List<T> readFromStartedReader(Source.Reader<T> reader) throws IOException {
    final boolean alreadyStarted = true;
    return readRemainingFromReader(reader, alreadyStarted);
  }

  /**
   * Starts the given {@link Source.Reader} and reads exactly {@code n} elements from it.
   *
   * <p>{@link Source.Reader#start} must not have been called on the reader yet.
   */
  public static <T> List<T> readNItemsFromUnstartedReader(Source.Reader<T> reader, int n)
      throws IOException {
    final boolean alreadyStarted = false;
    return readNItemsFromReader(reader, n, alreadyStarted);
  }

  /**
   * Reads the next {@code n} elements from a {@link Source.Reader} on which {@link
   * Source.Reader#start} has already been called.
   */
  public static <T> List<T> readNItemsFromStartedReader(Source.Reader<T> reader, int n)
      throws IOException {
    final boolean alreadyStarted = true;
    return readNItemsFromReader(reader, n, alreadyStarted);
  }

  /**
   * Reads elements from a {@link Source.Reader} until {@code n} elements have been read.
   *
   * <p>There must be at least {@code n} elements remaining in the reader, except when {@code n}
   * is {@code Integer.MAX_VALUE}, which means "read all remaining elements". If the reader runs
   * out early in the bounded case, an {@link AssertionError} reporting the requested and the
   * actual number of elements is thrown.
   *
   * @param reader the reader to pull elements from
   * @param n the number of elements to read, or {@code Integer.MAX_VALUE} for "all remaining"
   * @param started whether {@link Source.Reader#start} has already been called; if not, the
   *     first element is obtained through {@code start()} instead of {@code advance()}
   * @return the elements read, in reader order
   */
  private static <T> List<T> readNItemsFromReader(Source.Reader<T> reader, int n, boolean started)
      throws IOException {
    final boolean readUntilExhausted = (n == Integer.MAX_VALUE);
    List<T> res = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      boolean shouldStart = (i == 0 && !started);
      boolean more = shouldStart ? reader.start() : reader.advance();
      if (!more) {
        if (!readUntilExhausted) {
          // Running dry is only legal when the caller asked for "everything".
          Assert.fail(
              String.format(
                  "Expected to read %d items from the reader, but it was exhausted after %d",
                  n, i));
        }
        break;
      }
      res.add(reader.getCurrent());
    }
    return res;
  }

  /**
   * Drains a {@link Source.Reader}, returning every element it still has to offer.
   *
   * @param started whether {@link Source.Reader#start} has already been called on the reader
   */
  public static <T> List<T> readRemainingFromReader(Source.Reader<T> reader, boolean started)
      throws IOException {
    // Integer.MAX_VALUE is the "no limit" marker understood by readNItemsFromReader.
    final int noLimit = Integer.MAX_VALUE;
    return readNItemsFromReader(reader, noLimit, started);
  }

  /**
   * Asserts that reading all of {@code sources} yields, in any order, exactly the records that
   * reading {@code referenceSource} yields.
   *
   * <p>Every source in the list must also report an output coder equal to the reference source's
   * coder. Records are compared through their structural values (see {@link
   * #createStructuralValues}).
   */
  public static <T> void assertSourcesEqualReferenceSource(
      BoundedSource<T> referenceSource,
      List<? extends BoundedSource<T>> sources,
      PipelineOptions options)
      throws Exception {
    Coder<T> referenceCoder = referenceSource.getOutputCoder();
    List<T> recordsFromReference = readFromSource(referenceSource, options);

    List<T> recordsFromSources = new ArrayList<>();
    for (BoundedSource<T> candidate : sources) {
      String coderMismatch =
          "Coder type for source "
              + candidate
              + " is not compatible with Coder type for referenceSource "
              + referenceSource;
      assertThat(coderMismatch, candidate.getOutputCoder(), equalTo(referenceCoder));
      recordsFromSources.addAll(readFromSource(candidate, options));
    }

    List<ReadableStructuralValue<T>> actualValues =
        createStructuralValues(referenceCoder, recordsFromSources);
    Object[] expectedValues =
        createStructuralValues(referenceCoder, recordsFromReference).toArray();
    assertThat(actualValues, containsInAnyOrder(expectedValues));
  }

  /**
   * Asserts that draining the given unstarted reader produces, in any order, the same records as
   * reading the source reported by {@link BoundedSource.BoundedReader#getCurrentSource} from
   * scratch.
   *
   * <p>Records are compared through their structural values, using the output coder of the
   * reader's current source.
   */
  public static <T> void assertUnstartedReaderReadsSameAsItsSource(
      BoundedSource.BoundedReader<T> reader, PipelineOptions options) throws Exception {
    Coder<T> sourceCoder = reader.getCurrentSource().getOutputCoder();

    List<T> fromReader = readFromUnstartedReader(reader);
    // Ask the reader for its source again only after it has been drained, as before.
    BoundedSource<T> sourceAfterReading = reader.getCurrentSource();
    List<T> fromSource = readFromSource(sourceAfterReading, options);

    Object[] readerValues = createStructuralValues(sourceCoder, fromReader).toArray();
    List<ReadableStructuralValue<T>> sourceValues =
        createStructuralValues(sourceCoder, fromSource);
    assertThat(sourceValues, containsInAnyOrder(readerValues));
  }

  /**
   * Expected outcome of {@link org.apache.beam.sdk.io.BoundedSource.BoundedReader#splitAtFraction}.
   *
   * <p>Passed to {@link #assertSplitAtFractionBehavior} to state whether a split attempt is
   * required to succeed, required to fail, or allowed to do either.
   */
  public enum ExpectedSplitOutcome {
    /**
     * The operation must succeed (return a non-null residual source) and the results must be
     * consistent.
     */
    MUST_SUCCEED_AND_BE_CONSISTENT,
    /** The operation must fail (return {@code null}). */
    MUST_FAIL,
    /**
     * The operation must either fail, or succeed and the results be consistent. No assertion is
     * made about which of the two happens.
     */
    MUST_BE_CONSISTENT_IF_SUCCEEDS
  }

  /**
   * Immutable summary of a single {@code splitAtFraction} attempt, expressed as item counts.
   *
   * <p>Contains two values: the number of items in the primary source, and the number of items
   * in the residual source. The residual count is {@code -1} if the split failed.
   */
  private static class SplitAtFractionResult {
    /** Number of items read from the primary source after the split attempt. */
    public final int numPrimaryItems;

    /** Number of items read from the residual source, or {@code -1} if the split failed. */
    public final int numResidualItems;

    public SplitAtFractionResult(int numPrimaryItems, int numResidualItems) {
      this.numPrimaryItems = numPrimaryItems;
      this.numResidualItems = numResidualItems;
    }
  }

  /**
   * Reads {@code numItemsToReadBeforeSplit} items from a fresh reader of {@code source}, calls
   * {@code splitAtFraction(splitFraction)} and asserts that the outcome matches {@code
   * expectedOutcome}. Whenever the split succeeds, its result is additionally checked for
   * consistency as described in {@link #assertSplitAtFractionSucceedsAndConsistent}.
   *
   * <p>The expected items are obtained by reading {@code source} in full before the split is
   * attempted.
   *
   * @return the number of items in the primary and residual sources (residual count is {@code
   *     -1} if the split failed)
   */
  public static <T> SplitAtFractionResult assertSplitAtFractionBehavior(
      BoundedSource<T> source,
      int numItemsToReadBeforeSplit,
      double splitFraction,
      ExpectedSplitOutcome expectedOutcome,
      PipelineOptions options)
      throws Exception {
    List<T> itemsOfWholeSource = readFromSource(source, options);
    SplitAtFractionResult outcome =
        assertSplitAtFractionBehaviorImpl(
            source,
            itemsOfWholeSource,
            numItemsToReadBeforeSplit,
            splitFraction,
            expectedOutcome,
            options);
    return outcome;
  }

  /**
   * Checks that {@code expected} and {@code actual} hold equal elements at every position and
   * have the same length; otherwise fails with a message written for human readers.
   *
   * <p>Three kinds of failure are reported: the first position at which the lists disagree, or
   * one list being a strict prefix of the other (in which case up to five of the surplus items
   * are shown).
   *
   * @param message prefix for any failure message
   * @param expectedLabel name used for {@code expected} in failure messages
   * @param actualLabel name used for {@code actual} in failure messages
   */
  private static <T> void assertListsEqualInOrder(
      String message, String expectedLabel, List<T> expected, String actualLabel, List<T> actual) {
    final int expectedSize = expected.size();
    final int actualSize = actual.size();
    final int sharedLength = Math.min(expectedSize, actualSize);

    // Walk the overlapping part; the first disagreement aborts with a detailed message.
    for (int pos = 0; pos < sharedLength; ++pos) {
      T expectedItem = expected.get(pos);
      T actualItem = actual.get(pos);
      if (Objects.equals(expectedItem, actualItem)) {
        continue;
      }
      Assert.fail(
          String.format(
              "%s: %s and %s have %d items in common and then differ. "
                  + "Item in %s (%d more): %s, item in %s (%d more): %s",
              message,
              expectedLabel,
              actualLabel,
              pos,
              expectedLabel,
              expectedSize - pos - 1,
              expectedItem,
              actualLabel,
              actualSize - pos - 1,
              actualItem));
    }

    // The overlap matched completely, so any remaining difference is purely one of length.
    if (expectedSize > actualSize) {
      List<T> surplus = expected.subList(actualSize, Math.min(expectedSize, actualSize + 5));
      Assert.fail(
          String.format(
              "%s: %s has %d more items after matching all %d from %s. First 5: %s",
              message,
              expectedLabel,
              expectedSize - actualSize,
              actualSize,
              actualLabel,
              surplus));
    } else if (actualSize > expectedSize) {
      List<T> surplus = actual.subList(expectedSize, Math.min(actualSize, expectedSize + 5));
      Assert.fail(
          String.format(
              "%s: %s has %d more items after matching all %d from %s. First 5: %s",
              message,
              actualLabel,
              actualSize - expectedSize,
              expectedSize,
              expectedLabel,
              surplus));
    }
  }

  /**
   * Performs one {@code splitAtFraction} experiment against a fresh reader of {@code source}.
   *
   * <p>The steps are: read {@code numItemsToReadBeforeSplit} items, attempt to split at {@code
   * splitFraction}, check the success/failure of the split against {@code expectedOutcome}, read
   * the rest of the reader, and finally hand everything to {@link
   * #verifySingleSplitAtFractionResult} for the consistency checks.
   *
   * @param expectedItems the items obtained by reading {@code source} without any split
   * @return item counts of the primary and residual sources (residual count is {@code -1} if the
   *     split failed)
   */
  private static <T> SourceTestUtils.SplitAtFractionResult assertSplitAtFractionBehaviorImpl(
      BoundedSource<T> source,
      List<T> expectedItems,
      int numItemsToReadBeforeSplit,
      double splitFraction,
      ExpectedSplitOutcome expectedOutcome,
      PipelineOptions options)
      throws Exception {
    try (BoundedSource.BoundedReader<T> reader = source.createReader(options)) {
      BoundedSource<T> originalSource = reader.getCurrentSource();
      List<T> currentItems = readNItemsFromUnstartedReader(reader, numItemsToReadBeforeSplit);
      BoundedSource<T> residual = reader.splitAtFraction(splitFraction);
      if (residual != null) {
        // A successful split must make the reader report a new (primary) source object.
        assertFalse(
            String.format(
                "Primary source didn't change after a successful split of %s at %f "
                    + "after reading %d items. "
                    + "Was the source object mutated instead of creating a new one? "
                    + "Source objects MUST be immutable.",
                source, splitFraction, numItemsToReadBeforeSplit),
            reader.getCurrentSource() == originalSource);
        // NOTE(review): the message talks about the original source, but the condition compares
        // the reader's current (primary) source with the residual. Confirm which was intended.
        assertFalse(
            String.format(
                "Residual source equal to original source after a successful split of %s at %f "
                    + "after reading %d items. "
                    + "Was the source object mutated instead of creating a new one? "
                    + "Source objects MUST be immutable.",
                source, splitFraction, numItemsToReadBeforeSplit),
            reader.getCurrentSource() == residual);
      }
      // Failure cases are: must succeed but fails; must fail but succeeds.
      switch (expectedOutcome) {
        case MUST_SUCCEED_AND_BE_CONSISTENT:
          assertNotNull(
              "Failed to split reader of source: "
                  + source
                  + " at "
                  + splitFraction
                  + " after reading "
                  + numItemsToReadBeforeSplit
                  + " items",
              residual);
          break;
        case MUST_FAIL:
          Assert.assertNull(
              "Expected split to fail, but it succeeded for reader of source: "
                  + source
                  + " at "
                  + splitFraction
                  + " after reading "
                  + numItemsToReadBeforeSplit
                  + " items",
              residual);
          break;
        case MUST_BE_CONSISTENT_IF_SUCCEEDS:
          // Nothing.
          break;
      }
      // If nothing was read before the split, the reader has not been started yet.
      currentItems.addAll(readRemainingFromReader(reader, numItemsToReadBeforeSplit > 0));
      BoundedSource<T> primary = reader.getCurrentSource();
      return verifySingleSplitAtFractionResult(
          source,
          expectedItems,
          currentItems,
          primary,
          residual,
          numItemsToReadBeforeSplit,
          splitFraction,
          options);
    }
  }

  /**
   * Checks the consistency of one split attempt and reports the resulting item counts.
   *
   * <p>The primary source is always read in full. If the split failed ({@code residual} is
   * {@code null}), nothing further is verified and the residual count is reported as {@code -1}.
   * Otherwise two in-order comparisons on structural values are made:
   *
   * <ul>
   *   <li>{@code currentItems} (what the split reader produced) against the primary source;
   *   <li>{@code expectedItems} against the primary items followed by the residual items.
   * </ul>
   */
  private static <T> SourceTestUtils.SplitAtFractionResult verifySingleSplitAtFractionResult(
      BoundedSource<T> source,
      List<T> expectedItems,
      List<T> currentItems,
      BoundedSource<T> primary,
      BoundedSource<T> residual,
      int numItemsToReadBeforeSplit,
      double splitFraction,
      PipelineOptions options)
      throws Exception {
    List<T> itemsInPrimary = readFromSource(primary, options);
    if (residual == null) {
      // Split failed: only the primary count is meaningful.
      return new SplitAtFractionResult(itemsInPrimary.size(), -1);
    }

    List<T> itemsInResidual = readFromSource(residual, options);
    List<T> primaryThenResidual = new ArrayList<>();
    primaryThenResidual.addAll(itemsInPrimary);
    primaryThenResidual.addAll(itemsInResidual);

    String primaryMismatch =
        String.format(
            "Continued reading after split yielded different items than primary source: "
                + "split at %s after reading %s items, original source: %s, primary source: %s",
            splitFraction, numItemsToReadBeforeSplit, source, primary);
    String totalMismatch =
        String.format(
            "Items in primary and residual sources after split do not add up to items "
                + "in the original source. Split at %s after reading %s items; "
                + "original source: %s, primary: %s, residual: %s",
            splitFraction, numItemsToReadBeforeSplit, source, primary, residual);

    Coder<T> primaryCoder = primary.getOutputCoder();
    List<ReadableStructuralValue<T>> primaryStructural =
        createStructuralValues(primaryCoder, itemsInPrimary);
    List<ReadableStructuralValue<T>> currentStructural =
        createStructuralValues(primaryCoder, currentItems);
    List<ReadableStructuralValue<T>> expectedStructural =
        createStructuralValues(primaryCoder, expectedItems);
    List<ReadableStructuralValue<T>> totalStructural =
        createStructuralValues(primaryCoder, primaryThenResidual);

    assertListsEqualInOrder(
        primaryMismatch, "current", currentStructural, "primary", primaryStructural);
    assertListsEqualInOrder(
        totalMismatch, "total", expectedStructural, "primary+residual", totalStructural);

    return new SplitAtFractionResult(itemsInPrimary.size(), itemsInResidual.size());
  }

  /**
   * Asserts that {@link BoundedSource.BoundedReader#splitAtFraction} succeeds on the given source
   * and that the result is consistent. In pseudocode:
   *
   * <pre>
   *   Reader reader = source.createReader();
   *   read N items from reader;
   *   Source residual = reader.splitAtFraction(splitFraction);
   *   Source primary = reader.getCurrentSource();
   *   assert: items in primary == items we read so far
   *                               + items we'll get by continuing to read from reader;
   *   assert: items in original source == items in primary + items in residual
   * </pre>
   */
  public static <T> void assertSplitAtFractionSucceedsAndConsistent(
      BoundedSource<T> source,
      int numItemsToReadBeforeSplit,
      double splitFraction,
      PipelineOptions options)
      throws Exception {
    final ExpectedSplitOutcome mustSucceed = ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT;
    assertSplitAtFractionBehavior(
        source, numItemsToReadBeforeSplit, splitFraction, mustSucceed, options);
  }

  /**
   * Asserts that, after {@code numItemsToReadBeforeSplit} items have been read, the reader of
   * {@code source} refuses to {@code splitAtFraction(splitFraction)}, i.e. returns {@code null}.
   */
  public static <T> void assertSplitAtFractionFails(
      BoundedSource<T> source,
      int numItemsToReadBeforeSplit,
      double splitFraction,
      PipelineOptions options)
      throws Exception {
    final ExpectedSplitOutcome mustFail = ExpectedSplitOutcome.MUST_FAIL;
    assertSplitAtFractionBehavior(
        source, numItemsToReadBeforeSplit, splitFraction, mustFail, options);
  }

  /**
   * Accumulates the split fractions tried by {@link #assertSplitAtFractionBinary} for one start
   * position, grouped by how the split turned out.
   */
  private static class SplitFractionStatistics {
    /** Fractions at which the split succeeded (a residual source was returned). */
    final List<Double> successfulFractions = new ArrayList<>();

    /** Fractions at which the split succeeded and the residual source was non-empty. */
    final List<Double> nonTrivialFractions = new ArrayList<>();
  }

  /**
   * Asserts that given a start position, {@link BoundedSource.BoundedReader#splitAtFraction} at
   * every interesting fraction (halfway between two fractions that differ by at least one item) can
   * be called successfully and the results are consistent if a split succeeds.
   *
   * <p>Works by bisection on the interval {@code [leftFraction, rightFraction]}: the midpoint is
   * tried, recorded in {@code stats} if the split succeeded, and each half is searched recursively
   * when its two endpoints yield a different number of primary items. Recursion stops once the
   * interval is narrower than 0.001.
   *
   * @param expectedItems the items obtained by reading {@code source} without any split
   * @param numItemsToBeReadBeforeSplit the start position, i.e. how many items are read before
   *     each split attempt
   * @param leftResult result of splitting at {@code leftFraction}, or {@code null} to have it
   *     computed here
   * @param rightResult result of splitting at {@code rightFraction}, or {@code null} to have it
   *     computed here
   * @param stats collector for the midpoint fractions at which a split succeeded
   */
  private static <T> void assertSplitAtFractionBinary(
      BoundedSource<T> source,
      List<T> expectedItems,
      int numItemsToBeReadBeforeSplit,
      double leftFraction,
      SplitAtFractionResult leftResult,
      double rightFraction,
      SplitAtFractionResult rightResult,
      PipelineOptions options,
      SplitFractionStatistics stats)
      throws Exception {
    if (rightFraction - leftFraction < 0.001) {
      // Do not recurse too deeply. Otherwise we will end up in infinite
      // recursion, e.g., while trying to find the exact minimal fraction s.t.
      // split succeeds. A precision of 0.001 when looking for such a fraction
      // ought to be enough for everybody.
      return;
    }
    double middleFraction = (rightFraction + leftFraction) / 2;
    // Endpoint results are passed down by recursive calls; only the outermost call computes them.
    if (leftResult == null) {
      leftResult =
          assertSplitAtFractionBehaviorImpl(
              source,
              expectedItems,
              numItemsToBeReadBeforeSplit,
              leftFraction,
              ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS,
              options);
    }
    if (rightResult == null) {
      rightResult =
          assertSplitAtFractionBehaviorImpl(
              source,
              expectedItems,
              numItemsToBeReadBeforeSplit,
              rightFraction,
              ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS,
              options);
    }
    SplitAtFractionResult middleResult =
        assertSplitAtFractionBehaviorImpl(
            source,
            expectedItems,
            numItemsToBeReadBeforeSplit,
            middleFraction,
            ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS,
            options);
    // A residual count of -1 marks a failed split; anything else is a success.
    if (middleResult.numResidualItems != -1) {
      stats.successfulFractions.add(middleFraction);
    }
    // "Non-trivial" means the split succeeded and actually moved items to the residual.
    if (middleResult.numResidualItems > 0) {
      stats.nonTrivialFractions.add(middleFraction);
    }
    // Two split fractions are equivalent if they yield the same number of
    // items in primary vs. residual source. Left and right are already not
    // equivalent. Recurse into [left, middle) and [middle, right) respectively
    // if middle is not equivalent to left or right.
    if (leftResult.numPrimaryItems != middleResult.numPrimaryItems) {
      assertSplitAtFractionBinary(
          source,
          expectedItems,
          numItemsToBeReadBeforeSplit,
          leftFraction,
          leftResult,
          middleFraction,
          middleResult,
          options,
          stats);
    }
    if (rightResult.numPrimaryItems != middleResult.numPrimaryItems) {
      assertSplitAtFractionBinary(
          source,
          expectedItems,
          numItemsToBeReadBeforeSplit,
          middleFraction,
          middleResult,
          rightFraction,
          rightResult,
          options,
          stats);
    }
  }

  /**
   * Upper bound on the concurrent-splitting trials spent on a single start position in {@link
   * #assertSplitAtFractionExhaustive} before giving up on that position.
   */
  private static final int MAX_CONCURRENT_SPLITTING_TRIALS_PER_ITEM = 100;

  /**
   * Once the concurrent-splitting trials across all start positions in {@link
   * #assertSplitAtFractionExhaustive} exceed this number, the remaining positions are skipped.
   */
  private static final int MAX_CONCURRENT_SPLITTING_TRIALS_TOTAL = 1000;

  /**
   * Asserts that for each possible start position, {@link
   * BoundedSource.BoundedReader#splitAtFraction} at every interesting fraction (halfway between two
   * fractions that differ by at least one item) can be called successfully and the results are
   * consistent if a split succeeds. Verifies multithreaded splitting as well.
   *
   * <p>The source must produce at least two items. The test fails as vacuous if no split
   * succeeds at all, or if no split ever yields a non-empty residual.
   *
   * <p>The thread pool used for the concurrent phase is shut down before this method returns,
   * whether it completes normally or with an exception.
   */
  public static <T> void assertSplitAtFractionExhaustive(
      BoundedSource<T> source, PipelineOptions options) throws Exception {
    List<T> expectedItems = readFromSource(source, options);
    assertFalse("Empty source", expectedItems.isEmpty());
    assertFalse("Source reads a single item", expectedItems.size() == 1);
    List<List<Double>> allNonTrivialFractions = new ArrayList<>();
    {
      // Sequential phase: bisect the fraction range for every start position.
      boolean anySuccessfulFractions = false;
      boolean anyNonTrivialFractions = false;
      for (int i = 0; i < expectedItems.size(); i++) {
        SplitFractionStatistics stats = new SplitFractionStatistics();
        assertSplitAtFractionBinary(source, expectedItems, i, 0.0, null, 1.0, null, options, stats);
        if (!stats.successfulFractions.isEmpty()) {
          anySuccessfulFractions = true;
        }
        if (!stats.nonTrivialFractions.isEmpty()) {
          anyNonTrivialFractions = true;
        }
        allNonTrivialFractions.add(stats.nonTrivialFractions);
      }
      assertTrue(
          "splitAtFraction test completed vacuously: no successful split fractions found",
          anySuccessfulFractions);
      assertTrue(
          "splitAtFraction test completed vacuously: no non-trivial split fractions found",
          anyNonTrivialFractions);
    }
    {
      // Perform a stress test of "racy" concurrent splitting:
      // for every position (number of items read), try to split at the minimum nontrivial
      // split fraction for that position concurrently with reading the record at that position.
      // To ensure that the test is non-vacuous, make sure that the splitting succeeds
      // at least once and fails at least once.
      ExecutorService executor = Executors.newFixedThreadPool(2);
      try {
        int numTotalTrials = 0;
        for (int i = 0; i < expectedItems.size(); i++) {
          double minNonTrivialFraction = 2.0; // Greater than any possible fraction.
          for (double fraction : allNonTrivialFractions.get(i)) {
            minNonTrivialFraction = Math.min(minNonTrivialFraction, fraction);
          }
          if (minNonTrivialFraction == 2.0) {
            // This will not happen all the time because otherwise the test above would
            // detect vacuousness.
            continue;
          }
          int numTrials = 0;
          boolean haveSuccess = false;
          boolean haveFailure = false;
          while (true) {
            ++numTrials;
            if (numTrials > MAX_CONCURRENT_SPLITTING_TRIALS_PER_ITEM) {
              LOG.warn(
                  "After {} concurrent splitting trials at item #{}, observed only {}, "
                      + "giving up on this item",
                  numTrials,
                  i,
                  haveSuccess ? "success" : "failure");
              break;
            }
            if (assertSplitAtFractionConcurrent(
                executor, source, expectedItems, i, minNonTrivialFraction, options)) {
              haveSuccess = true;
            } else {
              haveFailure = true;
            }
            if (haveSuccess && haveFailure) {
              LOG.info(
                  "{} trials to observe both success and failure of concurrent splitting at item #{}",
                  numTrials,
                  i);
              break;
            }
          }
          numTotalTrials += numTrials;
          if (numTotalTrials > MAX_CONCURRENT_SPLITTING_TRIALS_TOTAL) {
            LOG.warn(
                "After {} total concurrent splitting trials, considered only {} items, giving up.",
                numTotalTrials,
                i);
            break;
          }
        }
        LOG.info(
            "{} total concurrent splitting trials for {} items",
            numTotalTrials,
            expectedItems.size());
      } finally {
        // The pool's worker threads are non-daemon; without a shutdown they outlive the test.
        // shutdownNow also interrupts a task that is still blocked after a failed trial.
        executor.shutdownNow();
      }
    }
  }

  /**
   * Runs one trial of splitting concurrently with reading.
   *
   * <p>A reader task reads {@code numItemsToReadBeforeSplitting} items, releases the splitter
   * task and then keeps reading to the end; the splitter task calls {@code
   * splitAtFraction(fraction)} as soon as it is released. If the split succeeded, the result is
   * checked with {@link #verifySingleSplitAtFractionResult}.
   *
   * <p>The splitter is released even when the reader task fails before reaching the split point,
   * so that a failing trial does not leave a task blocked on the latch forever.
   *
   * @return {@code true} if the split succeeded and the residual source was non-empty; {@code
   *     false} if the split failed or the residual was empty
   */
  private static <T> boolean assertSplitAtFractionConcurrent(
      ExecutorService executor,
      BoundedSource<T> source,
      List<T> expectedItems,
      final int numItemsToReadBeforeSplitting,
      final double fraction,
      PipelineOptions options)
      throws Exception {
    @SuppressWarnings("resource") // Closed in readerThread
    final BoundedSource.BoundedReader<T> reader = source.createReader(options);
    final CountDownLatch unblockSplitter = new CountDownLatch(1);
    Future<List<T>> readerThread =
        executor.submit(
            () -> {
              try {
                List<T> items =
                    readNItemsFromUnstartedReader(reader, numItemsToReadBeforeSplitting);
                unblockSplitter.countDown();
                items.addAll(readRemainingFromReader(reader, numItemsToReadBeforeSplitting > 0));
                return items;
              } finally {
                // If the pre-split read threw, the latch has not been released yet. Release it
                // here so the splitter cannot wait forever; a second countDown is a no-op.
                unblockSplitter.countDown();
                reader.close();
              }
            });
    Future<KV<BoundedSource<T>, BoundedSource<T>>> splitterThread =
        executor.submit(
            () -> {
              unblockSplitter.await();
              BoundedSource<T> residual = reader.splitAtFraction(fraction);
              if (residual == null) {
                return null;
              }
              return KV.of(reader.getCurrentSource(), residual);
            });
    List<T> currentItems = readerThread.get();
    KV<BoundedSource<T>, BoundedSource<T>> splitSources = splitterThread.get();
    if (splitSources == null) {
      // The split was rejected; there is nothing to verify.
      return false;
    }
    SplitAtFractionResult res =
        verifySingleSplitAtFractionResult(
            source,
            expectedItems,
            currentItems,
            splitSources.getKey(),
            splitSources.getValue(),
            numItemsToReadBeforeSplitting,
            fraction,
            options);
    return (res.numResidualItems > 0);
  }

  /**
   * Wraps {@code boundedSource} in an equivalent {@code BoundedSource<T>} that cannot be split.
   *
   * <p>The wrapper forwards most methods to the given {@code boundedSource}, with two exceptions:
   *
   * <ol>
   *   <li>{@link BoundedSource#split} rejects initial splitting by returning itself in a list.
   *   <li>{@link BoundedReader#splitAtFraction} rejects dynamic splitting by returning null.
   * </ol>
   */
  public static <T> BoundedSource<T> toUnsplittableSource(BoundedSource<T> boundedSource) {
    UnsplittableSource<T> wrapper = new UnsplittableSource<>(boundedSource);
    return wrapper;
  }

  private static class UnsplittableSource<T> extends BoundedSource<T> {

    private final BoundedSource<T> boundedSource;

    private UnsplittableSource(BoundedSource<T> boundedSource) {
      this.boundedSource = checkNotNull(boundedSource, "boundedSource");
    }

    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
      this.boundedSource.populateDisplayData(builder);
    }

    @Override
    public List<? extends BoundedSource<T>> split(
        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
      return ImmutableList.of(this);
    }

    @Override
    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
      return boundedSource.getEstimatedSizeBytes(options);
    }

    @Override
    public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
      return new UnsplittableReader<>(boundedSource, boundedSource.createReader(options));
    }

    @Override
    public void validate() {
      boundedSource.validate();
    }

    @Override
    public Coder<T> getOutputCoder() {
      return boundedSource.getOutputCoder();
    }

    private static class UnsplittableReader<T> extends BoundedReader<T> {

      private final BoundedSource<T> boundedSource;
      private final BoundedReader<T> boundedReader;

      private UnsplittableReader(BoundedSource<T> boundedSource, BoundedReader<T> boundedReader) {
        this.boundedSource = checkNotNull(boundedSource, "boundedSource");
        this.boundedReader = checkNotNull(boundedReader, "boundedReader");
      }

      @Override
      public BoundedSource<T> getCurrentSource() {
        return boundedSource;
      }

      @Override
      public boolean start() throws IOException {
        return boundedReader.start();
      }

      @Override
      public boolean advance() throws IOException {
        return boundedReader.advance();
      }

      @Override
      public T getCurrent() throws NoSuchElementException {
        return boundedReader.getCurrent();
      }

      @Override
      public void close() throws IOException {
        boundedReader.close();
      }

      @Override
      @Nullable
      public BoundedSource<T> splitAtFraction(double fraction) {
        return null;
      }

      @Override
      @Nullable
      public Double getFractionConsumed() {
        return boundedReader.getFractionConsumed();
      }

      @Override
      public long getSplitPointsConsumed() {
        return boundedReader.getSplitPointsConsumed();
      }

      @Override
      public long getSplitPointsRemaining() {
        return boundedReader.getSplitPointsRemaining();
      }

      @Override
      public Instant getCurrentTimestamp() throws NoSuchElementException {
        return boundedReader.getCurrentTimestamp();
      }
    }
  }
}
