/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.core.optimizer.enumeration;

import org.apache.commons.lang3.Validate;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval;
import org.apache.wayang.core.optimizer.costs.TimeEstimate;
import org.apache.wayang.core.optimizer.costs.TimeToCostConverter;
import org.apache.wayang.core.plan.executionplan.Channel;
import org.apache.wayang.core.plan.executionplan.ExecutionTask;
import org.apache.wayang.core.plan.wayangplan.ElementaryOperator;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.plan.wayangplan.InputSlot;
import org.apache.wayang.core.plan.wayangplan.LoopSubplan;
import org.apache.wayang.core.plan.wayangplan.Operator;
import org.apache.wayang.core.plan.wayangplan.OperatorAlternative;
import org.apache.wayang.core.plan.wayangplan.OutputSlot;
import org.apache.wayang.core.plan.wayangplan.WayangPlan;
import org.apache.wayang.core.plan.wayangplan.Slot;
import org.apache.wayang.core.platform.Junction;
import org.apache.wayang.core.platform.Platform;
import org.apache.wayang.core.util.Canonicalizer;
import org.apache.wayang.core.util.WayangCollections;
import org.apache.wayang.core.util.Tuple;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.ToDoubleFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Represents a partial execution plan.
 */
public class PlanImplementation {

    private static final Logger logger = LogManager.getLogger(PlanImplementation.class);

    /**
     * {@link ExecutionOperator}s contained in this instance.
     */
    private final Canonicalizer<ExecutionOperator> operators;

    /**
     * Describes the {@link Channel}s that have been picked between {@link ExecutionOperator}s and how they are
     * implemented.
     */
    private final Map<OutputSlot<?>, Junction> junctions;

    /**
     * Defines how {@link LoopSubplan}s should be executed.
     */
    private final Map<LoopSubplan, LoopImplementation> loopImplementations = new HashMap<>();

    /**
     * An enumerated plan is mainly characterized by the {@link OperatorAlternative.Alternative}s that have
     * been picked so far. This member keeps track of them.
     */
    private final Map<OperatorAlternative, OperatorAlternative.Alternative> settledAlternatives =
            new HashMap<>();

    /**
     * The {@link PlanEnumeration} that hosts this instance. Can change over time.
     */
    // TODO: I think, we don't maintain this field properly. Also, its semantics blur inside of LoopImplementations. Can we remove it?
    private PlanEnumeration planEnumeration;

    /**
     * Keep track of the {@link Platform}s of our {@link #operators}.
     */
    private Set<Platform> platformCache;

    /**
     * {@link OptimizationContext} that provides estimates for the {@link #operators}.
     */
    private final OptimizationContext optimizationContext;

    /**
     * The squashed cost estimate to execute this instance. This will be used to select the best plan!
     */
    private double squashedCostEstimateCache = Double.NaN, squashedCostEstimateWithoutOverheadCache = Double.NaN;

    /**
     * The parallel cost estimate . This will store both calculated squashed cost and cost that will be used to select the best enumerated plan!
     */
    private Tuple<List<ProbabilisticDoubleInterval>, List<Double>> parallelCostEstimateCache = null;

    /**
     * This will be used to store the parallel cost of each operator.
     */
    private List<Tuple<Operator, Tuple<List<ProbabilisticDoubleInterval>, List<Double>>>> calculatedParallelOperatorCostCache = new ArrayList<>();

    /**
     * Create a new instance.
     */
    PlanImplementation(
            PlanEnumeration planEnumeration,
            Map<OutputSlot<?>, Junction> junctions,
            Collection<ExecutionOperator> operators,
            OptimizationContext optimizationContext) {
        this(planEnumeration, junctions, new Canonicalizer<>(operators), optimizationContext);
    }

    /**
     * Creates new instance.
     */
    PlanImplementation(PlanEnumeration planEnumeration,
                       Map<OutputSlot<?>, Junction> junctions,
                       OptimizationContext optimizationContext) {
        this(planEnumeration, junctions, new Canonicalizer<>(), optimizationContext);
    }

    /**
     * Copy constructor.
     */
    public PlanImplementation(PlanImplementation original) {
        this.planEnumeration = original.planEnumeration;
        this.junctions = new HashMap<>(original.junctions);
        this.operators = new Canonicalizer<>(original.getOperators());
        this.settledAlternatives.putAll(original.settledAlternatives);
        this.loopImplementations.putAll(original.loopImplementations);
        this.optimizationContext = original.optimizationContext;
    }

    /**
     * Base constructor.
     */
    private PlanImplementation(PlanEnumeration planEnumeration,
                               Map<OutputSlot<?>, Junction> junctions,
                               Canonicalizer<ExecutionOperator> operators,
                               OptimizationContext optimizationContext) {
        this.planEnumeration = planEnumeration;
        this.junctions = junctions;
        this.operators = operators;
        this.optimizationContext = optimizationContext;

        assert this.planEnumeration != null;
    }


    /**
     * @return the {@link PlanEnumeration} this instance belongs to
     */
    public PlanEnumeration getPlanEnumeration() {
        return this.planEnumeration;
    }

    public void setPlanEnumeration(PlanEnumeration planEnumeration) {
        this.planEnumeration = planEnumeration;
    }


    /**
     * Find the {@link InputSlot}s of already picked {@link ExecutionOperator}s that represent the given {@link InputSlot}.
     * <p>Note that we require that this instance either provides all or no {@link ExecutionOperator}s necessary to
     * implement the {@link InputSlot}.</p>
     *
     * @param someInput any {@link InputSlot} of the original {@link WayangPlan}
     * @return the representing {@link InputSlot}s or {@code null} if this instance has no {@link ExecutionOperator}
     * backing the given {@link InputSlot}
     */
    Collection<InputSlot<?>> findExecutionOperatorInputs(final InputSlot<?> someInput) {
        final Operator owner = someInput.getOwner();
        if (owner.isAlternative()) {
            final OperatorAlternative operatorAlternative = (OperatorAlternative) owner;
            final OperatorAlternative.Alternative alternative = this.settledAlternatives.get(operatorAlternative);
            if (alternative == null) return null;
            @SuppressWarnings("unchecked")
            final Collection<InputSlot<?>> innerInputs = (Collection<InputSlot<?>>) (Collection) alternative.followInput(someInput);
            boolean isWithNull = false;
            Collection<InputSlot<?>> result = null;
            for (InputSlot<?> innerInput : innerInputs) {
                final Collection<InputSlot<?>> resolvedInputs = this.findExecutionOperatorInputs(innerInput);
                if (isWithNull && resolvedInputs != null) {
                    throw new IllegalStateException(String.format("Disallowed that %s is required by two different alternatives.", someInput));
                }
                isWithNull |= resolvedInputs == null;
                if (result == null) {
                    result = resolvedInputs;
                } else {
                    assert resolvedInputs != null;
                    result.addAll(resolvedInputs);
                }
            }
            return result;

        } else if (owner.isLoopSubplan()) {
            final LoopSubplan loopSubplan = (LoopSubplan) owner;
            final LoopImplementation loopImplementation = this.getLoopImplementations().get(loopSubplan);
            if (loopImplementation == null) return null;

            // Enter the LoopSubplan.
            final Collection<InputSlot<?>> innerInputs = loopSubplan.followInputUnchecked(someInput);
            if (innerInputs.isEmpty()) return innerInputs;

            // Discern LoopHeadOperator InputSlots and loop body InputSlots.
            final List<LoopImplementation.IterationImplementation> iterationImpls = loopImplementation.getIterationImplementations();
            final Collection<InputSlot<?>> collector = new HashSet<>(innerInputs.size());
            for (InputSlot<?> innerInput : innerInputs) {
                if (innerInput.getOwner() == loopSubplan.getLoopHead()) {
                    final LoopImplementation.IterationImplementation initialIterationImpl = iterationImpls.get(0);
                    collector.addAll(
                            initialIterationImpl.getBodyImplementation().findExecutionOperatorInputs(innerInput)
                    );
                } else {
                    for (LoopImplementation.IterationImplementation iterationImpl : iterationImpls) {
                        collector.addAll(
                                iterationImpl.getBodyImplementation().findExecutionOperatorInputs(innerInput)
                        );
                    }
                }
            }
            return collector;

        } else {
            assert owner.isExecutionOperator();
            Collection<InputSlot<?>> result = new LinkedList<>();
            result.add(someInput);
            return result;
        }

    }

    /**
     * Find the {@link OutputSlot}s of already picked {@link ExecutionOperator}s that represent the given {@link OutputSlot}.
     *
     * @param someOutput any {@link OutputSlot} of the original {@link WayangPlan}
     * @return the representing {@link OutputSlot}s
     */
    Collection<OutputSlot<?>> findExecutionOperatorOutput(OutputSlot<?> someOutput) {
        return this.findExecutionOperatorOutputWithContext(someOutput).stream()
                .map(Tuple::getField0)
                .collect(Collectors.toList());
    }

    /**
     * Find the {@link OutputSlot}s of already picked {@link ExecutionOperator}s that represent the given {@link OutputSlot}.
     *
     * @param someOutput any {@link OutputSlot} of the original {@link WayangPlan}
     * @return the representing {@link OutputSlot}s together with their enclosing {@link PlanImplementation}
     */
    Collection<Tuple<OutputSlot<?>, PlanImplementation>> findExecutionOperatorOutputWithContext(
            OutputSlot<?> someOutput) {
        while (someOutput != null
                && someOutput.getOwner().isAlternative()) {
            final Operator owner = someOutput.getOwner();
            final OperatorAlternative operatorAlternative = (OperatorAlternative) owner;
            final OperatorAlternative.Alternative alternative = this.settledAlternatives.get(operatorAlternative);
            someOutput = alternative == null ? null : alternative.traceOutput(someOutput);
        }

        // If we did not find a terminal OutputSlot.
        if (someOutput == null) {
            return Collections.emptySet();
        }

        // Otherwise, discern LoopSubplans and ExecutionOperators.
        final Operator owner = someOutput.getOwner();
        if (owner.isLoopSubplan()) {
            final LoopSubplan loopSubplan = (LoopSubplan) owner;
            final LoopImplementation loopImplementation = this.getLoopImplementations().get(loopSubplan);
            if (loopImplementation == null) return Collections.emptyList();

            // Enter the LoopSubplan.
            final OutputSlot<?> innerOutput = loopSubplan.traceOutput(someOutput);
            if (innerOutput == null) return Collections.emptyList();
            assert innerOutput.getOwner().isLoopHead();

            // For all the iterations, return the potential OutputSlots.
            final List<LoopImplementation.IterationImplementation> iterationImpls =
                    loopImplementation.getIterationImplementations();
            final Set<Tuple<OutputSlot<?>, PlanImplementation>> collector = new HashSet<>(iterationImpls.size());
            for (LoopImplementation.IterationImplementation iterationImpl : iterationImpls) {
                final Collection<Tuple<OutputSlot<?>, PlanImplementation>> outputsWithContext =
                        iterationImpl.getBodyImplementation().findExecutionOperatorOutputWithContext(innerOutput);
                collector.addAll(outputsWithContext);
            }

            return collector;

        } else {
            assert owner.isExecutionOperator();
            return Collections.singleton(new Tuple<>(someOutput, this));
        }
    }


    /**
     * Creates a new instance that forms the concatenation of this instance with the {@code targetPlans} via the
     * {@code junction}.
     *
     * @param targetPlans              instances to connect to
     * @param junction                 connects this instance with the {@code targetPlans}
     * @param outputPlanImplementation nested instance of this instance that hosts the {@code junction}
     * @param concatenationEnumeration that will host the concatenated instance
     * @return the concatenated instance or {@code null} if the inputs are contradicting each other
     */
    PlanImplementation concatenate(List<PlanImplementation> targetPlans,
                                   Junction junction,
                                   PlanImplementation outputPlanImplementation,
                                   PlanEnumeration concatenationEnumeration) {

        final PlanImplementation concatenation = new PlanImplementation(
                concatenationEnumeration,
                new HashMap<>(this.junctions.size() + 1),
                new HashSet<>(this.settledAlternatives.size(), targetPlans.size() * 4), // ballpark figure
                this.optimizationContext
        );

        concatenation.operators.addAll(this.operators);
        concatenation.junctions.putAll(this.junctions);
        concatenation.settledAlternatives.putAll(this.settledAlternatives);

        // Find the appropriate PlanImplementation for the junction and copy the loop implementations.
        PlanImplementation junctionPlanImplementation;
        if (outputPlanImplementation == this) {
            // Special case: The junction resides inside the top-level PlanImplementation.
            concatenation.loopImplementations.putAll(this.loopImplementations);
            junctionPlanImplementation = concatenation;
        } else {
            // Exhaustively, yet focused, search for the PlanImplementation.
            junctionPlanImplementation = concatenation.copyLoopImplementations(
                    this,
                    outputPlanImplementation,
                    junction.getSourceOutput().getOwner().getLoopStack()
            );
        }
        junctionPlanImplementation.junctions.put(junction.getSourceOutput(), junction);

        for (PlanImplementation targetPlan : targetPlans) {
            // NB: Join semantics at this point weaved in.
            if (concatenation.isSettledAlternativesContradicting(targetPlan)) {
                return null;
            }
            concatenation.operators.addAll(targetPlan.operators);
            concatenation.loopImplementations.putAll(targetPlan.loopImplementations);
            concatenation.junctions.putAll(targetPlan.junctions);
            concatenation.settledAlternatives.putAll(targetPlan.settledAlternatives);
        }

        return concatenation;
    }

    /**
     * Find the a given nested {@link PlanImplementation} in a further {@link PlanImplementation} and copy it to
     * this instance.
     *
     * @param originalPlanImplementation the (top-level) {@link PlanImplementation} to copy from
     * @param targetPlanImplementation   the (nested) {@link PlanImplementation} that should be copied
     * @param loopStack                  of an {@link ExecutionOperator} inside of the {@code targetPlanImplementation}
     * @return the copied {@link PlanImplementation} inside of this instance
     */
    private PlanImplementation copyLoopImplementations(PlanImplementation originalPlanImplementation,
                                                       PlanImplementation targetPlanImplementation,
                                                       LinkedList<LoopSubplan> loopStack) {
        // Descend into the loopStack.
        assert !loopStack.isEmpty();
        final LoopSubplan visitedLoop = loopStack.pop();

        // Copy the LoopImplementations of the originalPlanImplementation.
        this.loopImplementations.putAll(originalPlanImplementation.getLoopImplementations());
        // This one will be altered, so make an instance copy.
        final LoopImplementation loopImplCopy =
                this.loopImplementations.compute(visitedLoop, (key, value) -> new LoopImplementation(value));

        // Find the original counterpart to loopImplCopy.
        final LoopImplementation originalLoopImpl = originalPlanImplementation.loopImplementations.get(visitedLoop);


        // Go over the iterations of the LoopImplementations in parallel to process their PlanImplementations.
        PlanImplementation targetPlanImplementationCopy = null;
        Iterator<LoopImplementation.IterationImplementation>
                originalIterator = originalLoopImpl.getIterationImplementations().iterator(),
                copyIterator = loopImplCopy.getIterationImplementations().iterator();
        while (originalIterator.hasNext()) {
            final LoopImplementation.IterationImplementation nextCopy = copyIterator.next();
            final LoopImplementation.IterationImplementation nextOriginal = originalIterator.next();
            // If we need to descend further, invoke a recursive call.
            if (!loopStack.isEmpty()) {
                targetPlanImplementationCopy = nextCopy.getBodyImplementation().copyLoopImplementations(
                        nextOriginal.getBodyImplementation(),
                        targetPlanImplementation,
                        loopStack);

                // Once, we have found the iteration that contains the targetPlanImplementation, we can stop.
                if (targetPlanImplementationCopy != null) break;
            } else {
                // If we cannot descend futher, we basically need to find the correct iteration only.
                if (nextOriginal.getBodyImplementation() == targetPlanImplementation) {
                    targetPlanImplementationCopy = nextCopy.getBodyImplementation();
                    break;
                }
            }
        }

        // Restore the loopStack.
        loopStack.push(visitedLoop);

        // Return the match.
        return targetPlanImplementationCopy;


    }

    private boolean isSettledAlternativesContradicting(PlanImplementation that) {
        for (Map.Entry<OperatorAlternative, OperatorAlternative.Alternative> entry : this.settledAlternatives.entrySet()) {
            final OperatorAlternative opAlt = entry.getKey();
            final OperatorAlternative.Alternative alternative = entry.getValue();
            final OperatorAlternative.Alternative thatAlternative = that.settledAlternatives.get(opAlt);
            if (thatAlternative != null && alternative != thatAlternative) {
                return true;
            }
        }
        for (Map.Entry<LoopSubplan, LoopImplementation> entry : this.loopImplementations.entrySet()) {
            final LoopSubplan loop = entry.getKey();
            final LoopImplementation thisLoopImplementation = entry.getValue();
            final LoopImplementation thatLoopImplementation = that.loopImplementations.get(loop);
            if (thatLoopImplementation == null) continue;
            if (thisLoopImplementation
                    .getSingleIterationImplementation()
                    .getBodyImplementation()
                    .isSettledAlternativesContradicting(
                            thatLoopImplementation
                                    .getIterationImplementations().get(0)
                                    .getBodyImplementation()
                    )) {
                return true;
            }
        }
        return false;
    }

    /**
     * Escapes the {@link OperatorAlternative} that contains this instance.
     *
     * @param alternative        contains this instance
     * @param newPlanEnumeration will host the new instance
     * @return
     */
    public PlanImplementation escape(OperatorAlternative.Alternative alternative, PlanEnumeration newPlanEnumeration) {
        final PlanImplementation escapedPlanImplementation = new PlanImplementation(
                newPlanEnumeration, this.junctions, this.operators, this.optimizationContext
        );
        escapedPlanImplementation.settledAlternatives.putAll(this.settledAlternatives);
        assert !escapedPlanImplementation.settledAlternatives.containsKey(alternative.getOperatorAlternative());
        escapedPlanImplementation.settledAlternatives.put(alternative.getOperatorAlternative(), alternative);
        escapedPlanImplementation.loopImplementations.putAll(this.getLoopImplementations());
        return escapedPlanImplementation;
    }

    public Canonicalizer<ExecutionOperator> getOperators() {
        return this.operators;
    }

    public Map<LoopSubplan, LoopImplementation> getLoopImplementations() {
        return this.loopImplementations;
    }

    /**
     * Adds a new {@link LoopImplementation} for a given {@link LoopSubplan}.
     *
     * @param loop               the {@link LoopSubplan}
     * @param loopImplementation the {@link LoopImplementation}
     */
    public void addLoopImplementation(LoopSubplan loop, LoopImplementation loopImplementation) {
        this.loopImplementations.put(loop, loopImplementation);
    }

    /**
     * @return those contained {@link ExecutionOperator}s that have a {@link Slot} that is yet to be connected
     * to a further {@link ExecutionOperator} in the further plan enumeration process
     */
    public Collection<ExecutionOperator> getInterfaceOperators() {
        Validate.notNull(this.getPlanEnumeration());
        final Set<OutputSlot> outputSlots = this.getPlanEnumeration().servingOutputSlots.stream()
                .map(Tuple::getField0)
                .distinct()
                .collect(Collectors.toSet());
        final Set<InputSlot<?>> inputSlots = this.getPlanEnumeration().requestedInputSlots;

        return this.operators.stream()
                .filter(operator ->
                        this.allOutermostInputSlots(operator).anyMatch(inputSlots::contains) ||
                                this.allOutermostOutputSlots(operator).anyMatch(outputSlots::contains))
                .collect(Collectors.toList());
    }

    private Stream<OutputSlot> allOutermostOutputSlots(Operator operator) {
        return Arrays.stream(operator.getAllOutputs())
                .flatMap(output -> operator.getOutermostOutputSlots(output).stream());
    }

    private Stream<InputSlot> allOutermostInputSlots(Operator operator) {
        return Arrays.stream(operator.getAllInputs())
                .map(operator::getOutermostInputSlot);
    }

    /**
     * Find the {@link ExecutionOperator} that do not depend on any other {@link ExecutionOperator} as input.
     *
     * @return the start {@link ElementaryOperator}s
     */
    public List<ExecutionOperator> getStartOperators() {
        return this.operators.stream()
                .filter(this::isStartOperator)
                .collect(Collectors.toList());
    }

    /**
     * Detects start {@link ExecutionOperator}s.
     * <p>A start {@link ExecutionOperator} has an {@link InputSlot} that is requested by the {@link #planEnumeration}.</p>
     */
    private boolean isStartOperator(ExecutionOperator executionOperator) {
        ForLoop:
        for (InputSlot<?> inputSlot : executionOperator.getOriginal().getAllInputs()) {
            while (inputSlot != null) {
                if (this.planEnumeration.requestedInputSlots.contains(inputSlot)) {
                    continue ForLoop;
                }
                inputSlot = inputSlot.getOwner().getOuterInputSlot(inputSlot);
            }
            return false;
        }
        return true;
    }

    /**
     * Find for a given {@link OperatorAlternative}, which {@link OperatorAlternative.Alternative} has been picked
     * by this instance
     *
     * @param operatorAlternative the {@link OperatorAlternative} in question
     * @return the {@link OperatorAlternative.Alternative} or {@code null} if none has been chosen in this instance
     */
    public OperatorAlternative.Alternative getChosenAlternative(OperatorAlternative operatorAlternative) {
        return this.settledAlternatives.get(operatorAlternative);
    }

    /**
     * Retrieves the {@link TimeEstimate} for this instance, including platform overhead.
     *
     * @return the {@link TimeEstimate}
     */
    public TimeEstimate getTimeEstimate() {
        return this.getTimeEstimate(true);
    }

    /**
     * Retrieves the {@link TimeEstimate} for this instance, including platform overhead.
     *
     * @param isIncludeOverhead whether to include any incurring global overhead
     * @return the {@link TimeEstimate}
     */
    public TimeEstimate getTimeEstimate(boolean isIncludeOverhead) {
        final TimeEstimate operatorTimeEstimate = this.operators.stream()
                .map(op -> this.optimizationContext.getOperatorContext(op).getTimeEstimate())
                .reduce(TimeEstimate.ZERO, TimeEstimate::plus);
        final TimeEstimate junctionTimeEstimate = this.optimizationContext.getDefaultOptimizationContexts().stream()
                .flatMap(optCtx -> this.junctions.values().stream().map(jct -> jct.getTimeEstimate(optCtx)))
                .reduce(TimeEstimate.ZERO, TimeEstimate::plus);
        final TimeEstimate loopTimeEstimate = this.loopImplementations.values().stream()
                .map(LoopImplementation::getTimeEstimate)
                .reduce(TimeEstimate.ZERO, TimeEstimate::plus);
        TimeEstimate timeEstimate = operatorTimeEstimate.plus(junctionTimeEstimate).plus(loopTimeEstimate);

        if (isIncludeOverhead) {
            final long platformInitializationTime = this.getUtilizedPlatforms().stream()
                    .map(platform -> this.optimizationContext.getConfiguration().getPlatformStartUpTimeProvider().provideFor(platform))
                    .reduce(0L, (a, b) -> a + b);
            timeEstimate = timeEstimate.plus(platformInitializationTime);
        }

        return timeEstimate;
    }

    /**
     * Retrieves the cost estimate for this instance including any overhead.
     *
     * @return the cost estimate
     */
    public ProbabilisticDoubleInterval getCostEstimate() {

        if (this.optimizationContext.getConfiguration().getBooleanProperty("wayang.core.optimizer.enumeration.parallel-tasks")) {
            return this.getParallelCostEstimate(true);
        } else {
            return this.getCostEstimate(true);
        }
    }

    /**
     * Retrieves the cost estimate for this instance.
     *
     * @param isIncludeOverhead whether to include global overhead in the {@link TimeEstimate} (to avoid repeating
     *                          overhead in nested instances)
     * @return the cost estimate
     */
    ProbabilisticDoubleInterval getCostEstimate(boolean isIncludeOverhead) {
        ProbabilisticDoubleInterval costEstimateWithoutOverheadCache, costEstimateCache;
        final ProbabilisticDoubleInterval operatorCosts = this.operators.stream()
                .map(op -> this.optimizationContext.getOperatorContext(op).getCostEstimate())
                .reduce(ProbabilisticDoubleInterval.zero, ProbabilisticDoubleInterval::plus);
        final ProbabilisticDoubleInterval junctionCosts = this.optimizationContext.getDefaultOptimizationContexts().stream()
                .flatMap(optCtx -> this.junctions.values().stream().map(jct -> jct.getCostEstimate(optCtx)))
                .reduce(ProbabilisticDoubleInterval.zero, ProbabilisticDoubleInterval::plus);
        final ProbabilisticDoubleInterval loopCosts = this.loopImplementations.values().stream()
                .map(LoopImplementation::getCostEstimate)
                .reduce(ProbabilisticDoubleInterval.zero, ProbabilisticDoubleInterval::plus);
        costEstimateWithoutOverheadCache = operatorCosts.plus(junctionCosts).plus(loopCosts);
        ProbabilisticDoubleInterval overheadCosts = this.getUtilizedPlatforms().stream()
                .map(platform -> {
                    Configuration configuraiton = this.optimizationContext.getConfiguration();
                    long startUpTime = configuraiton.getPlatformStartUpTimeProvider().provideFor(platform);
                    TimeToCostConverter timeToCostConverter = configuraiton.getTimeToCostConverterProvider().provideFor(platform);
                    return timeToCostConverter.convert(new TimeEstimate(startUpTime, startUpTime, 1d));
                })
                .reduce(ProbabilisticDoubleInterval.zero, ProbabilisticDoubleInterval::plus);
        costEstimateCache = costEstimateWithoutOverheadCache.plus(overheadCosts);
        return isIncludeOverhead ? costEstimateCache : costEstimateWithoutOverheadCache;
    }

    /**
     * Retrieves the cost estimate for this instance including any overhead.
     *
     * @return the cost estimate
     */
    public double getSquashedCostEstimate() {
        // Check if the parallel cost calculation is enabled in the configuration file
        if (this.optimizationContext.getConfiguration().getBooleanProperty("wayang.core.optimizer.enumeration.parallel-tasks")) {
            return this.getSquashedParallelCostEstimate(true);
        } else {
            return this.getSquashedCostEstimate(true);
        }
    }

    /**
     * Retrieves the cost estimate for this instance.
     *
     * @param isIncludeOverhead whether to include global overhead in the {@link TimeEstimate} (to avoid repeating
     *                          overhead in nested instances)
     * @return the squashed cost estimate
     */
    double getSquashedCostEstimate(boolean isIncludeOverhead) {
        assert Double.isNaN(this.squashedCostEstimateCache) == Double.isNaN(this.squashedCostEstimateWithoutOverheadCache);
        if (Double.isNaN(this.squashedCostEstimateCache)) {
            final double operatorCosts = this.operators.stream()
                    .mapToDouble(op -> this.optimizationContext.getOperatorContext(op).getSquashedCostEstimate())
                    .sum();
            final double junctionCosts = this.optimizationContext.getDefaultOptimizationContexts().stream()
                    .flatMapToDouble(optCtx -> this.junctions.values().stream().mapToDouble(jct -> jct.getSquashedCostEstimate(optCtx)))
                    .sum();
            final double loopCosts = this.loopImplementations.values().stream()
                    .mapToDouble(LoopImplementation::getSquashedCostEstimate)
                    .sum();
            this.squashedCostEstimateWithoutOverheadCache = operatorCosts + junctionCosts + loopCosts;
            double overheadCosts = this.getUtilizedPlatforms().stream()
                    .mapToDouble(platform -> {
                        Configuration configuration = this.optimizationContext.getConfiguration();

                        long startUpTime = configuration.getPlatformStartUpTimeProvider().provideFor(platform);

                        TimeToCostConverter timeToCostConverter = configuration.getTimeToCostConverterProvider().provideFor(platform);
                        ProbabilisticDoubleInterval costs = timeToCostConverter.convert(new TimeEstimate(startUpTime, startUpTime, 1d));

                        final ToDoubleFunction<ProbabilisticDoubleInterval> squasher = configuration.getCostSquasherProvider().provide();
                        return squasher.applyAsDouble(costs);
                    })
                    .sum();
            this.squashedCostEstimateCache = this.squashedCostEstimateWithoutOverheadCache + overheadCosts;
        }
        return isIncludeOverhead ? this.squashedCostEstimateCache : this.squashedCostEstimateWithoutOverheadCache;
    }


    /**
     * Retrieves the cost estimate of input {@link Operator} and input {@link Junction} and recurse if there is input Operators
     *
     * @param operator {@link Operator} that will be used to retreive the cost/squashed costs
     * @return list of probabilisticDoubleInterval where First element is the operator cost and second element is the junction cost; and
     * list of double retreived where First element is the operator squashed cost and second element is the junction squashed cost
     * <p>
     * PS: This function will start with the sink operator
     */


    private Tuple<List<ProbabilisticDoubleInterval>, List<Double>> getParallelOperatorJunctionAllCostEstimate(Operator operator) {

        Set<Operator> inputOperators = new HashSet<>();
        Set<Junction> inputJunction = new HashSet<>();

        List<ProbabilisticDoubleInterval> probalisticCost = new ArrayList<>();
        List<Double> squashedCost = new ArrayList<>();

        // check if the operator cost was already calculated and cached
        for (Tuple<Operator, Tuple<List<ProbabilisticDoubleInterval>, List<Double>>> t : calculatedParallelOperatorCostCache) {
            if (t.field0 == operator)
                return t.field1;
        }

        if (this.optimizationContext.getOperatorContext(operator) != null) {
            // Get input junctions
            this.junctions.values()
                    .forEach(j -> {
                        for (int itr = 0; itr < j.getNumTargets(); itr++) {
                            if (j.getTargetOperator(itr) == operator)
                                inputJunction.add(j);
                        }
                    });
            // Get input operators associated with input junctions
            inputJunction
                    .forEach((Junction j) -> {
                        inputOperators.add(j.getSourceOperator());
                    });

            if (inputOperators.size() == 0) {
                // If there is no input operator, only the cost of the current operator is returned
                probalisticCost.add(this.optimizationContext.getOperatorContext(operator).getCostEstimate());
                probalisticCost.add(new ProbabilisticDoubleInterval(0f, 0f, 0f));
                squashedCost.add(this.optimizationContext.getOperatorContext(operator).getSquashedCostEstimate());
                squashedCost.add(.0);
                Tuple<List<ProbabilisticDoubleInterval>, List<Double>> returnedCost = new Tuple(probalisticCost, squashedCost);
                this.calculatedParallelOperatorCostCache.add(new Tuple(operator, returnedCost));
                return returnedCost;
            } else if (inputOperators.size() == 1) {
                // If there is only one input operator the cost of the current operator plus the cost of the input operator is returned

                // Get the operator probalistic cost and put it as a first element in probalisticCost
                probalisticCost.add(this.optimizationContext.getOperatorContext(operator).getCostEstimate()
                        .plus(this.getParallelOperatorJunctionAllCostEstimate(inputOperators.iterator().next()).field0.get(0)));
                // Get the junction probalistic cost and put it as a second element in probalisticCost
                probalisticCost.add(inputJunction.iterator().next().getCostEstimate(this.optimizationContext.getDefaultOptimizationContexts().get(0))
                        .plus(this.getParallelOperatorJunctionAllCostEstimate(inputOperators.iterator().next()).field0.get(1)));
                // Get the operator squashed cost and put it as a first element in squashedCost
                squashedCost.add(this.optimizationContext.getOperatorContext(operator).getSquashedCostEstimate()
                        + this.getParallelOperatorJunctionAllCostEstimate(inputOperators.iterator().next()).field1.get(0));
                // Get the junction squashed cost and put it as a second element in squashedCost
                squashedCost.add(inputJunction.iterator().next().getSquashedCostEstimate(this.optimizationContext.getDefaultOptimizationContexts().get(0))
                        + this.getParallelOperatorJunctionAllCostEstimate(inputOperators.iterator().next()).field1.get(1));

                Tuple<List<ProbabilisticDoubleInterval>, List<Double>> returnedCost = new Tuple(probalisticCost, squashedCost);
                this.calculatedParallelOperatorCostCache.add(new Tuple(operator, returnedCost));
                return returnedCost;
            } else {
                // If multiple input operators, the cost returned is the max of input operators
                ProbabilisticDoubleInterval maxControlProbabilistic = new ProbabilisticDoubleInterval(0f, 0f, 0f);
                ProbabilisticDoubleInterval maxJunctionProbabilistic = new ProbabilisticDoubleInterval(0f, 0f, 0f);

                double maxControlSquash = 0;
                double maxJunctionSquash = 0;

                for (Iterator<Operator> op = inputOperators.iterator(); op.hasNext(); ) {
                    Tuple<List<ProbabilisticDoubleInterval>, List<Double>> val = this.getParallelOperatorJunctionAllCostEstimate(op.next());
                    List<ProbabilisticDoubleInterval> valProbalistic = val.field0;
                    List<Double> valSquash = val.field1;
                    // Take the max of the probalistic cost
                    if (valProbalistic.get(0).getAverageEstimate() + valProbalistic.get(1).getAverageEstimate() >
                            maxControlProbabilistic.getAverageEstimate() + maxJunctionProbabilistic.getAverageEstimate()) {
                        // Get the control probalistic cost
                        maxControlProbabilistic = valProbalistic.get(0);
                        // Get the junction probalistic cost
                        maxJunctionProbabilistic = valProbalistic.get(1);
                    }
                    // Take the cost of the squashed cost
                    if (valSquash.get(0) > maxControlSquash) {
                        maxControlSquash = valSquash.get(0);
                    }
                    if (valSquash.get(1) > maxJunctionSquash) {
                        maxJunctionSquash = valSquash.get(1);
                    }
                }
                // Get the operator probalistic cost and put it as a first element in probalisticCost
                probalisticCost.add(this.optimizationContext.getOperatorContext(operator).getCostEstimate().plus(maxControlProbabilistic));
                // Get the junction probalistic cost and put it as a second element in probalisticCost
                probalisticCost.add(inputJunction.iterator().next().getCostEstimate(this.optimizationContext.getDefaultOptimizationContexts().get(0))
                        .plus(maxJunctionProbabilistic));
                // Get the operator squashed cost and put it as a first element in squashedCost
                squashedCost.add(this.optimizationContext.getOperatorContext(operator).getSquashedCostEstimate()
                        + maxControlSquash);
                // Get the junction squashed cost and put it as a second element in squashedCost
                squashedCost.add(inputJunction.iterator().next().getSquashedCostEstimate(this.optimizationContext.getDefaultOptimizationContexts().get(0))
                        + maxJunctionSquash);

                Tuple<List<ProbabilisticDoubleInterval>, List<Double>> returnedCost = new Tuple(probalisticCost, squashedCost);
                this.calculatedParallelOperatorCostCache.add(new Tuple(operator, returnedCost));
                return returnedCost;
            }
        } else {
            // Handle the case of a control not defined in this.operators (exp: loop operators)
            double controlSquash = 0;
            double junctionSquash = 0;
            ProbabilisticDoubleInterval controlProbabilistic = new ProbabilisticDoubleInterval(0f, 0f, 0f);
            ProbabilisticDoubleInterval junctionProbabilistic = new ProbabilisticDoubleInterval(0f, 0f, 0f);

            probalisticCost.add(controlProbabilistic);
            probalisticCost.add(junctionProbabilistic);
            squashedCost.add(controlSquash);
            squashedCost.add(junctionSquash);

            return new Tuple<>(probalisticCost, squashedCost);
        }
    }

    /**
     * Retrieves the cost estimate for this instance taking into account parallel stage execution.
     *
     * @param isIncludeOverhead whether to include global overhead in the {@link TimeEstimate} (to avoid repeating
     *                          overhead in nested instances)
     * @return the cost estimate taking into account parallel stage execution
     */
    ProbabilisticDoubleInterval getParallelCostEstimate(boolean isIncludeOverhead) {
        ProbabilisticDoubleInterval parallelCostEstimateWithoutOverhead, parallelCostEstimate;

        if (this.parallelCostEstimateCache == null) {
            // It means that the squashed cost is not yet called, might be only one possible execution plan
            this.getSquashedParallelCostEstimate(true);
        }

        final ProbabilisticDoubleInterval loopCosts = this.loopImplementations.values().stream()
                .map(LoopImplementation::getCostEstimate)
                .reduce(ProbabilisticDoubleInterval.zero, ProbabilisticDoubleInterval::plus);
        parallelCostEstimateWithoutOverhead = this.parallelCostEstimateCache.field0.get(0).plus(this.parallelCostEstimateCache.field0.get(1)).plus(loopCosts);
        ProbabilisticDoubleInterval overheadCosts = this.getUtilizedPlatforms().stream()
                .map(platform -> {
                    Configuration configuration = this.optimizationContext.getConfiguration();
                    long startUpTime = configuration.getPlatformStartUpTimeProvider().provideFor(platform);
                    TimeToCostConverter timeToCostConverter = configuration.getTimeToCostConverterProvider().provideFor(platform);
                    return timeToCostConverter.convert(new TimeEstimate(startUpTime, startUpTime, 1d));
                })
                .reduce(ProbabilisticDoubleInterval.zero, ProbabilisticDoubleInterval::plus);
        parallelCostEstimate = parallelCostEstimateWithoutOverhead.plus(overheadCosts);
        return isIncludeOverhead ? parallelCostEstimate : parallelCostEstimateWithoutOverhead;
    }

    /**
     * Retrieves the cost estimate for this instance taking into account parallel stage execution.
     *
     * @param isIncludeOverhead whether to include global overhead in the {@link TimeEstimate} (to avoid repeating
     *                          overhead in nested instances)
     * @return the squashed cost estimate taking into account parallel stage execution
     */
    double getSquashedParallelCostEstimate(boolean isIncludeOverhead) {
        // Collect sink operators by Removing all operators that have an output
        Set<Operator> sinkOperators;
        sinkOperators = this.operators.stream()
                .filter(op -> op.getNumOutputs() == 0)
                .collect(Collectors.toSet());

        // Retrieve operator and junction cost with parallel stage consideration
        double parallelOperatorCosts = 0f;
        double parallelJunctionCosts = 0f;

        // Iterate through all sinks to find the expensive sink
        for (Operator op : sinkOperators) {
            Tuple<List<ProbabilisticDoubleInterval>, List<Double>> tempParallelCostEstimate = this.getParallelOperatorJunctionAllCostEstimate(op);
            List<Double> tempSquashedCost = tempParallelCostEstimate.field1;

            if (tempSquashedCost.get(0) + tempSquashedCost.get(1) > parallelOperatorCosts + parallelJunctionCosts) {
                parallelOperatorCosts = tempSquashedCost.get(0);
                parallelJunctionCosts = tempSquashedCost.get(1);
                this.parallelCostEstimateCache = tempParallelCostEstimate;
            }
        }
        final double loopCosts = this.loopImplementations.values().stream()
                .mapToDouble(LoopImplementation::getSquashedCostEstimate)
                .sum();
        final double parallelSquashedCostEstimateWithoutOverhead = parallelOperatorCosts + parallelJunctionCosts + loopCosts;
        double overheadCosts = this.getUtilizedPlatforms().stream()
                .mapToDouble(platform -> {
                    Configuration configuration = this.optimizationContext.getConfiguration();

                    long startUpTime = configuration.getPlatformStartUpTimeProvider().provideFor(platform);

                    TimeToCostConverter timeToCostConverter = configuration.getTimeToCostConverterProvider().provideFor(platform);
                    ProbabilisticDoubleInterval costs = timeToCostConverter.convert(new TimeEstimate(startUpTime, startUpTime, 1d));

                    final ToDoubleFunction<ProbabilisticDoubleInterval> squasher = configuration.getCostSquasherProvider().provide();
                    return squasher.applyAsDouble(costs);
                })
                .sum();
        final double parallelSquashedCostEstimate = parallelSquashedCostEstimateWithoutOverhead + overheadCosts;
        return isIncludeOverhead ? parallelSquashedCostEstimate : parallelSquashedCostEstimateWithoutOverhead;
    }


    public Junction getJunction(OutputSlot<?> output) {
        return this.junctions.get(output);
    }

    public void putJunction(OutputSlot<?> output, Junction junction) {
        final Junction oldValue = junction == null ?
                this.junctions.remove(output) :
                this.junctions.put(output, junction);
        if (oldValue != null) {
            logger.warn("Replaced {} with {}.", oldValue, junction);
        }
    }

    public OptimizationContext getOptimizationContext() {
        return this.optimizationContext;
    }

    /**
     * Merges the {@link OptimizationContext}s of the {@link Junction}s in this instance into its main
     * {@link OptimizationContext}/
     */
    public void mergeJunctionOptimizationContexts() {
        // Merge the top-level Junctions.
        for (Junction junction : this.junctions.values()) {
            junction.getOptimizationContexts().forEach(OptimizationContext::mergeToBase);
        }

        // Descend into loops.
        this.loopImplementations.values().stream()
                .flatMap(loopImplementation -> loopImplementation.getIterationImplementations().stream())
                .map(LoopImplementation.IterationImplementation::getBodyImplementation)
                .forEach(PlanImplementation::mergeJunctionOptimizationContexts);
    }

    public void logTimeEstimates() {
        if (!this.logger.isDebugEnabled()) return;

        this.logger.debug(">>> Regular operators");
        for (ExecutionOperator operator : this.operators) {
            this.logger.debug("Estimated execution time of {}: {}",
                    operator, this.optimizationContext.getOperatorContext(operator).getTimeEstimate()
            );
        }
        this.logger.debug(">>> Glue operators");
        for (Junction junction : junctions.values()) {
            for (ExecutionTask task : junction.getConversionTasks()) {
                final ExecutionOperator operator = task.getOperator();
                this.logger.debug("Estimated execution time of {}: {}",
                        operator, this.optimizationContext.getOperatorContext(operator).getTimeEstimate()
                );
            }
        }
        this.logger.debug(">>> Loops");
        for (LoopImplementation loopImplementation : this.loopImplementations.values()) {
            for (LoopImplementation.IterationImplementation iterationImplementation : loopImplementation.getIterationImplementations()) {
                iterationImplementation.getBodyImplementation().logTimeEstimates();
            }
        }
    }

    /**
     * Retrieve the {@link Platform}s that are utilized by this instance.
     *
     * @return the {@link Platform}s
     */
    public Set<Platform> getUtilizedPlatforms() {
        if (this.platformCache == null) {
            this.platformCache = this.streamOperators()
                    .map(ExecutionOperator::getPlatform)
                    .collect(Collectors.toSet());
        }
        return this.platformCache;
    }

    /**
     * Stream all the {@link ExecutionOperator}s in this instance.
     *
     * @return a {@link Stream} containing every {@link ExecutionOperator} at least once
     */
    Stream<ExecutionOperator> streamOperators() {
        Stream<ExecutionOperator> operatorStream = Stream.concat(
                this.operators.stream(),
                this.junctions.values().stream().flatMap(j -> j.getConversionTasks().stream()).map(ExecutionTask::getOperator)
        );
        if (!this.loopImplementations.isEmpty()) {
            operatorStream = Stream.concat(
                    operatorStream,
                    this.loopImplementations.values().stream().flatMap(LoopImplementation::streamOperators)
            );
        }
        return operatorStream;
    }

    @Override
    public String toString() {
        return String.format("PlanImplementation[%s, %s, costs=%s]",
                this.getUtilizedPlatforms(), this.getTimeEstimate(), this.getCostEstimate()
        );
    }

    /**
     * Creates a new {@link ConcatenationDescriptor} for this instance.
     *
     * @param output the relevant {@link OutputSlot} or {@code null}
     * @param inputs the relevant {@link InputSlot}s; components can be {@code null}
     * @return the {@link ConcatenationDescriptor}
     */
    ConcatenationDescriptor createConcatenationDescriptor(OutputSlot<?> output, List<InputSlot<?>> inputs) {
        return new ConcatenationDescriptor(output, inputs);
    }

    /**
     * Amends a {@link ConcatenationGroupDescriptor} by {@link PlanImplementation}-specific information.
     */
    class ConcatenationDescriptor {

        final ConcatenationGroupDescriptor groupDescriptor;

        final PlanImplementation execOutputPlanImplementation;

        /**
         * Creates a new instance.
         */
        ConcatenationDescriptor(OutputSlot<?> output, List<InputSlot<?>> inputs) {
            // Find the ExecutionOperator's corresponding OutputSlot along with the nested PlanImplementation.
            OutputSlot<?> execOutput = null;
            PlanImplementation execOutputPlanImplementation = null;
            if (output != null) {
                Collection<Tuple<OutputSlot<?>, PlanImplementation>> execOpOutputsWithContext =
                        PlanImplementation.this.findExecutionOperatorOutputWithContext(output);
                final Tuple<OutputSlot<?>, PlanImplementation> execOpOutputWithCtx =
                        WayangCollections.getSingleOrNull(execOpOutputsWithContext);
                assert execOpOutputsWithContext != null : String.format("No outputs found for %s.", output);
                execOutput = execOpOutputWithCtx.field0;
                execOutputPlanImplementation = execOpOutputWithCtx.field1;
            }

            // Find the ExecutionOperators' corresponding InputSlots.
            List<Set<InputSlot<?>>> execInputs = new ArrayList<>(inputs.size());
            for (InputSlot<?> input : inputs) {
                if (input == null) {
                    execInputs.add(null);
                } else {
                    execInputs.add(WayangCollections.asSet(PlanImplementation.this.findExecutionOperatorInputs(input)));
                }
            }

            this.groupDescriptor = new ConcatenationGroupDescriptor(execOutput, execInputs);
            this.execOutputPlanImplementation = execOutputPlanImplementation;
        }

        PlanImplementation getPlanImplementation() {
            return PlanImplementation.this;
        }

    }

    /**
     * Describes a group of {@link PlanImplementation}s in terms of their implementations for some {@link OutputSlot} and
     * {@link InputSlot}s. These {@link Slot}s are not stored in this class and must be clear from the context.
     */
    static class ConcatenationGroupDescriptor {

        /**
         * A corresponding {@link ExecutionOperator}s {@link OutputSlot} or {@code null}.
         */
        final OutputSlot<?> execOutput;

        /**
         * {@link Set}s of corresponding {@link ExecutionOperator}s' {@link InputSlot}s. Individual components can
         * be {@code null} if the {@link PlanImplementation}s do not implement the corresponding {@link InputSlot}.
         */
        final List<Set<InputSlot<?>>> execInputs;

        ConcatenationGroupDescriptor(OutputSlot<?> execOutput, List<Set<InputSlot<?>>> execInputs) {
            this.execOutput = execOutput;
            this.execInputs = execInputs;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            final ConcatenationGroupDescriptor that = (ConcatenationGroupDescriptor) o;
            return Objects.equals(execOutput, that.execOutput) &&
                    Objects.equals(execInputs, that.execInputs);
        }

        @Override
        public int hashCode() {
            return Objects.hash(execOutput, execInputs);
        }
    }
}
