/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.direct;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.runners.core.construction.TestStreamTranslation;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.TestStream.ElementEvent;
import org.apache.beam.sdk.testing.TestStream.Event;
import org.apache.beam.sdk.testing.TestStream.EventType;
import org.apache.beam.sdk.testing.TestStream.ProcessingTimeEvent;
import org.apache.beam.sdk.testing.TestStream.WatermarkEvent;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.joda.time.Duration;
import org.joda.time.Instant;

/** The {@link TransformEvaluatorFactory} for the {@link TestStream} primitive. */
class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
  private final EvaluationContext evaluationContext;

  TestStreamEvaluatorFactory(EvaluationContext evaluationContext) {
    this.evaluationContext = evaluationContext;
  }

  @Nullable
  @Override
  public <InputT> TransformEvaluator<InputT> forApplication(
      AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) {
    return createEvaluator((AppliedPTransform) application);
  }

  @Override
  public void cleanup() throws Exception {}

  /**
   * Returns the evaluator for the provided application of {@link TestStream}, or null if it is
   * already in use.
   *
   * <p>The documented behavior of {@link TestStream} requires the output of one event to travel
   * completely through the pipeline before any additional event, so additional instances that have
   * a separate collection of events cannot be created.
   */
  private <InputT, OutputT> TransformEvaluator<? super InputT> createEvaluator(
      AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> application) {
    return (TransformEvaluator<InputT>) new Evaluator<>(application, evaluationContext);
  }

  private static class Evaluator<T> implements TransformEvaluator<TestStreamIndex<T>> {
    private final AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> application;
    private final EvaluationContext context;
    private final StepTransformResult.Builder resultBuilder;

    private Evaluator(
        AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> application,
        EvaluationContext context) {
      this.application = application;
      this.context = context;
      this.resultBuilder = StepTransformResult.withoutHold(application);
    }

    @Override
    public void processElement(WindowedValue<TestStreamIndex<T>> element) throws Exception {
      TestStreamIndex<T> streamIndex = element.getValue();
      List<Event<T>> events = streamIndex.getTestStream().getEvents();
      int index = streamIndex.getIndex();
      Instant watermark = element.getTimestamp();
      Event<T> event = events.get(index);

      if (event.getType().equals(EventType.ELEMENT)) {
        UncommittedBundle<T> bundle =
            context.createBundle(
                (PCollection<T>) Iterables.getOnlyElement(application.getOutputs().values()));
        for (TimestampedValue<T> elem : ((ElementEvent<T>) event).getElements()) {
          bundle.add(
              WindowedValue.timestampedValueInGlobalWindow(elem.getValue(), elem.getTimestamp()));
        }
        resultBuilder.addOutput(bundle);
      }

      if (event.getType().equals(EventType.WATERMARK)) {
        watermark = ((WatermarkEvent<T>) event).getWatermark();
      }

      if (event.getType().equals(EventType.PROCESSING_TIME)) {
        ((TestClock) context.getClock())
            .advance(((ProcessingTimeEvent<T>) event).getProcessingTimeAdvance());
      }

      TestStreamIndex<T> next = streamIndex.next();
      if (next.getIndex() < events.size()) {
        resultBuilder.addUnprocessedElements(
            Collections.singleton(WindowedValue.timestampedValueInGlobalWindow(next, watermark)));
      }
    }

    @Override
    public TransformResult<TestStreamIndex<T>> finishBundle() throws Exception {
      return resultBuilder.build();
    }
  }

  @VisibleForTesting
  static class TestClock implements Clock {
    private final AtomicReference<Instant> currentTime =
        new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);

    public void advance(Duration amount) {
      Instant now = currentTime.get();
      currentTime.compareAndSet(now, now.plus(amount));
    }

    @Override
    public Instant now() {
      return currentTime.get();
    }
  }

  private static class TestClockSupplier implements Supplier<Clock> {
    @Override
    public Clock get() {
      return new TestClock();
    }
  }

  static class DirectTestStreamFactory<T>
      implements PTransformOverrideFactory<
          PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> {
    private final DirectRunner runner;

    DirectTestStreamFactory(DirectRunner runner) {
      this.runner = runner;
    }

    @Override
    public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
        AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> transform) {
      try {
        return PTransformReplacement.of(
            transform.getPipeline().begin(),
            new DirectTestStream<>(runner, TestStreamTranslation.getTestStream(transform)));
      } catch (IOException exc) {
        throw new RuntimeException(
            String.format(
                "Transform could not be converted to %s", TestStream.class.getSimpleName()),
            exc);
      }
    }

    @Override
    public Map<PValue, ReplacementOutput> mapOutputs(
        Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
      return ReplacementOutputs.singleton(outputs, newOutput);
    }

    static final String DIRECT_TEST_STREAM_URN = "urn:beam:directrunner:transforms:test_stream:v1";

    static class DirectTestStream<T> extends PTransform<PBegin, PCollection<T>> {
      private final transient DirectRunner runner;
      private final TestStream<T> original;

      @VisibleForTesting
      DirectTestStream(DirectRunner runner, TestStream<T> transform) {
        this.runner = runner;
        this.original = transform;
      }

      @Override
      public PCollection<T> expand(PBegin input) {
        runner.setClockSupplier(new TestClockSupplier());
        return PCollection.createPrimitiveOutputInternal(
            input.getPipeline(),
            WindowingStrategy.globalDefault(),
            IsBounded.UNBOUNDED,
            original.getValueCoder());
      }
    }
  }

  static class InputProvider<T> implements RootInputProvider<T, TestStreamIndex<T>, PBegin> {
    private final EvaluationContext evaluationContext;

    InputProvider(EvaluationContext evaluationContext) {
      this.evaluationContext = evaluationContext;
    }

    @Override
    public Collection<CommittedBundle<TestStreamIndex<T>>> getInitialInputs(
        AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> transform,
        int targetParallelism) {

      // This will always be run on an execution-time transform, so it can be downcast
      DirectTestStreamFactory.DirectTestStream<T> testStream =
          (DirectTestStreamFactory.DirectTestStream<T>) transform.getTransform();

      CommittedBundle<TestStreamIndex<T>> initialBundle =
          evaluationContext
              .<TestStreamIndex<T>>createRootBundle()
              .add(WindowedValue.valueInGlobalWindow(TestStreamIndex.of(testStream.original)))
              .commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
      return Collections.singleton(initialBundle);
    }
  }

  @AutoValue
  abstract static class TestStreamIndex<T> {
    static <T> TestStreamIndex<T> of(TestStream<T> stream) {
      return new AutoValue_TestStreamEvaluatorFactory_TestStreamIndex<>(stream, 0);
    }

    abstract TestStream<T> getTestStream();

    abstract int getIndex();

    TestStreamIndex<T> next() {
      return new AutoValue_TestStreamEvaluatorFactory_TestStreamIndex<>(
          getTestStream(), getIndex() + 1);
    }
  }
}
