/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.direct.portable;

import com.google.auto.value.AutoValue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
import org.apache.beam.runners.direct.ExecutableGraph;
import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
import org.apache.beam.runners.local.ExecutionDriver;
import org.apache.beam.runners.local.PipelineMessageReceiver;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Pushes additional work onto a {@link BundleProcessor} based on the fact that a pipeline has
 * quiesced.
 */
class QuiescenceDriver implements ExecutionDriver {
  private static final Logger LOG = LoggerFactory.getLogger(QuiescenceDriver.class);

  public static ExecutionDriver create(
      EvaluationContext context,
      ExecutableGraph<PTransformNode, PCollectionNode> graph,
      BundleProcessor<PCollectionNode, CommittedBundle<?>, PTransformNode> bundleProcessor,
      PipelineMessageReceiver messageReceiver,
      Map<PTransformNode, ConcurrentLinkedQueue<CommittedBundle<?>>> initialBundles) {
    return new QuiescenceDriver(context, graph, bundleProcessor, messageReceiver, initialBundles);
  }

  private final EvaluationContext evaluationContext;
  private final ExecutableGraph<PTransformNode, PCollectionNode> graph;
  private final BundleProcessor<PCollectionNode, CommittedBundle<?>, PTransformNode>
      bundleProcessor;
  private final PipelineMessageReceiver pipelineMessageReceiver;

  private final CompletionCallback defaultCompletionCallback =
      new TimerIterableCompletionCallback(Collections.emptyList());

  private final Map<PTransformNode, ConcurrentLinkedQueue<CommittedBundle<?>>> pendingRootBundles;
  private final Queue<WorkUpdate> pendingWork = new ConcurrentLinkedQueue<>();

  private final AtomicReference<ExecutorState> state =
      new AtomicReference<>(ExecutorState.QUIESCENT);
  private final AtomicLong outstandingWork = new AtomicLong(0L);
  private boolean exceptionThrown = false;

  private QuiescenceDriver(
      EvaluationContext evaluationContext,
      ExecutableGraph<PTransformNode, PCollectionNode> graph,
      BundleProcessor<PCollectionNode, CommittedBundle<?>, PTransformNode> bundleProcessor,
      PipelineMessageReceiver pipelineMessageReceiver,
      Map<PTransformNode, ConcurrentLinkedQueue<CommittedBundle<?>>> pendingRootBundles) {
    this.evaluationContext = evaluationContext;
    this.graph = graph;
    this.bundleProcessor = bundleProcessor;
    this.pipelineMessageReceiver = pipelineMessageReceiver;
    this.pendingRootBundles = pendingRootBundles;
  }

  @Override
  public DriverState drive() {
    boolean noWorkOutstanding = outstandingWork.get() == 0L;
    ExecutorState startingState = state.get();
    if (startingState == ExecutorState.ACTIVE) {
      // The remainder of this call will add all available work to the Executor, and there will
      // be no new work available
      state.compareAndSet(ExecutorState.ACTIVE, ExecutorState.PROCESSING);
    } else if (startingState == ExecutorState.PROCESSING && noWorkOutstanding) {
      // The executor has consumed all new work and no new work was added
      state.compareAndSet(ExecutorState.PROCESSING, ExecutorState.QUIESCING);
    } else if (startingState == ExecutorState.QUIESCING && noWorkOutstanding) {
      // The executor re-ran all blocked work and nothing could make progress.
      state.compareAndSet(ExecutorState.QUIESCING, ExecutorState.QUIESCENT);
    }
    fireTimers();
    Collection<WorkUpdate> updates = new ArrayList<>();
    // Pull all available updates off of the queue before adding additional work. This ensures
    // both loops terminate.
    WorkUpdate pendingUpdate = pendingWork.poll();
    while (pendingUpdate != null) {
      updates.add(pendingUpdate);
      pendingUpdate = pendingWork.poll();
    }
    for (WorkUpdate update : updates) {
      applyUpdate(noWorkOutstanding, startingState, update);
    }
    addWorkIfNecessary();

    if (exceptionThrown) {
      return DriverState.FAILED;
    } else if (evaluationContext.isDone()) {
      return DriverState.SHUTDOWN;
    } else {
      return DriverState.CONTINUE;
    }
  }

  private void applyUpdate(
      boolean noWorkOutstanding, ExecutorState startingState, WorkUpdate update) {
    LOG.debug("Executor Update: {}", update);
    if (update.getBundle().isPresent()) {
      if (ExecutorState.ACTIVE == startingState
          || (ExecutorState.PROCESSING == startingState && noWorkOutstanding)) {
        CommittedBundle<?> bundle = update.getBundle().get();
        for (PTransformNode consumer : update.getConsumers()) {
          outstandingWork.incrementAndGet();
          bundleProcessor.process(bundle, consumer, defaultCompletionCallback);
        }
      } else {
        pendingWork.offer(update);
      }
    } else if (update.getException().isPresent()) {
      pipelineMessageReceiver.failed(update.getException().get());
      exceptionThrown = true;
    }
  }

  /** Fires any available timers. */
  private void fireTimers() {
    try {
      for (FiredTimers<PTransformNode> transformTimers : evaluationContext.extractFiredTimers()) {
        Collection<TimerData> delivery = transformTimers.getTimers();
        KeyedWorkItem<?, Object> work =
            KeyedWorkItems.timersWorkItem(transformTimers.getKey().getKey(), delivery);
        PCollectionNode inputPCollection =
            Iterables.getOnlyElement(graph.getPerElementInputs(transformTimers.getExecutable()));
        @SuppressWarnings({"unchecked", "rawtypes"})
        CommittedBundle<?> bundle =
            evaluationContext
                .createKeyedBundle(transformTimers.getKey(), inputPCollection)
                .add(WindowedValue.valueInGlobalWindow(work))
                .commit(evaluationContext.now());
        outstandingWork.incrementAndGet();
        bundleProcessor.process(
            bundle, transformTimers.getExecutable(), new TimerIterableCompletionCallback(delivery));
        state.set(ExecutorState.ACTIVE);
      }
    } catch (Exception e) {
      LOG.error("Internal Error while delivering timers", e);
      pipelineMessageReceiver.failed(e);
      exceptionThrown = true;
    }
  }

  /**
   * If all active {@link DirectTransformExecutor TransformExecutors} are in a blocked state, add
   * more work from root nodes that may have additional work. This ensures that if a pipeline has
   * elements available from the root nodes it will add those elements when necessary.
   */
  private void addWorkIfNecessary() {
    // If any timers have fired, they will add more work; We don't need to add more
    if (state.get() == ExecutorState.QUIESCENT) {
      // All current TransformExecutors are blocked; add more work from the roots.
      for (Map.Entry<PTransformNode, ConcurrentLinkedQueue<CommittedBundle<?>>> pendingRootEntry :
          pendingRootBundles.entrySet()) {
        Collection<CommittedBundle<?>> bundles = new ArrayList<>();
        // Pull all available work off of the queue, then schedule it all, so this loop
        // terminates
        while (!pendingRootEntry.getValue().isEmpty()) {
          CommittedBundle<?> bundle = pendingRootEntry.getValue().poll();
          bundles.add(bundle);
        }
        for (CommittedBundle<?> bundle : bundles) {
          outstandingWork.incrementAndGet();
          bundleProcessor.process(bundle, pendingRootEntry.getKey(), defaultCompletionCallback);
          state.set(ExecutorState.ACTIVE);
        }
      }
    }
  }

  /**
   * The state of the executor. The state of the executor determines the behavior of the {@link
   * QuiescenceDriver} when it runs.
   */
  private enum ExecutorState {
    /**
     * Output has been produced since the last time the monitor ran. Work exists that has not yet
     * been evaluated, and all pending, including potentially blocked work, should be evaluated.
     *
     * <p>The executor becomes active whenever a timer fires, a {@link PCollectionView} is updated,
     * or output is produced by the evaluation of a {@link DirectTransformExecutor}.
     */
    ACTIVE,
    /**
     * The Executor does not have any unevaluated work available to it, but work is in progress.
     * Work should not be added until the Executor becomes active or no work is outstanding.
     *
     * <p>If all outstanding work completes without the executor becoming {@code ACTIVE}, the
     * Executor enters state {@code QUIESCING}. Previously evaluated work must be reevaluated, in
     * case a side input has made progress.
     */
    PROCESSING,
    /**
     * All outstanding work is work that may be blocked on a side input. When there is no
     * outstanding work, the executor becomes {@code QUIESCENT}.
     */
    QUIESCING,
    /**
     * All elements are either buffered in state or are blocked on a side input. There are no timers
     * that are permitted to fire but have not. There is no outstanding work.
     *
     * <p>The pipeline will not make progress without the progression of watermarks, the progression
     * of processing time, or the addition of elements.
     */
    QUIESCENT
  }

  /**
   * The base implementation of {@link CompletionCallback} that provides implementations for {@link
   * #handleResult(CommittedBundle, TransformResult)} and {@link #handleException(CommittedBundle,
   * Exception)}.
   */
  private class TimerIterableCompletionCallback implements CompletionCallback {
    private final Iterable<TimerData> timers;

    TimerIterableCompletionCallback(Iterable<TimerData> timers) {
      this.timers = timers;
    }

    @Override
    public final CommittedResult handleResult(
        CommittedBundle<?> inputBundle, TransformResult<?> result) {
      CommittedResult<PTransformNode> committedResult =
          evaluationContext.handleResult(inputBundle, timers, result);
      for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
        pendingWork.offer(
            WorkUpdate.fromBundle(
                outputBundle, graph.getPerElementConsumers(outputBundle.getPCollection())));
      }
      Optional<? extends CommittedBundle<?>> unprocessedInputs =
          committedResult.getUnprocessedInputs();
      if (unprocessedInputs.isPresent()) {
        if (inputBundle.getPCollection() == null) {
          // TODO: Split this logic out of an if statement
          pendingRootBundles.get(result.getTransform()).offer(unprocessedInputs.get());
        } else {
          pendingWork.offer(
              WorkUpdate.fromBundle(
                  unprocessedInputs.get(), Collections.singleton(committedResult.getExecutable())));
        }
      }
      if (!committedResult.getProducedOutputTypes().isEmpty()) {
        state.set(ExecutorState.ACTIVE);
      }
      outstandingWork.decrementAndGet();
      return committedResult;
    }

    @Override
    public void handleEmpty(PTransformNode transform) {
      outstandingWork.decrementAndGet();
    }

    @Override
    public final void handleException(CommittedBundle<?> inputBundle, Exception e) {
      pendingWork.offer(WorkUpdate.fromException(e));
      outstandingWork.decrementAndGet();
    }

    @Override
    public void handleError(Error err) {
      outstandingWork.decrementAndGet();
      pipelineMessageReceiver.failed(err);
    }
  }

  /**
   * An internal status update on the state of the executor.
   *
   * <p>Used to signal when the executor should be shut down (due to an exception).
   */
  @AutoValue
  abstract static class WorkUpdate {
    private static WorkUpdate fromBundle(
        CommittedBundle<?> bundle, Collection<PTransformNode> consumers) {
      return new AutoValue_QuiescenceDriver_WorkUpdate(
          Optional.of(bundle), consumers, Optional.absent());
    }

    private static WorkUpdate fromException(Exception e) {
      return new AutoValue_QuiescenceDriver_WorkUpdate(
          Optional.absent(), Collections.emptyList(), Optional.of(e));
    }

    /** Returns the bundle that produced this update. */
    public abstract Optional<? extends CommittedBundle<?>> getBundle();

    /**
     * Returns the transforms to process the bundle. If nonempty, {@link #getBundle()} will return a
     * present {@link Optional}.
     */
    public abstract Collection<PTransformNode> getConsumers();

    public abstract Optional<? extends Exception> getException();
  }
}
