/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.transforms;

import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
@Deprecated
public class DoFnTester<InputT, OutputT> implements AutoCloseable {

  private static final Logger LOG = LoggerFactory.getLogger(DoFnTester.class);

  /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
  @SuppressWarnings("unchecked")
  @Deprecated
  public static <InputT, OutputT> DoFnTester<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
    checkNotNull(fn, "fn can't be null");
    LOG.warn(
        "Your tests use DoFnTester, which may not exercise DoFns correctly. "
            + "Please use TestPipeline instead.");
    return new DoFnTester<>(fn);
  }

  /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
  @Deprecated
  public void setSideInputs(Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs) {
    checkState(
        state == State.UNINITIALIZED,
        "Can't add side inputs: DoFnTester is already initialized, in state %s",
        state);
    this.sideInputs = sideInputs;
  }

  /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
  @Deprecated
  public <T> void setSideInput(PCollectionView<T> sideInput, BoundedWindow window, T value) {
    checkState(
        state == State.UNINITIALIZED,
        "Can't add side inputs: DoFnTester is already initialized, in state %s",
        state);
    Map<BoundedWindow, T> windowValues = (Map<BoundedWindow, T>) sideInputs.get(sideInput);
    if (windowValues == null) {
      windowValues = new HashMap<>();
      sideInputs.put(sideInput, windowValues);
    }
    windowValues.put(window, value);
  }

  /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
  @Deprecated
  public PipelineOptions getPipelineOptions() {
    return options;
  }

  /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
  @Deprecated
  public enum CloningBehavior {
    /**
     * Clone the {@link DoFn} and call {@link DoFn.Setup} every time a bundle starts; call {@link
     * DoFn.Teardown} every time a bundle finishes.
     */
    CLONE_PER_BUNDLE,
    /**
     * Clone the {@link DoFn} and call {@link DoFn.Setup} on the first access; call {@link
     * DoFn.Teardown} only explicitly.
     */
    CLONE_ONCE,
    /**
     * Do not clone the {@link DoFn}; call {@link DoFn.Setup} on the first access; call {@link
     * DoFn.Teardown} only explicitly.
     */
    DO_NOT_CLONE
  }

  /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
  @Deprecated
  public void setCloningBehavior(CloningBehavior newValue) {
    checkState(state == State.UNINITIALIZED, "Wrong state: %s", state);
    this.cloningBehavior = newValue;
  }

  /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
  @Deprecated
  public CloningBehavior getCloningBehavior() {
    return cloningBehavior;
  }

  /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
  @Deprecated
  public List<OutputT> processBundle(Iterable<? extends InputT> inputElements) throws Exception {
    startBundle();
    for (InputT inputElement : inputElements) {
      processElement(inputElement);
    }
    finishBundle();
    return takeOutputElements();
  }

  /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
  @Deprecated
  @SafeVarargs
  public final List<OutputT> processBundle(InputT... inputElements) throws Exception {
    return processBundle(Arrays.asList(inputElements));
  }

  /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
  @Deprecated
  public void startBundle() throws Exception {
    checkState(
        state == State.UNINITIALIZED || state == State.BUNDLE_FINISHED,
        "Wrong state during startBundle: %s",
        state);
    if (state == State.UNINITIALIZED) {
      initializeState();
    }
    try {
      fnInvoker.invokeStartBundle(new TestStartBundleContext());
    } catch (UserCodeException e) {
      unwrapUserCodeException(e);
    }
    state = State.BUNDLE_STARTED;
  }

  private static void unwrapUserCodeException(UserCodeException e) throws Exception {
    if (e.getCause() instanceof Exception) {
      throw (Exception) e.getCause();
    } else if (e.getCause() instanceof Error) {
      throw (Error) e.getCause();
    } else {
      throw e;
    }
  }

  /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
  @Deprecated
  public void processElement(InputT element) throws Exception {
    processTimestampedElement(TimestampedValue.atMinimumTimestamp(element));
  }

  /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
  @Deprecated
  public void processTimestampedElement(TimestampedValue<InputT> element) throws Exception {
    checkNotNull(element, "Timestamped element cannot be null");
    processWindowedElement(element.getValue(), element.getTimestamp(), GlobalWindow.INSTANCE);
  }

  /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
  @Deprecated
  public void processWindowedElement(InputT element, Instant timestamp, final BoundedWindow window)
      throws Exception {
    if (state != State.BUNDLE_STARTED) {
      startBundle();
    }
    try {
      final DoFn<InputT, OutputT>.ProcessContext processContext =
          createProcessContext(
              ValueInSingleWindow.of(element, timestamp, window, PaneInfo.NO_FIRING));
      fnInvoker.invokeProcessElement(
          new DoFnInvoker.ArgumentProvider<InputT, OutputT>() {
            @Override
            public BoundedWindow window() {
              return window;
            }

            @Override
            public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
              return processContext.pane();
            }

            @Override
            public PipelineOptions pipelineOptions() {
              return getPipelineOptions();
            }

            @Override
            public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(
                DoFn<InputT, OutputT> doFn) {
              throw new UnsupportedOperationException(
                  "Not expected to access DoFn.StartBundleContext from @ProcessElement");
            }

            @Override
            public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(
                DoFn<InputT, OutputT> doFn) {
              throw new UnsupportedOperationException(
                  "Not expected to access DoFn.FinishBundleContext from @ProcessElement");
            }

            @Override
            public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
              return processContext;
            }

            @Override
            public InputT element(DoFn<InputT, OutputT> doFn) {
              return processContext.element();
            }

            @Override
            public InputT schemaElement(int index) {
              throw new UnsupportedOperationException("Schemas are not supported by DoFnTester");
            }

            @Override
            public Instant timestamp(DoFn<InputT, OutputT> doFn) {
              return processContext.timestamp();
            }

            @Override
            public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
              throw new UnsupportedOperationException(
                  "Not expected to access TimeDomain from @ProcessElement");
            }

            @Override
            public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
              return DoFnOutputReceivers.windowedReceiver(processContext, null);
            }

            @Override
            public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
              throw new UnsupportedOperationException("Schemas are not supported by DoFnTester");
            }

            @Override
            public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
              return DoFnOutputReceivers.windowedMultiReceiver(processContext, null);
            }

            @Override
            public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
              throw new UnsupportedOperationException("DoFnTester doesn't support timers yet.");
            }

            @Override
            public RestrictionTracker<?, ?> restrictionTracker() {
              throw new UnsupportedOperationException(
                  "Not expected to access RestrictionTracker from a regular DoFn in DoFnTester");
            }

            @Override
            public org.apache.beam.sdk.state.State state(String stateId) {
              throw new UnsupportedOperationException("DoFnTester doesn't support state yet");
            }

            @Override
            public Timer timer(String timerId) {
              throw new UnsupportedOperationException("DoFnTester doesn't support timers yet");
            }
          });
    } catch (UserCodeException e) {
      unwrapUserCodeException(e);
    }
  }

  /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
  @Deprecated
  public void finishBundle() throws Exception {
    checkState(
        state == State.BUNDLE_STARTED,
        "Must be inside bundle to call finishBundle, but was: %s",
        state);
    try {
      fnInvoker.invokeFinishBundle(new TestFinishBundleContext());
    } catch (UserCodeException e) {
      unwrapUserCodeException(e);
    }
    if (cloningBehavior == CloningBehavior.CLONE_PER_BUNDLE) {
      fnInvoker.invokeTeardown();
      fn = null;
      fnInvoker = null;
      state = State.UNINITIALIZED;
    } else {
      state = State.BUNDLE_FINISHED;
    }
  }

  /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
  @Deprecated
  public List<OutputT> peekOutputElements() {
    return peekOutputElementsWithTimestamp().stream()
        .map(TimestampedValue::getValue)
        .collect(Collectors.toList());
  }

  /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
  @Deprecated
  public List<TimestampedValue<OutputT>> peekOutputElementsWithTimestamp() {
    // TODO: Should we return an unmodifiable list?
    return getImmutableOutput(mainOutputTag).stream()
        .map(input -> TimestampedValue.of(input.getValue(), input.getTimestamp()))
        .collect(Collectors.toList());
  }

  /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
  @Deprecated
  public List<TimestampedValue<OutputT>> peekOutputElementsInWindow(BoundedWindow window) {
    return peekOutputElementsInWindow(mainOutputTag, window);
  }

  /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
  @Deprecated
  public List<TimestampedValue<OutputT>> peekOutputElementsInWindow(
      TupleTag<OutputT> tag, BoundedWindow window) {
    ImmutableList.Builder<TimestampedValue<OutputT>> valuesBuilder = ImmutableList.builder();
    for (ValueInSingleWindow<OutputT> value : getImmutableOutput(tag)) {
      if (value.getWindow().equals(window)) {
        valuesBuilder.add(TimestampedValue.of(value.getValue(), value.getTimestamp()));
      }
    }
    return valuesBuilder.build();
  }

  /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
  @Deprecated
  public void clearOutputElements() {
    getMutableOutput(mainOutputTag).clear();
  }

  /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
  @Deprecated
  public List<OutputT> takeOutputElements() {
    List<OutputT> resultElems = new ArrayList<>(peekOutputElements());
    clearOutputElements();
    return resultElems;
  }

  /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
  @Deprecated
  public List<TimestampedValue<OutputT>> takeOutputElementsWithTimestamp() {
    List<TimestampedValue<OutputT>> resultElems =
        new ArrayList<>(peekOutputElementsWithTimestamp());
    clearOutputElements();
    return resultElems;
  }

  /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
  @Deprecated
  public <T> List<T> peekOutputElements(TupleTag<T> tag) {
    // TODO: Should we return an unmodifiable list?
    return getImmutableOutput(tag).stream()
        .map(ValueInSingleWindow::getValue)
        .collect(Collectors.toList());
  }

  /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
  @Deprecated
  public <T> void clearOutputElements(TupleTag<T> tag) {
    getMutableOutput(tag).clear();
  }

  /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
  @Deprecated
  public <T> List<T> takeOutputElements(TupleTag<T> tag) {
    List<T> resultElems = new ArrayList<>(peekOutputElements(tag));
    clearOutputElements(tag);
    return resultElems;
  }

  private <T> List<ValueInSingleWindow<T>> getImmutableOutput(TupleTag<T> tag) {
    @SuppressWarnings({"unchecked", "rawtypes"})
    List<ValueInSingleWindow<T>> elems = (List) getOutputs().get(tag);
    return ImmutableList.copyOf(MoreObjects.firstNonNull(elems, Collections.emptyList()));
  }

  /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
  @Deprecated
  @SuppressWarnings({"unchecked", "rawtypes"})
  public <T> List<ValueInSingleWindow<T>> getMutableOutput(TupleTag<T> tag) {
    List<ValueInSingleWindow<T>> outputList = (List) getOutputs().get(tag);
    if (outputList == null) {
      outputList = new ArrayList<>();
      getOutputs().put(tag, (List) outputList);
    }
    return outputList;
  }

  /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
  @Deprecated
  public TupleTag<OutputT> getMainOutputTag() {
    return mainOutputTag;
  }

  private class TestStartBundleContext extends DoFn<InputT, OutputT>.StartBundleContext {

    private TestStartBundleContext() {
      fn.super();
    }

    @Override
    public PipelineOptions getPipelineOptions() {
      return options;
    }
  }

  private class TestFinishBundleContext extends DoFn<InputT, OutputT>.FinishBundleContext {

    private TestFinishBundleContext() {
      fn.super();
    }

    @Override
    public PipelineOptions getPipelineOptions() {
      return options;
    }

    @Override
    public void output(OutputT output, Instant timestamp, BoundedWindow window) {
      output(mainOutputTag, output, timestamp, window);
    }

    @Override
    public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
      getMutableOutput(tag)
          .add(ValueInSingleWindow.of(output, timestamp, window, PaneInfo.NO_FIRING));
    }
  }

  /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
  @Deprecated
  public DoFn<InputT, OutputT>.ProcessContext createProcessContext(
      ValueInSingleWindow<InputT> element) {
    return new TestProcessContext(element);
  }

  private class TestProcessContext extends DoFn<InputT, OutputT>.ProcessContext {
    private final ValueInSingleWindow<InputT> element;

    private TestProcessContext(ValueInSingleWindow<InputT> element) {
      fn.super();
      this.element = element;
    }

    @Override
    public InputT element() {
      return element.getValue();
    }

    @Override
    public <T> T sideInput(PCollectionView<T> view) {
      Map<BoundedWindow, ?> viewValues = sideInputs.get(view);
      if (viewValues != null) {
        BoundedWindow sideInputWindow =
            view.getWindowMappingFn().getSideInputWindow(element.getWindow());
        @SuppressWarnings("unchecked")
        T windowValue = (T) viewValues.get(sideInputWindow);
        if (windowValue != null) {
          return windowValue;
        }
      }
      // Fallback to returning the default materialization if no data was supplied.
      // This is really to support singleton views with default values.

      // TODO: Update this to supply a materialization dependent on actual URN of materialization.
      // Currently the SDK only supports the multimap materialization and it expects a
      // mapping function.
      checkState(
          Materializations.MULTIMAP_MATERIALIZATION_URN.equals(
              view.getViewFn().getMaterialization().getUrn()),
          "Only materializations of type %s supported, received %s",
          Materializations.MULTIMAP_MATERIALIZATION_URN,
          view.getViewFn().getMaterialization().getUrn());
      return ((ViewFn<Materializations.MultimapView, T>) view.getViewFn())
          .apply(o -> Collections.emptyList());
    }

    @Override
    public Instant timestamp() {
      return element.getTimestamp();
    }

    @Override
    public PaneInfo pane() {
      return element.getPane();
    }

    @Override
    public void updateWatermark(Instant watermark) {
      throw new UnsupportedOperationException();
    }

    @Override
    public PipelineOptions getPipelineOptions() {
      return options;
    }

    @Override
    public void output(OutputT output) {
      output(mainOutputTag, output);
    }

    @Override
    public void outputWithTimestamp(OutputT output, Instant timestamp) {
      outputWithTimestamp(mainOutputTag, output, timestamp);
    }

    @Override
    public <T> void output(TupleTag<T> tag, T output) {
      outputWithTimestamp(tag, output, element.getTimestamp());
    }

    @Override
    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
      getMutableOutput(tag)
          .add(ValueInSingleWindow.of(output, timestamp, element.getWindow(), element.getPane()));
    }
  }

  /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
  @Deprecated
  @Override
  public void close() throws Exception {
    if (state == State.BUNDLE_STARTED) {
      finishBundle();
    }
    if (state == State.BUNDLE_FINISHED) {
      fnInvoker.invokeTeardown();
      fn = null;
      fnInvoker = null;
    }
    state = State.TORN_DOWN;
  }

  /////////////////////////////////////////////////////////////////////////////

  /** The possible states of processing a {@link DoFn}. */
  private enum State {
    UNINITIALIZED,
    BUNDLE_STARTED,
    BUNDLE_FINISHED,
    TORN_DOWN
  }

  private final PipelineOptions options = PipelineOptionsFactory.create();

  /** The original {@link DoFn} under test. */
  private final DoFn<InputT, OutputT> origFn;

  /**
   * Whether to clone the original {@link DoFn} or just use it as-is.
   *
   * <p>Worker-side {@link DoFn DoFns} may not be serializable, and are not required to be.
   */
  private CloningBehavior cloningBehavior = CloningBehavior.CLONE_ONCE;

  /** The side input values to provide to the {@link DoFn} under test. */
  private Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs = new HashMap<>();

  /** The output tags used by the {@link DoFn} under test. */
  private TupleTag<OutputT> mainOutputTag = new TupleTag<>();

  /** The original DoFn under test, if started. */
  @Nullable private DoFn<InputT, OutputT> fn;

  @Nullable private DoFnInvoker<InputT, OutputT> fnInvoker;

  /** The outputs from the {@link DoFn} under test. Access via {@link #getOutputs()}. */
  @CheckForNull private Map<TupleTag<?>, List<ValueInSingleWindow<?>>> outputs;

  /** The state of processing of the {@link DoFn} under test. */
  private State state = State.UNINITIALIZED;

  private DoFnTester(DoFn<InputT, OutputT> origFn) {
    this.origFn = origFn;
    DoFnSignature signature = DoFnSignatures.signatureForDoFn(origFn);
    for (DoFnSignature.Parameter param : signature.processElement().extraParameters()) {
      param.match(
          new DoFnSignature.Parameter.Cases.WithDefault<Void>() {
            @Override
            @Nullable
            public Void dispatch(DoFnSignature.Parameter.ProcessContextParameter p) {
              // ProcessContext parameter is obviously supported.
              return null;
            }

            @Override
            @Nullable
            public Void dispatch(DoFnSignature.Parameter.WindowParameter p) {
              // We also support the BoundedWindow parameter.
              return null;
            }

            @Override
            @Nullable
            public Void dispatch(DoFnSignature.Parameter.ElementParameter p) {
              return null;
            }

            @Override
            @Nullable
            public Void dispatch(DoFnSignature.Parameter.TimestampParameter p) {
              return null;
            }

            @Override
            @Nullable
            public Void dispatch(DoFnSignature.Parameter.TimeDomainParameter p) {
              return null;
            }

            @Override
            @Nullable
            public Void dispatch(DoFnSignature.Parameter.OutputReceiverParameter p) {
              return null;
            }

            @Override
            @Nullable
            public Void dispatch(DoFnSignature.Parameter.TaggedOutputReceiverParameter p) {
              return null;
            }

            @Override
            @Nullable
            public Void dispatch(DoFnSignature.Parameter.PaneInfoParameter p) {
              return null;
            }

            @Override
            protected Void dispatchDefault(DoFnSignature.Parameter p) {
              throw new UnsupportedOperationException(
                  "Parameter " + p + " not supported by DoFnTester");
            }
          });
    }
  }

  @SuppressWarnings("unchecked")
  private void initializeState() throws Exception {
    checkState(state == State.UNINITIALIZED, "Already initialized");
    checkState(fn == null, "Uninitialized but fn != null");
    if (cloningBehavior.equals(CloningBehavior.DO_NOT_CLONE)) {
      fn = origFn;
    } else {
      fn =
          (DoFn<InputT, OutputT>)
              SerializableUtils.deserializeFromByteArray(
                  SerializableUtils.serializeToByteArray(origFn), origFn.toString());
    }
    fnInvoker = DoFnInvokers.invokerFor(fn);
    fnInvoker.invokeSetup();
  }

  private Map getOutputs() {
    if (outputs == null) {
      outputs = new HashMap<>();
    }
    return outputs;
  }
}
