/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.core.construction.graph;

import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;

import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A Fuser that constructs a fused pipeline by fusing as many PCollections into a stage as possible.
 */
class GreedyPCollectionFusers {
  private static final Logger LOG = LoggerFactory.getLogger(GreedyPCollectionFusers.class);

  private static final Map<String, FusibilityChecker> URN_FUSIBILITY_CHECKERS =
      ImmutableMap.<String, FusibilityChecker>builder()
          .put(PTransformTranslation.PAR_DO_TRANSFORM_URN, GreedyPCollectionFusers::canFuseParDo)
          .put(
              PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN,
              GreedyPCollectionFusers::canFuseParDo)
          .put(
              PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN,
              GreedyPCollectionFusers::canFuseParDo)
          .put(
              PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN,
              GreedyPCollectionFusers::canFuseParDo)
          .put(
              PTransformTranslation.COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN,
              GreedyPCollectionFusers::canFuseCompatibleEnvironment)
          .put(
              PTransformTranslation.COMBINE_PER_KEY_MERGE_ACCUMULATORS_TRANSFORM_URN,
              GreedyPCollectionFusers::canFuseCompatibleEnvironment)
          .put(
              PTransformTranslation.COMBINE_PER_KEY_EXTRACT_OUTPUTS_TRANSFORM_URN,
              GreedyPCollectionFusers::canFuseCompatibleEnvironment)
          .put(
              PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN,
              GreedyPCollectionFusers::canFuseCompatibleEnvironment)
          .put(PTransformTranslation.FLATTEN_TRANSFORM_URN, GreedyPCollectionFusers::canAlwaysFuse)
          .put(
              // GroupByKeys are runner-implemented only. PCollections consumed by a GroupByKey must
              // be materialized
              PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, GreedyPCollectionFusers::cannotFuse)
          .put(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN, GreedyPCollectionFusers::cannotFuse)
          .build();
  private static final FusibilityChecker DEFAULT_FUSIBILITY_CHECKER =
      GreedyPCollectionFusers::unknownTransformFusion;

  // TODO: Migrate
  private static final Map<String, CompatibilityChecker> URN_COMPATIBILITY_CHECKERS =
      ImmutableMap.<String, CompatibilityChecker>builder()
          .put(
              PTransformTranslation.PAR_DO_TRANSFORM_URN,
              GreedyPCollectionFusers::parDoCompatibility)
          .put(
              PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN,
              GreedyPCollectionFusers::parDoCompatibility)
          .put(
              PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN,
              GreedyPCollectionFusers::parDoCompatibility)
          .put(
              PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN,
              GreedyPCollectionFusers::parDoCompatibility)
          .put(
              PTransformTranslation.COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN,
              GreedyPCollectionFusers::compatibleEnvironments)
          .put(
              PTransformTranslation.COMBINE_PER_KEY_MERGE_ACCUMULATORS_TRANSFORM_URN,
              GreedyPCollectionFusers::compatibleEnvironments)
          .put(
              PTransformTranslation.COMBINE_PER_KEY_EXTRACT_OUTPUTS_TRANSFORM_URN,
              GreedyPCollectionFusers::compatibleEnvironments)
          .put(
              PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN,
              GreedyPCollectionFusers::compatibleEnvironments)
          .put(
              PTransformTranslation.FLATTEN_TRANSFORM_URN, GreedyPCollectionFusers::noCompatibility)
          .put(
              PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
              GreedyPCollectionFusers::noCompatibility)
          .put(
              PTransformTranslation.CREATE_VIEW_TRANSFORM_URN,
              GreedyPCollectionFusers::noCompatibility)
          .build();
  private static final CompatibilityChecker DEFAULT_COMPATIBILITY_CHECKER =
      GreedyPCollectionFusers::unknownTransformCompatibility;

  /** Returns true if the PTransform node for the given input PCollection can be fused across. */
  public static boolean canFuse(
      PTransformNode transformNode,
      Environment environment,
      PCollectionNode candidate,
      Collection<PCollectionNode> stagePCollections,
      QueryablePipeline pipeline) {
    return URN_FUSIBILITY_CHECKERS
        .getOrDefault(transformNode.getTransform().getSpec().getUrn(), DEFAULT_FUSIBILITY_CHECKER)
        .canFuse(transformNode, environment, candidate, stagePCollections, pipeline);
  }

  /**
   * Returns true if the two PTransforms are compatible such that they can be executed in the same
   * environment.
   */
  public static boolean isCompatible(
      PTransformNode left, PTransformNode right, QueryablePipeline pipeline) {
    CompatibilityChecker leftChecker =
        URN_COMPATIBILITY_CHECKERS.getOrDefault(
            left.getTransform().getSpec().getUrn(), DEFAULT_COMPATIBILITY_CHECKER);
    CompatibilityChecker rightChecker =
        URN_COMPATIBILITY_CHECKERS.getOrDefault(
            right.getTransform().getSpec().getUrn(), DEFAULT_COMPATIBILITY_CHECKER);
    // The nodes are mutually compatible
    return leftChecker.isCompatible(left, right, pipeline)
        && rightChecker.isCompatible(right, left, pipeline);
  }

  // For the following methods, these should be called if the ptransform is consumed by a
  // PCollection output by the ExecutableStage, to determine if it can be fused into that
  // Subgraph

  private interface FusibilityChecker {
    /**
     * Determine if a {@link PTransformNode} can be fused into an existing {@link ExecutableStage}.
     */
    boolean canFuse(
        PTransformNode transformNode,
        Environment environment,
        @SuppressWarnings("unused") PCollectionNode candidate,
        Collection<PCollectionNode> stagePCollections,
        QueryablePipeline pipeline);
  }

  private interface CompatibilityChecker {
    /**
     * Determine if two {@link PTransformNode PTransforms} can be fused into a new stage. This
     * determines sibling fusion for new {@link ExecutableStage stages}.
     */
    boolean isCompatible(
        PTransformNode newNode, PTransformNode otherNode, QueryablePipeline pipeline);
  }

  /**
   * A ParDo can be fused into a stage if it executes in the same Environment as that stage, and no
   * transform that are upstream of any of its side input are present in that stage.
   *
   * <p>A ParDo that consumes a side input cannot process an element until all of the side inputs
   * contain data for the side input window that contains the element.
   */
  private static boolean canFuseParDo(
      PTransformNode parDo,
      Environment environment,
      PCollectionNode candidate,
      Collection<PCollectionNode> stagePCollections,
      QueryablePipeline pipeline) {
    Optional<Environment> env = pipeline.getEnvironment(parDo);
    checkArgument(
        env.isPresent(),
        "A %s must have an %s associated with it",
        ParDoPayload.class.getSimpleName(),
        Environment.class.getSimpleName());
    if (!env.get().equals(environment)) {
      // The PCollection's producer and this ParDo execute in different environments, so fusion
      // is never possible.
      return false;
    }
    try {
      ParDoPayload payload = ParDoPayload.parseFrom(parDo.getTransform().getSpec().getPayload());
      if (Maps.filterKeys(
              parDo.getTransform().getInputsMap(), s -> payload.getTimerSpecsMap().containsKey(s))
          .values()
          .contains(candidate.getId())) {
        // Allow fusion across timer PCollections because they are a self loop.
        return true;
      } else if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) {
        // Inputs to a ParDo that uses State or Timers must be key-partitioned, and elements for
        // a key must execute serially. To avoid checking if the rest of the stage is
        // key-partitioned and preserves keys, these ParDos do not fuse into an existing stage.
        return false;
      } else if (!pipeline.getSideInputs(parDo).isEmpty()) {
        // At execution time, a Runner is required to only provide inputs to a PTransform that, at
        // the time the PTransform processes them, the associated window is ready in all side inputs
        // that the PTransform consumes. For an arbitrary stage, it is significantly complex for the
        // runner to determine this for each input. As a result, we break fusion to simplify this
        // inspection. In general, a ParDo which consumes side inputs cannot be fused into an
        // executable stage alongside any transforms which are upstream of any of its side inputs.
        return false;
      }
    } catch (InvalidProtocolBufferException e) {
      throw new IllegalArgumentException(e);
    }
    return true;
  }

  private static boolean parDoCompatibility(
      PTransformNode parDo, PTransformNode other, QueryablePipeline pipeline) {
    // Implicitly true if we are attempting to fuse against oneself. This case comes up for
    // PCollections representing timers since they create a self-loop in the graph.
    return parDo.equals(other)
        // This is a convenience rather than a strict requirement. In general, a ParDo that consumes
        // side inputs can be fused with other transforms in the same environment which are not
        // upstream of any of the side inputs.
        || (pipeline.getSideInputs(parDo).isEmpty()
            // We purposefully break fusion here to provide runners the opportunity to insert a
            // grouping operation to simplify implementing support for ParDo's that contain user
            // state.
            // We would not need to do this if we had the ability to mark upstream transforms as
            // key preserving or if runners could execute ParDos containing user state in a
            // distributed
            // fashion for a single key.
            && pipeline.getUserStates(parDo).isEmpty()
            // We purposefully break fusion here to provide runners the opportunity to insert a
            // grouping operation to simplify implementing support for ParDo's that contain timers.
            // We would not need to do this if we had the ability to mark upstream transforms as
            // key preserving or if runners could execute ParDos containing timers in a distributed
            // fashion for a single key.
            && pipeline.getTimers(parDo).isEmpty()
            && compatibleEnvironments(parDo, other, pipeline));
  }

  /**
   * A WindowInto can be fused into a stage if it executes in the same Environment as that stage.
   */
  private static boolean canFuseCompatibleEnvironment(
      PTransformNode operation,
      Environment environmemnt,
      @SuppressWarnings("unused") PCollectionNode candidate,
      @SuppressWarnings("unused") Collection<PCollectionNode> stagePCollections,
      QueryablePipeline pipeline) {
    // WindowInto transforms may not have an environment
    Optional<Environment> operationEnvironment = pipeline.getEnvironment(operation);
    return environmemnt.equals(operationEnvironment.orElse(null));
  }

  private static boolean compatibleEnvironments(
      PTransformNode left, PTransformNode right, QueryablePipeline pipeline) {
    return pipeline.getEnvironment(left).equals(pipeline.getEnvironment(right));
  }

  /**
   * Flatten can be fused into any stage.
   *
   * <p>If the assumption that for each {@link PCollection}, an element is produced in that {@link
   * PCollection} via a single path through the {@link Pipeline} DAG, a Flatten can appear in each
   * stage that produces any of its input {@link PCollection PCollections}, as all of its inputs
   * will reach it via only one of those paths.
   *
   * <p>As flatten consumes multiple inputs and produces a single output, there are two cases that
   * must be considered for the inputs.
   *
   * <ul>
   *   <li>All of the producers of all inputs are within a single stage
   *   <li>The producers of the inputs are in two or more stages
   * </ul>
   *
   * <p>If all of the producers exist within a single stage, this is identical to any other
   * transform that consumes a single input - the output PCollection is materialized based only on
   * its consumers.
   *
   * <p>If the producers exist within separate stages, there are two other considerations:
   *
   * <ul>
   *   <li>The output PCollection must be materialized in all cases (has consumers which cannot be
   *       fused into at least one of the upstream stages).
   *   <li>All of the consumers of the output PCollection can be fused into at least one of the
   *       producing stages.
   * </ul>
   *
   * <p>For the former case, this again is identical to a transform that produces a materialized
   * {@link PCollection}; each path to the {@link Flatten} produces elements for the input {@link
   * PCollection}, and the output is materialized and consumed by downstream transforms identically
   * to any other materialized {@link PCollection}.
   *
   * <p>For the latter, where fusion is possible into at least one of the producer stages, Flatten
   * unzipping is performed. This consists of the following steps:
   *
   * <ol>
   *   <li>The flatten is fused into each upstream stage
   *   <li>Each stage which contains a producer that can be fused with the output {@link
   *       PCollection} fuses that {@link PCollection}. Elements produced by that stage for the
   *       output of the flatten are never materialized.
   *   <li>Each stage which cannot be fused with the output {@link PCollection} materializes the
   *       output of the {@link Flatten}. All of the downstream consumers exist in a stage which
   *       reads from the output of that {@link Flatten}, which contains all of the elements from
   *       the stages that could not fuse with those consumers.
   * </ol>
   */
  private static boolean canAlwaysFuse(
      @SuppressWarnings("unused") PTransformNode flatten,
      @SuppressWarnings("unused") Environment environment,
      @SuppressWarnings("unused") PCollectionNode candidate,
      @SuppressWarnings("unused") Collection<PCollectionNode> stagePCollections,
      @SuppressWarnings("unused") QueryablePipeline pipeline) {
    return true;
  }

  private static boolean cannotFuse(
      @SuppressWarnings("unused") PTransformNode cannotFuse,
      @SuppressWarnings("unused") Environment environment,
      @SuppressWarnings("unused") PCollectionNode candidate,
      @SuppressWarnings("unused") Collection<PCollectionNode> stagePCollections,
      @SuppressWarnings("unused") QueryablePipeline pipeline) {
    return false;
  }

  private static boolean noCompatibility(
      @SuppressWarnings("unused") PTransformNode self,
      @SuppressWarnings("unused") PTransformNode other,
      @SuppressWarnings("unused") QueryablePipeline pipeline) {
    // TODO: There is performance to be gained if the output of a flatten is fused into a stage
    // where its output is wholly consumed after a fusion break. This requires slightly more
    // lookahead.
    return false;
  }

  // Things with unknown URNs either execute within their own stage or are executed by the runner.
  // In either case, assume the system is capable of executing the expressed transform
  private static boolean unknownTransformFusion(
      PTransformNode transform,
      @SuppressWarnings("unused") Environment environment,
      @SuppressWarnings("unused") PCollectionNode candidate,
      @SuppressWarnings("unused") Collection<PCollectionNode> stagePCollections,
      @SuppressWarnings("unused") QueryablePipeline pipeline) {
    LOG.debug(
        "Unknown {} {} will not fuse into an existing {}",
        PTransform.class.getSimpleName(),
        transform.getTransform(),
        ExecutableStage.class.getSimpleName(),
        PTransform.class.getSimpleName());
    return false;
  }

  // Things with unknown URNs either execute within their own stage or are executed by the runner.
  // In either case, assume the system is capable of executing the expressed transform
  private static boolean unknownTransformCompatibility(
      PTransformNode transform,
      @SuppressWarnings("unused") PTransformNode other,
      @SuppressWarnings("unused") QueryablePipeline pipeline) {
    LOG.debug(
        "Unknown {} {} will not root a {} with other {}",
        PTransform.class.getSimpleName(),
        transform.getTransform(),
        ExecutableStage.class.getSimpleName(),
        PTransform.class.getSimpleName());
    return false;
  }
}
