/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.nemo.compiler.frontend.beam.transform;

import org.apache.beam.runners.core.*;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.transform.Transform;
import org.apache.nemo.compiler.frontend.beam.InMemorySideInputReader;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * This is a base class for Beam DoFn Transforms.
 *
 * @param <InputT>  input type.
 * @param <InterT>  intermediate type.
 * @param <OutputT> output type.
 */
public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
  Transform<WindowedValue<InputT>, WindowedValue<OutputT>> {
  private static final Logger LOG = LoggerFactory.getLogger(AbstractDoFnTransform.class.getName());

  private final TupleTag<OutputT> mainOutputTag;
  private final List<TupleTag<?>> additionalOutputTags;
  private final Map<Integer, PCollectionView<?>> sideInputs;
  private final WindowingStrategy<?, ?> windowingStrategy;
  private final DoFn<InterT, OutputT> doFn;
  private final SerializablePipelineOptions serializedOptions;
  private final Coder<InputT> inputCoder;
  private final Map<TupleTag<?>, Coder<?>> outputCoders;

  private transient OutputCollector<WindowedValue<OutputT>> outputCollector;
  private transient DoFnRunner<InterT, OutputT> doFnRunner;

  // null when there is no side input.
  private transient PushbackSideInputDoFnRunner<InterT, OutputT> pushBackRunner;

  private transient DoFnInvoker<InterT, OutputT> doFnInvoker;
  private transient DoFnRunners.OutputManager outputManager;
  private transient InMemorySideInputReader sideInputReader;

  // Variables for bundle.
  // We consider count and time millis for start/finish bundle.
  // If # of processed elements > bundleSize
  // or elapsed time > bundleMillis, we finish the current bundle and start a new one
  private transient long bundleSize;
  private transient long bundleMillis;
  private long prevBundleStartTime;
  private long currBundleCount = 0;
  private boolean bundleFinished = true;
  private final DisplayData displayData;

  /**
   * AbstractDoFnTransform constructor.
   *
   * @param doFn                 doFn
   * @param inputCoder           input coder
   * @param outputCoders         output coders
   * @param mainOutputTag        main output tag
   * @param additionalOutputTags additional output tags
   * @param windowingStrategy    windowing strategy
   * @param sideInputs           side inputs
   * @param options              pipeline options
   * @param displayData          display data.
   */
  public AbstractDoFnTransform(final DoFn<InterT, OutputT> doFn,
                               final Coder<InputT> inputCoder,
                               final Map<TupleTag<?>, Coder<?>> outputCoders,
                               final TupleTag<OutputT> mainOutputTag,
                               final List<TupleTag<?>> additionalOutputTags,
                               final WindowingStrategy<?, ?> windowingStrategy,
                               final Map<Integer, PCollectionView<?>> sideInputs,
                               final PipelineOptions options,
                               final DisplayData displayData) {
    this.doFn = doFn;
    this.inputCoder = inputCoder;
    this.outputCoders = outputCoders;
    this.mainOutputTag = mainOutputTag;
    this.additionalOutputTags = additionalOutputTags;
    this.sideInputs = sideInputs;
    this.serializedOptions = new SerializablePipelineOptions(options);
    this.windowingStrategy = windowingStrategy;
    this.displayData = displayData;
  }

  /**
   * Getter function for side inputs.
   *
   * @return the side inputs
   */
  final Map<Integer, PCollectionView<?>> getSideInputs() {
    return sideInputs;
  }

  /**
   * Getter function for output manager.
   *
   * @return the output manager.
   */
  final DoFnRunners.OutputManager getOutputManager() {
    return outputManager;
  }

  /**
   * Getter function for windowing strategy.
   *
   * @return the windowing strategy.
   */
  final WindowingStrategy getWindowingStrategy() {
    return windowingStrategy;
  }

  /**
   * Getter function for output tag.
   *
   * @return main output tag.
   */
  final TupleTag<OutputT> getMainOutputTag() {
    return mainOutputTag;
  }

  /**
   * Getter function for DoFn runner.
   *
   * @return DoFn runner.
   */
  final DoFnRunner<InterT, OutputT> getDoFnRunner() {
    return doFnRunner;
  }

  /**
   * Getter function for push back runner.
   *
   * @return push back runner.
   */
  final PushbackSideInputDoFnRunner<InterT, OutputT> getPushBackRunner() {
    return pushBackRunner;
  }

  /**
   * Getter function for side input reader.
   *
   * @return side input reader.
   */
  final InMemorySideInputReader getSideInputReader() {
    return sideInputReader;
  }

  /**
   * Getter function for DoFn.
   *
   * @return DoFn.
   */
  public final DoFn getDoFn() {
    return doFn;
  }

  /**
   * Checks whether the bundle is finished or not.
   * Starts the bundle if it is done.
   * <p>
   * TODO #263: Partial Combining for Beam Streaming
   * We may want to use separate methods for doFnRunner/pushBackRunner
   * (same applies to the other bundle-related methods)
   */
  final void checkAndInvokeBundle() {
    if (bundleFinished) {
      bundleFinished = false;
      if (pushBackRunner == null) {
        doFnRunner.startBundle();
      } else {
        pushBackRunner.startBundle();
      }
      prevBundleStartTime = System.currentTimeMillis();
      currBundleCount = 0;
    }
    currBundleCount += 1;
  }

  /**
   * Checks whether it is time to finish the bundle and finish it.
   */
  final void checkAndFinishBundle() {
    if (!bundleFinished) {
      if (currBundleCount >= bundleSize || System.currentTimeMillis() - prevBundleStartTime >= bundleMillis) {
        bundleFinished = true;
        if (pushBackRunner == null) {
          doFnRunner.finishBundle();
        } else {
          pushBackRunner.finishBundle();
        }
      }
    }
  }

  /**
   * Finish bundle without checking for conditions.
   */
  final void forceFinishBundle() {
    if (!bundleFinished) {
      bundleFinished = true;
      if (pushBackRunner == null) {
        doFnRunner.finishBundle();
      } else {
        pushBackRunner.finishBundle();
      }
    }
  }

  @Override
  public final void prepare(final Context context, final OutputCollector<WindowedValue<OutputT>> oc) {
    // deserialize pipeline option
    final NemoPipelineOptions options = serializedOptions.get().as(NemoPipelineOptions.class);
    this.outputCollector = wrapOutputCollector(oc);

    this.bundleMillis = options.getMaxBundleTimeMills();
    this.bundleSize = options.getMaxBundleSize();

    // create output manager
    outputManager = new DefaultOutputManager<>(outputCollector, mainOutputTag);

    // create side input reader
    sideInputReader = new InMemorySideInputReader(new ArrayList<>(sideInputs.values()));

    // this transform does not support state and timer.
    final StepContext stepContext = new StepContext() {
      @Override
      public StateInternals stateInternals() {
        throw new UnsupportedOperationException("Not support stateInternals in DoFnTransform");
      }

      @Override
      public TimerInternals timerInternals() {
        throw new UnsupportedOperationException("Not support timerInternals in DoFnTransform");
      }
    };

    final DoFn wrappedDoFn = wrapDoFn(doFn);

    // invoker
    doFnInvoker = DoFnInvokers.invokerFor(wrappedDoFn);
    doFnInvoker.invokeSetup();

    // DoFnRunners.simpleRunner takes care of all the hard stuff of running the DoFn
    // and that this approach is the standard used by most of the Beam runners
    doFnRunner = DoFnRunners.simpleRunner(
      options,
      wrappedDoFn,
      sideInputReader,
      outputManager,
      mainOutputTag,
      additionalOutputTags,
      stepContext,
      inputCoder,
      outputCoders,
      windowingStrategy);

    pushBackRunner = sideInputs.isEmpty()
      ? null
      : SimplePushbackSideInputDoFnRunner.<InterT, OutputT>create(doFnRunner, sideInputs.values(), sideInputReader);
  }

  /**
   * Getter function for output collector.
   *
   * @return output collector.
   */
  public final OutputCollector<WindowedValue<OutputT>> getOutputCollector() {
    return outputCollector;
  }

  @Override
  public final void close() {
    beforeClose();
    forceFinishBundle();
    doFnInvoker.invokeTeardown();
  }

  @Override
  public final String toString() {
    return this.getClass().getSimpleName() + " / " + displayData.toString().replace(":", " / ");
  }

  /**
   * An abstract function that wraps the original doFn.
   *
   * @param originalDoFn the original doFn.
   * @return wrapped doFn.
   */
  abstract DoFn wrapDoFn(DoFn originalDoFn);

  /**
   * An abstract function that wraps the original output collector.
   *
   * @param oc the original outputCollector.
   * @return wrapped output collector.
   */
  abstract OutputCollector wrapOutputCollector(OutputCollector oc);

  /**
   * An abstract function that is called before close.
   */
  abstract void beforeClose();
}
