/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.direct;

import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CountDownLatch;
import org.apache.beam.runners.direct.BoundedReadEvaluatorFactory.BoundedSourceShard;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.OffsetBasedSource;
import org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

/** Tests for {@link BoundedReadEvaluatorFactory}. */
@RunWith(JUnit4.class)
public class BoundedReadEvaluatorFactoryTest {
  private BoundedSource<Long> source;
  private PCollection<Long> longs;
  private BoundedReadEvaluatorFactory factory;
  @Mock private EvaluationContext context;
  private BundleFactory bundleFactory;
  private AppliedPTransform<?, ?, ?> longsProducer;

  @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
  private PipelineOptions options;

  @Before
  public void setup() {
    MockitoAnnotations.initMocks(this);
    source = CountingSource.upTo(10L);
    longs = p.apply(Read.from(source));

    options = PipelineOptionsFactory.create();
    factory =
        new BoundedReadEvaluatorFactory(
            context, options, Long.MAX_VALUE /* minimum size for dynamic splits */);
    bundleFactory = ImmutableListBundleFactory.create();
    longsProducer = DirectGraphs.getProducer(longs);
  }

  @Test
  public void boundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception {
    when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
    UncommittedBundle<Long> outputBundle = bundleFactory.createBundle(longs);
    when(context.createBundle(longs)).thenReturn(outputBundle);

    Collection<CommittedBundle<?>> initialInputs =
        new BoundedReadEvaluatorFactory.InputProvider(context, options)
            .getInitialInputs(longsProducer, 1);
    List<WindowedValue<?>> outputs = new ArrayList<>();
    for (CommittedBundle<?> shardBundle : initialInputs) {
      TransformEvaluator<?> evaluator = factory.forApplication(longsProducer, null);
      for (WindowedValue<?> shard : shardBundle.getElements()) {
        evaluator.processElement((WindowedValue) shard);
      }
      TransformResult<?> result = evaluator.finishBundle();
      assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
      assertThat(
          Iterables.size(result.getOutputBundles()),
          equalTo(Iterables.size(shardBundle.getElements())));
      for (UncommittedBundle<?> output : result.getOutputBundles()) {
        CommittedBundle<?> committed = output.commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
        for (WindowedValue<?> val : committed.getElements()) {
          outputs.add(val);
        }
      }
    }

    assertThat(
        outputs,
        containsInAnyOrder(
            gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)));
  }

  @Test
  public void boundedSourceEvaluatorProducesDynamicSplits() throws Exception {
    BoundedReadEvaluatorFactory factory = new BoundedReadEvaluatorFactory(context, options, 0L);

    when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
    int numElements = 10;
    Long[] elems = new Long[numElements];
    for (int i = 0; i < numElements; i++) {
      elems[i] = (long) i;
    }
    PCollection<Long> read = p.apply(Read.from(new TestSource<>(VarLongCoder.of(), 5, elems)));
    AppliedPTransform<?, ?, ?> transform = DirectGraphs.getProducer(read);
    Collection<CommittedBundle<?>> unreadInputs =
        new BoundedReadEvaluatorFactory.InputProvider(context, options)
            .getInitialInputs(transform, 1);

    Collection<WindowedValue<?>> outputs = new ArrayList<>();
    int numIterations = 0;
    while (!unreadInputs.isEmpty()) {
      numIterations++;
      UncommittedBundle<Long> outputBundle = bundleFactory.createBundle(read);
      when(context.createBundle(read)).thenReturn(outputBundle);

      Collection<CommittedBundle<?>> newUnreadInputs = new ArrayList<>();
      for (CommittedBundle<?> shardBundle : unreadInputs) {
        TransformEvaluator<Long> evaluator = factory.forApplication(transform, null);
        for (WindowedValue<?> shard : shardBundle.getElements()) {
          evaluator.processElement((WindowedValue) shard);
        }
        TransformResult<Long> result = evaluator.finishBundle();
        assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
        assertThat(
            Iterables.size(result.getOutputBundles()),
            equalTo(Iterables.size(shardBundle.getElements())));
        for (UncommittedBundle<?> output : result.getOutputBundles()) {
          CommittedBundle<?> committed = output.commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
          for (WindowedValue<?> val : committed.getElements()) {
            outputs.add(val);
          }
        }
        if (!Iterables.isEmpty(result.getUnprocessedElements())) {
          newUnreadInputs.add(shardBundle.withElements((Iterable) result.getUnprocessedElements()));
        }
      }
      unreadInputs = newUnreadInputs;
    }

    assertThat(numIterations, greaterThan(1));
    WindowedValue[] expectedValues = new WindowedValue[numElements];
    for (long i = 0L; i < numElements; i++) {
      expectedValues[(int) i] = gw(i);
    }
    assertThat(outputs, Matchers.<WindowedValue<?>>containsInAnyOrder(expectedValues));
  }

  @Test
  public void boundedSourceEvaluatorDynamicSplitsUnsplittable() throws Exception {
    BoundedReadEvaluatorFactory factory = new BoundedReadEvaluatorFactory(context, options, 0L);

    PCollection<Long> read =
        p.apply(Read.from(SourceTestUtils.toUnsplittableSource(CountingSource.upTo(10L))));
    AppliedPTransform<?, ?, ?> transform = DirectGraphs.getProducer(read);

    when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
    when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
    Collection<CommittedBundle<?>> initialInputs =
        new BoundedReadEvaluatorFactory.InputProvider(context, options)
            .getInitialInputs(transform, 1);

    UncommittedBundle<Long> outputBundle = bundleFactory.createBundle(read);
    when(context.createBundle(read)).thenReturn(outputBundle);
    List<WindowedValue<?>> outputs = new ArrayList<>();
    for (CommittedBundle<?> shardBundle : initialInputs) {
      TransformEvaluator<?> evaluator = factory.forApplication(transform, null);
      for (WindowedValue<?> shard : shardBundle.getElements()) {
        evaluator.processElement((WindowedValue) shard);
      }
      TransformResult<?> result = evaluator.finishBundle();
      assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
      assertThat(
          Iterables.size(result.getOutputBundles()),
          equalTo(Iterables.size(shardBundle.getElements())));
      for (UncommittedBundle<?> output : result.getOutputBundles()) {
        CommittedBundle<?> committed = output.commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
        for (WindowedValue<?> val : committed.getElements()) {
          outputs.add(val);
        }
      }
    }

    assertThat(
        outputs,
        containsInAnyOrder(
            gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)));
  }

  @Test
  public void getInitialInputsSplitsIntoBundles() throws Exception {
    when(context.createRootBundle()).thenAnswer(invocation -> bundleFactory.createRootBundle());
    Collection<CommittedBundle<?>> initialInputs =
        new BoundedReadEvaluatorFactory.InputProvider(context, options)
            .getInitialInputs(longsProducer, 3);

    assertThat(initialInputs, hasSize(allOf(greaterThanOrEqualTo(3), lessThanOrEqualTo(4))));

    Collection<BoundedSource<Long>> sources = new ArrayList<>();
    for (CommittedBundle<?> initialInput : initialInputs) {
      Iterable<WindowedValue<BoundedSourceShard<Long>>> shards =
          (Iterable) initialInput.getElements();
      WindowedValue<BoundedSourceShard<Long>> shard = Iterables.getOnlyElement(shards);
      assertThat(shard.getWindows(), Matchers.contains(GlobalWindow.INSTANCE));
      assertThat(shard.getTimestamp(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
      sources.add(shard.getValue().getSource());
    }

    SourceTestUtils.assertSourcesEqualReferenceSource(
        source, (List<? extends BoundedSource<Long>>) sources, PipelineOptionsFactory.create());
  }

  @Test
  public void boundedSourceInMemoryTransformEvaluatorShardsOfSource() throws Exception {
    PipelineOptions options = PipelineOptionsFactory.create();
    List<? extends BoundedSource<Long>> splits =
        source.split(source.getEstimatedSizeBytes(options) / 2, options);

    UncommittedBundle<BoundedSourceShard<Long>> rootBundle = bundleFactory.createRootBundle();
    for (BoundedSource<Long> split : splits) {
      BoundedSourceShard<Long> shard = BoundedSourceShard.of(split);
      rootBundle.add(WindowedValue.valueInGlobalWindow(shard));
    }
    CommittedBundle<BoundedSourceShard<Long>> shards = rootBundle.commit(Instant.now());

    TransformEvaluator<BoundedSourceShard<Long>> evaluator =
        factory.forApplication(longsProducer, shards);
    for (WindowedValue<BoundedSourceShard<Long>> shard : shards.getElements()) {
      UncommittedBundle<Long> outputBundle = bundleFactory.createBundle(longs);
      when(context.createBundle(longs)).thenReturn(outputBundle);
      evaluator.processElement(shard);
    }
    TransformResult<?> result = evaluator.finishBundle();
    assertThat(Iterables.size(result.getOutputBundles()), equalTo(splits.size()));

    List<WindowedValue<?>> outputElems = new ArrayList<>();
    for (UncommittedBundle<?> outputBundle : result.getOutputBundles()) {
      CommittedBundle<?> outputs = outputBundle.commit(Instant.now());
      for (WindowedValue<?> outputElem : outputs.getElements()) {
        outputElems.add(outputElem);
      }
    }
    assertThat(
        outputElems,
        containsInAnyOrder(
            gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)));
  }

  @Test
  public void boundedSourceEvaluatorClosesReader() throws Exception {
    TestSource<Long> source = new TestSource<>(BigEndianLongCoder.of(), 1L, 2L, 3L);
    PCollection<Long> pcollection = p.apply(Read.from(source));
    AppliedPTransform<?, ?, ?> sourceTransform = DirectGraphs.getProducer(pcollection);

    UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
    when(context.createBundle(pcollection)).thenReturn(output);

    TransformEvaluator<BoundedSourceShard<Long>> evaluator =
        factory.forApplication(
            sourceTransform, bundleFactory.createRootBundle().commit(Instant.now()));
    evaluator.processElement(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source)));
    evaluator.finishBundle();
    CommittedBundle<Long> committed = output.commit(Instant.now());
    assertThat(committed.getElements(), containsInAnyOrder(gw(2L), gw(3L), gw(1L)));
    assertThat(TestSource.readerClosed, is(true));
  }

  @Test
  public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception {
    TestSource<Long> source = new TestSource<>(BigEndianLongCoder.of());

    PCollection<Long> pcollection = p.apply(Read.from(source));
    AppliedPTransform<?, ?, ?> sourceTransform = DirectGraphs.getProducer(pcollection);

    UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
    when(context.createBundle(pcollection)).thenReturn(output);

    TransformEvaluator<BoundedSourceShard<Long>> evaluator =
        factory.forApplication(
            sourceTransform, bundleFactory.createRootBundle().commit(Instant.now()));
    evaluator.processElement(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source)));
    evaluator.finishBundle();
    CommittedBundle<Long> committed = output.commit(Instant.now());
    assertThat(committed.getElements(), emptyIterable());
    assertThat(TestSource.readerClosed, is(true));
  }

  @Test
  public void cleanupShutsDownExecutor() {
    factory.cleanup();
    assertThat(factory.executor.isShutdown(), is(true));
  }

  private static class TestSource<T> extends OffsetBasedSource<T> {
    private static boolean readerClosed;
    private final Coder<T> coder;
    private final T[] elems;
    private final int firstSplitIndex;

    private transient CountDownLatch subrangesCompleted;

    public TestSource(Coder<T> coder, T... elems) {
      this(coder, elems.length, elems);
    }

    public TestSource(Coder<T> coder, int firstSplitIndex, T... elems) {
      super(0L, elems.length, 1L);
      this.elems = elems;
      this.coder = coder;
      this.firstSplitIndex = firstSplitIndex;
      readerClosed = false;

      subrangesCompleted = new CountDownLatch(2);
    }

    @Override
    public List<? extends OffsetBasedSource<T>> split(
        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
      return ImmutableList.of(this);
    }

    @Override
    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
      return elems.length;
    }

    @Override
    public BoundedSource.BoundedReader<T> createReader(PipelineOptions options) throws IOException {
      subrangesCompleted = new CountDownLatch(2);
      return new TestReader<>(this, firstSplitIndex, subrangesCompleted);
    }

    @Override
    public long getMaxEndOffset(PipelineOptions options) throws Exception {
      return elems.length;
    }

    @Override
    public OffsetBasedSource<T> createSourceForSubrange(long start, long end) {
      subrangesCompleted.countDown();
      return new TestSource<>(coder, Arrays.copyOfRange(elems, (int) start, (int) end));
    }

    @Override
    public Coder<T> getOutputCoder() {
      return coder;
    }
  }

  private static class TestReader<T> extends OffsetBasedReader<T> {
    private final Source<T> initialSource;
    private final int sleepIndex;
    private final CountDownLatch dynamicallySplit;

    private int index;

    TestReader(OffsetBasedSource<T> source, int sleepIndex, CountDownLatch dynamicallySplit) {
      super(source);
      this.initialSource = source;
      this.sleepIndex = sleepIndex;
      this.dynamicallySplit = dynamicallySplit;
      this.index = -1;
    }

    @Override
    public TestSource<T> getCurrentSource() {
      return (TestSource<T>) super.getCurrentSource();
    }

    @Override
    protected long getCurrentOffset() throws NoSuchElementException {
      return (long) index;
    }

    @Override
    public boolean startImpl() throws IOException {
      return advanceImpl();
    }

    @Override
    public boolean advanceImpl() throws IOException {
      // Sleep before the sleep/split index is claimed so long as it will be claimed
      if (index + 1 == sleepIndex && sleepIndex < getCurrentSource().elems.length) {
        try {
          dynamicallySplit.await();
          while (initialSource.equals(getCurrentSource())) {
            // Spin until the current source is updated
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException(e);
        }
      }
      if (getCurrentSource().elems.length > index + 1) {
        index++;
        return true;
      }
      return false;
    }

    @Override
    public T getCurrent() throws NoSuchElementException {
      return getCurrentSource().elems[index];
    }

    @Override
    public void close() throws IOException {
      TestSource.readerClosed = true;
    }
  }

  private static WindowedValue<Long> gw(Long elem) {
    return WindowedValue.valueInGlobalWindow(elem);
  }
}
