/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.core.platform;

import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.Job;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.executionplan.Channel;
import org.apache.wayang.core.plan.executionplan.ExecutionPlan;
import org.apache.wayang.core.plan.executionplan.ExecutionStage;
import org.apache.wayang.core.plan.executionplan.ExecutionStageLoop;
import org.apache.wayang.core.plan.executionplan.ExecutionTask;
import org.apache.wayang.core.plan.wayangplan.InputSlot;
import org.apache.wayang.core.plan.wayangplan.LoopHeadOperator;
import org.apache.wayang.core.plan.wayangplan.LoopSubplan;
import org.apache.wayang.core.plan.wayangplan.OutputSlot;
import org.apache.wayang.core.profiling.InstrumentationStrategy;
import org.apache.wayang.core.util.AbstractReferenceCountable;
import org.apache.wayang.core.util.Formats;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.function.Supplier;

/**
 * Executes a (cross-platform) {@link ExecutionPlan}.
 */
public class CrossPlatformExecutor implements ExecutionState {

    /**
     * Logger of this instance; it is named after the runtime class of the instance.
     */
    public final Logger logger = LogManager.getLogger(this.getClass());

    /**
     * The {@link Job} that is being executed by this instance.
     */
    private final Job job;

    /**
     * Aggregates user-defined {@link Breakpoint}s. Will be cleared after each execution.
     * Defaults to {@link Breakpoint#NONE} and can be replaced via {@link #setBreakpoint(Breakpoint)}.
     */
    private Breakpoint breakpoint = Breakpoint.NONE;

    /**
     * All {@link ExecutionStage}s in the processed {@link ExecutionPlan}. Re-populated on every
     * {@link #prepare(ExecutionPlan, OptimizationContext)}.
     */
    private final Set<ExecutionStage> allStages = new HashSet<>();

    /**
     * {@link StageActivator}s that have been activated and are considered for execution.
     */
    private final Queue<StageActivator> activatedStageActivators = new LinkedList<>();

    /**
     * Keeps track of {@link StageActivator}s that have been created but not yet activated, indexed by their
     * {@link ExecutionStage}.
     */
    private final Map<ExecutionStage, StageActivator> pendingStageActivators = new HashMap<>();

    /**
     * Maintains the {@link Executor}s for each {@link Platform}.
     */
    private final Map<Platform, Executor> executors = new HashMap<>();

    /**
     * {@link StageActivator}s whose execution was inhibited by the {@link #breakpoint}.
     * We keep them around if we want to go on without re-optimization.
     */
    private final Collection<StageActivator> suspendedStages = new LinkedList<>();

    /**
     * When executing an {@link ExecutionStageLoop}, we might need to reuse several {@link ExecutionResource}s
     * among all iterations. If we would go with our normal handling scheme, we might lose them after the first
     * iteration. Therefore, we actively keep track of them via {@link ExecutionStageLoopContext}s.
     */
    private final Map<ExecutionStageLoop, ExecutionStageLoopContext> loopContexts = new HashMap<>();

    /**
     * Marks {@link Channel}s for instrumentation. It is applied to each {@link ExecutionStage} right before the
     * stage is executed.
     */
    private final InstrumentationStrategy instrumentationStrategy;

    /**
     * Keeps track of {@link ExecutionStage}s that have actually been executed by this instance.
     */
    private Set<ExecutionStage> completedStages = new HashSet<>();

    /**
     * Keeps track of {@link ChannelInstance} cardinalities.
     */
    private final Collection<ChannelInstance> cardinalityMeasurements = new LinkedList<>();

    /**
     * Maintains {@link ExecutionResource}s that are "global" w.r.t. this instance, i.e., they will not be
     * instantly disposed if not currently used. They are released in {@link #shutdown()}.
     */
    private final Set<ExecutionResource> globalResources = new HashSet<>(2);

    /**
     * Keeps track of {@link ChannelInstance}s so as to reuse them among {@link Executor} runs.
     * Only {@link ChannelInstance}s that are not managed by an {@link ExecutionStageLoopContext} are stored here.
     */
    private final Map<Channel, ChannelInstance> channelInstances = new HashMap<>();

    /**
     * Gathers {@link PartialExecution}s created during the execution.
     */
    private final Collection<PartialExecution> partialExecutions = new LinkedList<>();

    /**
     * Gathers the {@link Thread}s that run {@link ParallelExecutionThread}s during parallel execution.
     * The list is cleared again once all of these {@link Thread}s have been joined.
     */
    private final ArrayList<Thread> parallelExecutionThreads = new ArrayList<>();

    /**
     * Keeps track of the completed {@link ParallelExecutionThread}s created during parallel execution.
     * NOTE(review): presumably updated by the {@link ParallelExecutionThread}s themselves -- confirm; mind that
     * {@code volatile} only guarantees visibility, not atomic increments.
     */
    private volatile int completedThreads;

    /**
     * Creates a new instance.
     *
     * @param job                     the {@link Job} that will be executed by this instance
     * @param instrumentationStrategy is applied to every {@link ExecutionStage} before it is executed
     */
    public CrossPlatformExecutor(Job job, InstrumentationStrategy instrumentationStrategy) {
        this.job = job;
        this.instrumentationStrategy = instrumentationStrategy;
    }

    /**
     * Prepares this instance for the given {@link ExecutionPlan} and then executes its {@link ExecutionStage}s
     * until none are left or the {@link Breakpoint} inhibits further execution.
     *
     * @param executionPlan       that should be executed or continued
     * @param optimizationContext contains additional optimization info for the {@code executionPlan}
     * @return whether the {@link ExecutionPlan} was completed (i.e., not suspended due to the {@link Breakpoint})
     */
    public boolean executeUntilBreakpoint(ExecutionPlan executionPlan, OptimizationContext optimizationContext) {
        // Set up the StageActivators for the executionPlan, then run as far as we are allowed to.
        this.prepare(executionPlan, optimizationContext);
        this.runToBreakpoint();

        // Any suspended stage means that the plan has not been executed completely.
        final boolean isCompleted = this.suspendedStages.isEmpty();
        return isCompleted;
    }

    /**
     * Resets the {@link ExecutionStage}-related state and (re-)creates the {@link StageActivator}s for the
     * {@link ExecutionStage}s of the given {@link ExecutionPlan}.
     *
     * @param executionPlan       whose {@link ExecutionStage}s will be executed
     * @param optimizationContext contains additional optimization info for the {@code executionPlan}
     */
    public void prepare(ExecutionPlan executionPlan, OptimizationContext optimizationContext) {
        // Forget about the stages of any previously prepared plan.
        this.allStages.clear();
        this.activatedStageActivators.clear();
        this.suspendedStages.clear();
        this.allStages.addAll(executionPlan.getStages());

        // Pending StageActivators whose stage is not part of the plan anymore (after re-optimization) are obsolete.
        this.pendingStageActivators.keySet().removeIf(pendingStage -> !this.allStages.contains(pendingStage));

        // Every stage that has not been executed yet needs a StageActivator, which might be activated right away.
        for (ExecutionStage stage : this.allStages) {
            if (!this.completedStages.contains(stage)) {
                this.tryToActivate(this.getOrCreateActivator(
                        stage,
                        () -> this.determineInitialOptimizationContext(stage, optimizationContext)
                ));
            }
        }
    }

    /**
     * Find the initial {@link OptimizationContext} for a {@link StageActivator}: {@link ExecutionStage}s outside
     * of loops use the root {@link OptimizationContext}, while those inside of a loop use the initial iteration
     * context of that loop.
     *
     * @param stage                   the {@link ExecutionStage} whose {@link OptimizationContext} is to be determined
     * @param rootOptimizationContext the root {@link OptimizationContext} of the executed plan
     * @return the {@link OptimizationContext}
     */
    private OptimizationContext determineInitialOptimizationContext(ExecutionStage stage, OptimizationContext rootOptimizationContext) {
        final ExecutionStageLoop loop = stage.getLoop();
        if (loop == null) {
            return rootOptimizationContext;
        }

        // TODO: Assumes non-nested loops.
        final LoopSubplan loopSubplan = loop.getLoopSubplan();
        return rootOptimizationContext.getNestedLoopContext(loopSubplan).getInitialIterationContext();
    }

    /**
     * Looks up the pending {@link StageActivator} of the {@link ExecutionStage}. If there is none, a new one is
     * created and registered as pending.
     *
     * @param stage                       for which the {@link StageActivator} is requested
     * @param optimizationContextSupplier supplies an {@link OptimizationContext} for the {@code stage}; only
     *                                    consulted if a new {@link StageActivator} has to be created
     * @return the {@link StageActivator}
     */
    private StageActivator getOrCreateActivator(
            ExecutionStage stage,
            final Supplier<OptimizationContext> optimizationContextSupplier
    ) {
        return this.pendingStageActivators.computeIfAbsent(
                stage,
                missingStage -> {
                    final OptimizationContext optimizationContext = optimizationContextSupplier.get();
                    return new StageActivator(missingStage, optimizationContext);
                }
        );
    }

    /**
     * If the {@link StageActivator} can be activated, move it from {@link #pendingStageActivators} to
     * {@link #activatedStageActivators}.
     *
     * @param activator that should be activated
     * @return whether an activation took place
     */
    private boolean tryToActivate(StageActivator activator) {
        // As long as its input requirements are not satisfied, the activator stays where it is.
        if (!activator.updateInputChannelInstances()) {
            return false;
        }

        final ExecutionStage stage = activator.getStage();
        logger.info("Activating {}.", stage);

        // Activate the activator by moving it from the pending to the activated ones.
        this.pendingStageActivators.remove(stage);
        assert this.activatedStageActivators.stream().noneMatch(a -> a.getStage().equals(stage)) :
                String.format("Must not activate %s twice.", stage);
        this.activatedStageActivators.add(activator);
        activator.noteActivation();
        return true;
    }

    /**
     * Execute one single {@link ExecutionStage} unless the {@link #breakpoint} suspends it. After the execution,
     * the successor {@link ExecutionStage}s are activated if possible and obsolete {@link ChannelInstance}s are
     * discarded.
     *
     * @param isBreakpointsDisabled whether the {@link #breakpoint} should be ignored
     * @param stageActivator        describes the {@link ExecutionStage} to be executed
     */
    private void executeSingleStage(boolean isBreakpointsDisabled, StageActivator stageActivator) {
        // The #breakpoint is only consulted if it has not been disabled.
        final boolean isSuspended = !isBreakpointsDisabled && this.suspendIfBreakpointRequest(stageActivator);
        if (isSuspended) {
            return;
        }

        // Run the stage and see which of its successors can be activated with the produced ChannelInstances.
        this.execute(stageActivator);
        this.tryToActivateSuccessors(stageActivator);

        // The stageActivator collected the input ChannelInstances and is not needed anymore.
        stageActivator.dispose();

        // Discard all ChannelInstances that are referenced by nobody but this instance.
        for (Iterator<ChannelInstance> instances = this.channelInstances.values().iterator(); instances.hasNext(); ) {
            final ChannelInstance channelInstance = instances.next();
            if (channelInstance.getNumReferences() != 1) {
                continue;
            }
            channelInstance.noteDiscardedReference(true);
            instances.remove();
        }
    }


    /**
     * Run parallel threads executing activated {@link ExecutionStage}s: One {@link Thread} is started for each
     * currently activated {@link StageActivator}. This method waits for the started {@link Thread}s to finish.
     * <p>If the calling {@link Thread} is interrupted while waiting, the interruption is logged, the remaining
     * {@link Thread}s are still waited for, and the interrupt status of the calling {@link Thread} is restored
     * before returning, so that callers can react to the interruption. Note that the {@link Thread}, whose
     * {@link Thread#join()} was interrupted, might still be running when this method returns.</p>
     *
     * @param isBreakpointsDisabled whether the {@link #breakpoint} should be ignored; passed on to the
     *                              {@link ParallelExecutionThread}s
     */
    private void runParallelExecution(boolean isBreakpointsDisabled) {
        final int numActiveStages = this.activatedStageActivators.size();

        // Create and start the execution threads.
        for (int i = 1; i <= numActiveStages; ++i) {
            // TODO: Better pass the stage to the thread rather than letting the thread retrieve the stage itself (to avoid concurrency issues).
            final Thread thread = new Thread(new ParallelExecutionThread(isBreakpointsDisabled, "T" + i, this));
            thread.start();
            this.parallelExecutionThreads.add(thread);
        }

        // Join all created threads. An interruption must not get lost, so we remember it.
        boolean isInterrupted = false;
        for (Thread thread : this.parallelExecutionThreads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                this.logger.error("Thread Interrupted!", e);
                isInterrupted = true;
            }
        }

        // Clear the list of created threads.
        this.parallelExecutionThreads.clear();

        // Restore the interrupt status that has been cleared when the InterruptedException was thrown.
        if (isInterrupted) {
            Thread.currentThread().interrupt();
        }
        this.logger.info("Parallel execution ended!");
    }

    /**
     * Activate and execute {@link ExecutionStage}s as far as possible. If the {@link #breakpoint} does not allow
     * to execute even a single {@link ExecutionStage}, the suspended {@link ExecutionStage}s are retried with the
     * {@link #breakpoint} being disabled.
     */
    private void runToBreakpoint() {
        final long startTime = System.currentTimeMillis();
        final int numPriorExecutedStages = this.completedStages.size();
        int numExecutedStages;
        boolean isBreakpointsDisabled = false;
        do {
            // Work off the activated stages; executing them might activate further stages.
            while (!this.activatedStageActivators.isEmpty()) {
                // Parallelization requires multiple activated (i.e., independent) stages and must be enabled.
                final boolean isParallelExecution = this.activatedStageActivators.size() > 1
                        && this.getConfiguration().getBooleanProperty("wayang.core.optimizer.enumeration.parallel-tasks");
                if (!isParallelExecution) {
                    // Execute one single ExecutionStage.
                    this.executeSingleStage(isBreakpointsDisabled, this.activatedStageActivators.poll());
                    continue;
                }
                // Run one thread for each of the activated stages.
                this.runParallelExecution(isBreakpointsDisabled);
            }

            // Safety net to recover from illegal Breakpoint configurations: If nothing has been executed at all,
            // give the suspended stages another try with disabled breakpoints.
            numExecutedStages = this.completedStages.size() - numPriorExecutedStages;
            final boolean isRetryWithoutBreakpoints = !isBreakpointsDisabled && numExecutedStages == 0;
            if (isRetryWithoutBreakpoints) {
                this.logger.warn("Could not execute a single stage. Will retry with disabled breakpoints.");
                this.activatedStageActivators.addAll(this.suspendedStages);
                this.suspendedStages.clear();
            }
            isBreakpointsDisabled = isRetryWithoutBreakpoints;
        } while (!this.activatedStageActivators.isEmpty());

        // Report the number of stages executed in this invocation.
        final long elapsedMillis = System.currentTimeMillis() - startTime;
        this.logger.info("Executed {} stages in {}.",
                numExecutedStages, Formats.formatDuration(elapsedMillis, true));

        assert numExecutedStages > 0 : "Did not execute a single stage.";
    }

    /**
     * Asks the {@link #breakpoint} whether the given {@link ExecutionStage} may be executed. If not, the
     * {@link StageActivator} is added to the {@link #suspendedStages}.
     *
     * @param stageActivator that might be suspended
     * @return whether the {@link ExecutionStage} was suspended
     */
    private boolean suspendIfBreakpointRequest(StageActivator stageActivator) {
        final boolean isExecutionPermitted = this.breakpoint.permitsExecutionOf(
                stageActivator.getStage(),
                this,
                this.job.getOptimizationContext()
        );
        if (isExecutionPermitted) {
            return false;
        }

        this.suspendedStages.add(stageActivator);
        return true;
    }

    /**
     * Executes the {@link ExecutionStage} of the given {@link StageActivator} with the {@link Executor} of the
     * stage's {@link Platform} and remembers the {@link ExecutionStage} as completed.
     *
     * @param stageActivator whose {@link ExecutionStage} should be executed
     */
    private void execute(StageActivator stageActivator) {
        final ExecutionStage stage = stageActivator.getStage();
        final OptimizationContext stageOptimizationContext = stageActivator.getOptimizationContext();

        // Find parts of the stage to instrument.
        this.instrumentationStrategy.applyTo(stage);

        // Have the stage executed by the Executor of its platform and measure how long that takes.
        final Executor executor = this.getOrCreateExecutorFor(stage);
        this.logger.info("Having {} execute {}:\n{}", executor, stage, stage.getPlanAsString("> "));
        final long startTime = System.currentTimeMillis();
        executor.execute(stage, stageOptimizationContext, this);
        final long elapsedMillis = System.currentTimeMillis() - startTime;
        this.logger.info("Executed {} in {}.", stage, Formats.formatDuration(elapsedMillis, true));

        // Remember that we have executed the stage.
        this.completedStages.add(stage);

        // Once a loop head has been executed, have its loop context scrap the previous transition context.
        if (stage.isLoopHead()) {
            this.getOrCreateLoopContext(stage.getLoop()).scrapPreviousTransitionContext();
        }
    }

    /**
     * Provides the {@link Executor} for the {@link Platform} of the given {@link ExecutionStage}. If there is no
     * such {@link Executor} yet, it is created, cached, and registered as global {@link ExecutionResource}.
     *
     * @param stage the {@link ExecutionStage} that should be executed
     * @return the {@link Executor}
     */
    private Executor getOrCreateExecutorFor(ExecutionStage stage) {
        final Platform stagePlatform = stage.getPlatformExecution().getPlatform();
        return this.executors.computeIfAbsent(stagePlatform, platform -> {
            final Executor newExecutor = platform.getExecutorFactory().create(this.job);
            // It is important to register the Executor. This way, we ensure that it will also not be disposed
            // among disconnected PlatformExecutions. The downside is, that we only remove it, once the
            // execution is done.
            this.registerGlobal(newExecutor);
            return newExecutor;
        });
    }

    /**
     * Follows the outbound {@link Channel}s of the given {@link ExecutionStage} and tries to activate the
     * consuming {@link ExecutionStage}s, given that the according {@link ChannelInstance}s are available.
     *
     * @param processedStageActivator should have just been executed
     */
    private void tryToActivateSuccessors(StageActivator processedStageActivator) {
        final ExecutionStage processedStage = processedStageActivator.getStage();

        // Gather all successor ExecutionStages for which a new ChannelInstance has been produced.
        final Collection<Channel> outboundChannels = processedStage.getOutboundChannels();
        final Set<ExecutionStage> successorStages = new HashSet<>(outboundChannels.size());
        for (Channel outboundChannel : outboundChannels) {
            for (ExecutionTask consumer : outboundChannel.getConsumers()) {
                // Skip consumers whose ChannelInstance is not available (yet).
                final boolean isFeedbackInput = consumer.isFeedbackInput(outboundChannel);
                if (this.getChannelInstance(outboundChannel, isFeedbackInput) == null) {
                    continue;
                }

                // We must be careful: outbound Channels still can have consumers within the producer's
                // ExecutionStage. Also, stages of finished loops are not to be activated.
                final ExecutionStage consumerStage = consumer.getStage();
                if (consumerStage == processedStage || consumerStage.isInFinishedLoop()) {
                    continue;
                }
                successorStages.add(consumerStage);
            }
        }

        // Try to activate the follow-up stages.
        for (ExecutionStage successorStage : successorStages) {
            this.tryToActivate(this.getOrCreateActivator(
                    successorStage,
                    () -> this.determineNextOptimizationContext(processedStageActivator, successorStage)
            ));
        }
    }

    /**
     * Find the {@link OptimizationContext} for a {@link StageActivator} that is activated from a preceding
     * {@link StageActivator}.
     *
     * @param processedStageActivator the preceding {@link StageActivator}
     * @param successorStage          the {@link ExecutionStage} whose {@link OptimizationContext} is to be determined
     * @return the {@link OptimizationContext}
     */
    private OptimizationContext determineNextOptimizationContext(
            StageActivator processedStageActivator,
            ExecutionStage successorStage
    ) {
        final OptimizationContext prevOptimizationContext = processedStageActivator.getOptimizationContext();

        // Case 1: The successor is not in a loop, so we go to the root OptimizationContext.
        if (successorStage.getLoop() == null) {
            return prevOptimizationContext.getRootParent();
        }

        // Case 2: We enter a loop and start out with its initial iteration context.
        final ExecutionStage processedStage = processedStageActivator.getStage();
        if (processedStage.getLoop() != successorStage.getLoop()) {
            // TODO: This code assumes non-nested loops.
            final LoopSubplan loopSubplan = successorStage.getLoop().getLoopSubplan();
            return prevOptimizationContext
                    .getRootParent()
                    .getNestedLoopContext(loopSubplan)
                    .getInitialIterationContext();
        }

        // Case 3: We stay in the very same loop and the next stage is a loop head. Then we need to switch to the
        // next iteration context.
        if (successorStage.isLoopHead()) {
            return prevOptimizationContext.getNextIterationContext();
        }

        // Case 4: We stay in the very same loop body. However, we must not run into the last iteration context,
        // where there is no more information for the loop body. If so, sneak in a new OptimizationContext for
        // the new iteration.
        return prevOptimizationContext.isFinalIteration()
                ? prevOptimizationContext.getLoopContext().appendIterationContext()
                : prevOptimizationContext;
    }


    /**
     * Provides the {@link ExecutionStageLoopContext} of the given {@link ExecutionStageLoop}. If there is none
     * yet, a new one is created and kept in {@link #loopContexts}.
     *
     * @param loop for which an {@link ExecutionStageLoopContext} is requested
     * @return the {@link ExecutionStageLoopContext}
     */
    private ExecutionStageLoopContext getOrCreateLoopContext(ExecutionStageLoop loop) {
        return this.loopContexts.computeIfAbsent(
                loop,
                missingLoop -> new ExecutionStageLoopContext(missingLoop)
        );
    }

    /**
     * Removes the {@link ExecutionStageLoopContext} of the given {@link ExecutionStageLoop} from
     * {@link #loopContexts}. The removed {@link ExecutionStageLoopContext} is expected not to be referenced
     * anymore.
     *
     * @param loop that is described by the {@link ExecutionStageLoopContext}
     */
    private void removeLoopContext(ExecutionStageLoop loop) {
        final ExecutionStageLoopContext removedContext = this.loopContexts.remove(loop);
        // Sanity check (only with enabled assertions); it relies on a context being registered for the loop.
        assert removedContext.getNumReferences() == 0;
    }

    /**
     * Retrieves the {@link ChannelInstance} for the given {@link Channel} without peeking to the next loop
     * transition.
     *
     * @param channel whose {@link ChannelInstance} is requested
     * @return the result of {@link #getChannelInstance(Channel, boolean)} with the flag being {@code false}
     */
    @Override
    public ChannelInstance getChannelInstance(Channel channel) {
        final boolean isPeekingToNextTransition = false;
        return this.getChannelInstance(channel, isPeekingToNextTransition);
    }

    /**
     * Retrieves the {@link ChannelInstance} for the given {@link Channel}. {@link Channel}s that belong to an
     * {@link ExecutionStageLoop} are looked up in the corresponding {@link ExecutionStageLoopContext}, all
     * others in {@link #channelInstances}.
     *
     * @param channel                   whose {@link ChannelInstance} is requested
     * @param isPeekingToNextTransition passed on to the {@link ExecutionStageLoopContext}; not relevant for
     *                                  {@link Channel}s that do not belong to an {@link ExecutionStageLoop}
     * @return the {@link ChannelInstance}; {@code null} if {@link #channelInstances} has no entry for a
     * {@link Channel} outside of loops
     */
    public ChannelInstance getChannelInstance(Channel channel, boolean isPeekingToNextTransition) {
        final ExecutionStageLoop loop = getExecutionStageLoop(channel);
        if (loop != null) {
            // ChannelInstances within loops are managed by the loop's context.
            return this.getOrCreateLoopContext(loop).getChannelInstance(channel, isPeekingToNextTransition);
        }
        return this.channelInstances.get(channel);
    }

    /**
     * Determine the {@link ExecutionStageLoop} the given {@link Channel} belongs to. A {@link Channel} belongs to
     * the {@link ExecutionStageLoop} of its producer's {@link ExecutionStage}, unless it implements a final loop
     * output of a {@link LoopHeadOperator}.
     *
     * @param channel the {@link Channel}
     * @return the {@link ExecutionStageLoop} or {@code null} if none
     */
    private static ExecutionStageLoop getExecutionStageLoop(Channel channel) {
        final ExecutionStageLoop producerLoop = channel.getProducer().getStage().getLoop();
        if (producerLoop == null) {
            return null;
        }

        // The final outputs of a loop head leave the loop and are therefore not considered part of it.
        final OutputSlot<?> output = channel.getProducer().getOutputSlotFor(channel);
        final boolean isFinalLoopOutput = output != null
                && output.getOwner().isLoopHead()
                && ((LoopHeadOperator) output.getOwner()).getFinalLoopOutputs().contains(output);
        return isFinalLoopOutput ? null : producerLoop;
    }

    /**
     * Registers the given {@link ChannelInstance}. If its {@link Channel} belongs to an {@link ExecutionStageLoop},
     * the registration is delegated to the corresponding {@link ExecutionStageLoopContext}. Otherwise, the
     * {@link ChannelInstance} is stored in {@link #channelInstances}, thereby replacing (and dereferencing) any
     * {@link ChannelInstance} previously registered for the same {@link Channel}.
     *
     * @param channelInstance that should be registered
     */
    @Override
    public void register(ChannelInstance channelInstance) {
        final Channel channel = channelInstance.getChannel();
        final ExecutionStageLoop loop = getExecutionStageLoop(channel);
        if (loop != null) {
            this.getOrCreateLoopContext(loop).register(channelInstance);
            return;
        }

        // Obtain a reference on the new ChannelInstance before letting go of the replaced one.
        final ChannelInstance replacedChannelInstance = this.channelInstances.put(channel, channelInstance);
        channelInstance.noteObtainedReference();
        if (replacedChannelInstance != null) {
            replacedChannelInstance.noteDiscardedReference(true);
        }
    }

    /**
     * Register a global {@link ExecutionResource}, that will only be released once this instance has
     * finished executing. Registering an {@link ExecutionResource} repeatedly has no effect other than a warning
     * being logged.
     *
     * @param resource that should be registered
     */
    public void registerGlobal(ExecutionResource resource) {
        final boolean isNewlyRegistered = this.globalResources.add(resource);
        if (!isNewlyRegistered) {
            this.logger.warn("Registered {} twice.", resource);
            return;
        }
        // This instance holds a reference on the resource until #shutdown().
        resource.noteObtainedReference();
    }

    /**
     * Adds the given {@link ChannelInstance} to the cardinality measurements kept by this instance.
     *
     * @param channelInstance the {@link ChannelInstance} to be added
     */
    @Override
    public void addCardinalityMeasurement(ChannelInstance channelInstance) {
        this.cardinalityMeasurements.add(channelInstance);
    }

    /**
     * Provides the cardinality measurements gathered so far.
     *
     * @return the internal {@link Collection} of {@link ChannelInstance}s (not a copy)
     */
    @Override
    public Collection<ChannelInstance> getCardinalityMeasurements() {
        return this.cardinalityMeasurements;
    }

    /**
     * Stores the given {@link PartialExecution} and, if the info log level is enabled, logs a summary of it.
     *
     * @param partialExecution that should be stored
     */
    @Override
    public void add(PartialExecution partialExecution) {
        this.partialExecutions.add(partialExecution);

        // Spare the effort of assembling the summary if it is not logged anyway.
        if (!this.logger.isInfoEnabled()) {
            return;
        }
        this.logger.info(
                "Executed {} items in {} (estimated {}).",
                partialExecution.getAtomicExecutionGroups().size(),
                Formats.formatDuration(partialExecution.getMeasuredExecutionTime()),
                partialExecution.getOverallTimeEstimate(this.getConfiguration())
        );
    }

    /**
     * Provides the {@link PartialExecution}s gathered so far.
     *
     * @return the internal {@link Collection} of {@link PartialExecution}s (not a copy)
     */
    @Override
    public Collection<PartialExecution> getPartialExecutions() {
        return this.partialExecutions;
    }

    /**
     * Set a new {@link Breakpoint} for this instance. It replaces the previously set {@link Breakpoint}.
     *
     * @param breakpoint the new {@link Breakpoint}
     */
    public void setBreakpoint(Breakpoint breakpoint) {
        this.breakpoint = breakpoint;
    }

    /**
     * Allows to inhibit changes to the {@link ExecutionPlan}, such as on re-optimization. This instance vetoes
     * as long as it maintains any {@link ExecutionStageLoopContext}.
     *
     * @return whether this instance is vetoing on changes
     */
    public boolean isVetoingPlanChanges() {
        return !this.loopContexts.isEmpty();
    }

    /**
     * Lets go of all global {@link ExecutionResource}s that have been registered via
     * {@link #registerGlobal(ExecutionResource)}.
     */
    public void shutdown() {
        // Discard the references that this instance holds on the global resources.
        for (ExecutionResource resource : this.globalResources) {
            resource.noteDiscardedReference(true);
        }
        this.globalResources.clear();
    }

    /**
     * Retrieve the {@link ExecutionStage}s that have been completed so far.
     *
     * @return the completed {@link ExecutionStage}s; this is the internal {@link Set}, not a copy
     */
    public Set<ExecutionStage> getCompletedStages() {
        return this.completedStages;
    }

    /**
     * Provides the {@link Configuration} of the {@link Job} that is executed by this instance.
     *
     * @return the {@link Configuration}
     */
    public Configuration getConfiguration() {
        return this.job.getConfiguration();
    }

    /**
     * Observes the {@link CrossPlatformExecutor} execution state in order to tell when the input dependencies of
     * a {@link ExecutionStage} are satisfied so that it can be activated.
     */
    private class StageActivator {

        /**
         * The {@link ExecutionStage} being activated.
         */
        private final ExecutionStage stage;

        /**
         * The {@link OptimizationContext} for the {@link #stage}.
         */
        private final OptimizationContext optimizationContext;

        /**
         * Regular inbound {@link Channel}s to the {@link #stage}.
         */
        private final Collection<Channel> miscInboundChannels = new LinkedList<>();

        /**
         * Inbound {@link Channel}s to the {@link #stage}, that implement an initialization {@link InputSlot}
         * of a {@link LoopHeadOperator}.
         */
        private final Collection<Channel> initializationInboundChannels = new LinkedList<>();

        /**
         * Inbound {@link Channel}s to the {@link #stage}, that implement an iteration {@link InputSlot}
         * of a {@link LoopHeadOperator}.
         */
        private final Collection<Channel> iterationInboundChannels = new LinkedList<>();

        /**
         * Inbound {@link Channel}s to the {@link #stage}, that are loop invariant, i.e., that are needed across
         * several iteration. Of course, this is only possible if {@link #stage} is in an {@link ExecutionStageLoop}.
         */
        private final Collection<Channel> loopInvariantInboundChannels = new LinkedList<>();

        /**
         * If the {@link #stage} is part of a {@link ExecutionStageLoop}, we need to keep track of an associated
         * runtime {@link ExecutionStageLoopContext} that manages loop invariant {@link ExecutionResource}s. In particular,
         * this instance is a stake-holder of this {@link ExecutionStageLoopContext} and tells that it must not be
         * removed.
         */
        private final ExecutionStageLoopContext loopContext;

        /**
         * Keep track of the {@link ChannelInstance}s that are inputs of the {@link #stage}.
         */
        private final Map<Channel, ChannelInstance> inputChannelInstances = new HashMap<>(4);

        /**
         * Creates a new instance and classifies the inbound {@link Channel}s of the {@link ExecutionStage}.
         * If the {@link ExecutionStage} is part of an {@link ExecutionStageLoop}, the new instance obtains a
         * reference on the corresponding {@link ExecutionStageLoopContext}.
         *
         * @param stage               that should be activated
         * @param optimizationContext the {@link OptimizationContext} for the {@code stage}
         */
        private StageActivator(ExecutionStage stage, OptimizationContext optimizationContext) {
            this.stage = stage;
            this.optimizationContext = optimizationContext;

            // Become a stake-holder of the loop context if the stage is part of a loop.
            if (this.stage.getLoop() == null) {
                this.loopContext = null;
            } else {
                this.loopContext = CrossPlatformExecutor.this.getOrCreateLoopContext(this.stage.getLoop());
                this.loopContext.noteObtainedReference();
            }

            final Collection<Channel> inboundChannels = this.stage.getInboundChannels();

            if (!this.stage.isLoopHead()) {
                // Without a loop head, all Channels are regular ones...
                this.miscInboundChannels.addAll(inboundChannels);

                // ...that still might be loop inputs, though.
                for (Channel inboundChannel : inboundChannels) {
                    if (this.checkIfIsLoopInput(inboundChannel)) {
                        this.loopInvariantInboundChannels.add(inboundChannel);
                    }
                }
                return;
            }

            assert this.stage.getAllTasks().size() == 1 : String.format("Loop head stage looks like this:\n%s", this.stage.getPlanAsString("! "));

            // Loop heads are special in the sense that they don't require all of their inputs. Hence, we need to
            // distinguish the inbound Channels by the kind of input they implement.
            for (Channel inboundChannel : inboundChannels) {
                for (ExecutionTask consumer : inboundChannel.getConsumers()) {
                    // Only the consumers within this stage are of interest.
                    if (consumer.getStage() != this.stage) {
                        continue;
                    }

                    // Inputs of the loop head operator can have iteration semantics.
                    if (consumer.getOperator().isLoopHead()) {
                        final LoopHeadOperator loopHead = (LoopHeadOperator) consumer.getOperator();
                        final InputSlot<?> targetInput = consumer.getInputSlotFor(inboundChannel);
                        if (loopHead.getLoopBodyInputs().contains(targetInput)) {
                            this.iterationInboundChannels.add(inboundChannel);
                            continue;
                        }
                        if (loopHead.getLoopInitializationInputs().contains(targetInput)) {
                            this.initializationInboundChannels.add(inboundChannel);
                            continue;
                        }
                    }

                    // Everything else is a regular inbound Channel, which is expected to be a loop input.
                    this.miscInboundChannels.add(inboundChannel);
                    assert this.checkIfIsLoopInput(inboundChannel) :
                            String.format("%s is not a loop input as expected.", inboundChannel);
                    this.loopInvariantInboundChannels.add(inboundChannel);
                }
            }
        }

        /**
         * Checks if the given {@link Channel} (inbound to {@link #stage}) is entering an {@link ExecutionStageLoop}
         * when serving the {@link #stage}.
         *
         * @param inboundChannel {@link Channel} that is inbound to {@link #stage} and that might be a {@link ExecutionStageLoop} input
         * @return whether the {@link Channel} is a {@link ExecutionStageLoop} input, i.e., the {@link #stage} is
         * part of a {@link ExecutionStageLoop} while the {@link Channel} is not produced in that very
         * {@link ExecutionStageLoop}
         */
        private boolean checkIfIsLoopInput(Channel inboundChannel) {
            // NB: This code assumes no nested loops.
            final ExecutionStageLoop stageLoop = this.stage.getLoop();
            if (stageLoop == null) {
                return false;
            }
            return stageLoop != inboundChannel.getProducer().getStage().getLoop();
        }

        /**
         * Attempts to collect all input {@link ChannelInstance}s that the {@link #stage} currently needs and stores
         * them in {@link #inputChannelInstances}.
         * <p>The regular inbound {@link Channel}s are always probed. If the {@link #stage} is a loop head, the state
         * of its {@link LoopHeadOperator} additionally decides whether the initialization or the iteration inbound
         * {@link Channel}s are probed.</p>
         *
         * @return whether the activation is possible, i.e., whether all probed {@link Channel}s could be served
         */
        boolean updateInputChannelInstances() {
            // Probe the regular channels unconditionally, so that their instances get referenced in any case.
            final boolean miscReady = this.updateChannelInstances(this.miscInboundChannels, false);
            if (!this.stage.isLoopHead()) {
                return miscReady;
            }

            // For loop heads, the required special inputs depend on the progress of the loop.
            final LoopHeadOperator loopHead = (LoopHeadOperator) this.stage.getLoopHeadTask().getOperator();
            final boolean loopReady;
            switch (loopHead.getState()) {
                case NOT_STARTED:
                    loopReady = this.updateChannelInstances(this.initializationInboundChannels, false);
                    break;
                case RUNNING:
                    loopReady = this.updateChannelInstances(this.iterationInboundChannels, true);
                    break;
                default:
                    logger.warn("Tried to update input channel instances for finished {}.", this.stage);
                    loopReady = false;
                    break;
            }
            return miscReady && loopReady;
        }

        /**
         * Attempts to obtain a {@link ChannelInstance} for each of the given {@link Channel}s and records every
         * newly found one in {@link #inputChannelInstances}. {@link Channel}s that are already recorded are skipped.
         *
         * @param channels   for that the {@link ChannelInstance}s are requested
         * @param isFeedback forwarded to the {@link ChannelInstance} lookup of the {@link CrossPlatformExecutor};
         *                   {@code true} is passed for the iteration inputs of a running loop head
         * @return whether there are {@link ChannelInstance}s for all {@link Channel}s available
         */
        private boolean updateChannelInstances(Collection<Channel> channels, boolean isFeedback) {
            boolean allAvailable = true;
            for (Channel requested : channels) {
                // Nothing to do for Channels whose ChannelInstance has been picked up before.
                if (this.inputChannelInstances.containsKey(requested)) {
                    continue;
                }

                final ChannelInstance instance = CrossPlatformExecutor.this.getChannelInstance(requested, isFeedback);
                if (instance == null) {
                    // Remember the miss, but keep going: the remaining Channels still need to be grabbed to save
                    // them from disposal.
                    allAvailable = false;
                    continue;
                }

                // The ChannelInstance is available: keep it and hold a reference to it.
                this.inputChannelInstances.put(requested, instance);
                instance.noteObtainedReference();

                // Loop invariant inputs are additionally handed to the loop context.
                if (this.loopInvariantInboundChannels.contains(requested)) {
                    this.loopContext.registerLoopInvariant(instance);
                }
            }
            return allAvailable;
        }

        /**
         * Getter for the {@link ExecutionStage} whose activation is tracked by this instance.
         *
         * @return the {@link ExecutionStage} stored in {@link #stage}
         */
        public ExecutionStage getStage() {
            return this.stage;
        }

        /**
         * Getter for the {@link OptimizationContext} held by this instance.
         *
         * @return the {@link OptimizationContext}
         */
        public OptimizationContext getOptimizationContext() {
            return optimizationContext;
        }

        /**
         * Disposes this instance: the references to all collected input {@link ChannelInstance}s are discarded,
         * {@link #inputChannelInstances} is emptied, and the reference to the {@link #loopContext} (if there is one)
         * is discarded as well.
         */
        void dispose() {
            // Give up the references that were obtained while collecting the inputs.
            for (ChannelInstance heldInstance : this.inputChannelInstances.values()) {
                heldInstance.noteDiscardedReference(true);
            }
            this.inputChannelInstances.clear();

            // Stages outside of loops have no loop context to release.
            if (this.loopContext == null) {
                return;
            }
            this.loopContext.noteDiscardedReference(true);
        }

        /**
         * Tells this instance that it has been activated. If the {@link #stage} is a loop head, the
         * {@link #loopContext} is advanced to its next iteration; otherwise nothing happens.
         */
        public void noteActivation() {
            if (!this.stage.isLoopHead()) {
                return;
            }
            this.loopContext.activateNextIteration();
        }
    }


    /**
     * Keeps track of the {@link ExecutionResource}s of an {@link ExecutionStageLoop}: the loop invariants as well as
     * the {@link ExecutionStageLoopIterationContext}s of the current iteration and of the previous and next
     * transition between iterations.
     */
    private class ExecutionStageLoopContext extends AbstractReferenceCountable {

        /**
         * The {@link ExecutionStageLoop} that this instance describes.
         */
        private final ExecutionStageLoop loop;

        /**
         * The loop invariant {@link ExecutionResource}s; {@code null} once this instance has been disposed.
         */
        private Set<ExecutionResource> loopInvariants = new HashSet<>(4);

        /**
         * {@link ChannelInstance}s of the iteration that is currently active (if any).
         */
        private ExecutionStageLoopIterationContext currentIteration;

        /**
         * {@link ChannelInstance}s handed over from the preceding iteration (if any).
         */
        private ExecutionStageLoopIterationContext prevTransition;

        /**
         * {@link ChannelInstance}s to be handed over to the following iteration (if any).
         */
        private ExecutionStageLoopIterationContext nextTransition;

        /**
         * Creates a new instance.
         *
         * @param executionStageLoop the {@link ExecutionStageLoop} to be described by the new instance
         */
        public ExecutionStageLoopContext(ExecutionStageLoop executionStageLoop) {
            this.loop = executionStageLoop;
        }

        /**
         * Adds a loop invariant {@link ExecutionResource} to this instance. A reference to the
         * {@link ExecutionResource} is obtained only if it has not been registered before.
         *
         * @param loopInvariant the said {@link ExecutionResource}
         */
        void registerLoopInvariant(ExecutionResource loopInvariant) {
            final boolean isNewlyRegistered = this.loopInvariants.add(loopInvariant);
            if (!isNewlyRegistered) {
                return;
            }
            loopInvariant.noteObtainedReference();
        }

        /**
         * Moves this instance on to the next iteration: the current {@link ExecutionStageLoopIterationContext} is
         * replaced with a new one, the previous transition is dropped, and the next transition becomes the previous
         * transition.
         */
        public void activateNextIteration() {
            logger.info("Activating next iteration.");

            // Replace the current iteration.
            this.releaseIterationContext(this.currentIteration);
            this.currentIteration = this.createIterationContext();

            // Age the transitions.
            this.releaseIterationContext(this.prevTransition);
            this.prevTransition = this.nextTransition;
            this.nextTransition = null;
        }

        /**
         * Creates a new {@link ExecutionStageLoopIterationContext} hosted by this instance and obtains a reference
         * to it.
         *
         * @return the new {@link ExecutionStageLoopIterationContext}
         */
        private ExecutionStageLoopIterationContext createIterationContext() {
            final ExecutionStageLoopIterationContext freshContext = new ExecutionStageLoopIterationContext(this);
            freshContext.noteObtainedReference();
            return freshContext;
        }

        /**
         * Discards the reference to the given {@link ExecutionStageLoopIterationContext} unless it is {@code null}.
         *
         * @param iterationContext the {@link ExecutionStageLoopIterationContext} or {@code null}
         */
        private void releaseIterationContext(ExecutionStageLoopIterationContext iterationContext) {
            if (iterationContext != null) {
                iterationContext.noteDiscardedReference(true);
            }
        }

        /**
         * Provides the {@link ExecutionStageLoopIterationContext} for the next transition, creating it on demand.
         *
         * @return the {@link ExecutionStageLoopIterationContext} for the next transition
         */
        public ExecutionStageLoopIterationContext getOrCreateNextTransition() {
            if (this.nextTransition != null) {
                return this.nextTransition;
            }
            this.nextTransition = this.createIterationContext();
            return this.nextTransition;
        }

        @Override
        protected void disposeUnreferenced() {
            // Let go of the loop invariants and mark this instance as disposed.
            for (ExecutionResource invariant : this.loopInvariants) {
                invariant.noteDiscardedReference(true);
            }
            this.loopInvariants = null;

            // Unregister from the executor before releasing the iteration contexts.
            CrossPlatformExecutor.this.removeLoopContext(this.loop);
            this.releaseIterationContext(this.prevTransition);
            this.releaseIterationContext(this.currentIteration);
            this.releaseIterationContext(this.nextTransition);
        }

        @Override
        public boolean isDisposed() {
            return this.loopInvariants == null;
        }

        /**
         * Registers the given {@link ChannelInstance} with the appropriate
         * {@link ExecutionStageLoopIterationContext}s: with the current iteration if at least one consumer of its
         * {@link Channel} does not use it as feedback input, and with the next transition if at least one consumer
         * does.
         *
         * @param channelInstance the {@link ChannelInstance}
         */
        public void register(ChannelInstance channelInstance) {
            final Channel channel = channelInstance.getChannel();

            // Classify the consumers of the Channel.
            boolean hasFeedbackConsumer = false;
            boolean hasIterationLocalConsumer = false;
            for (ExecutionTask consumer : channel.getConsumers()) {
                final boolean isFeedbackInput = consumer.isFeedbackInput(channel);
                hasFeedbackConsumer |= isFeedbackInput;
                hasIterationLocalConsumer |= !isFeedbackInput;
            }

            if (hasIterationLocalConsumer) {
                this.currentIteration.register(channelInstance);
            }
            if (hasFeedbackConsumer) {
                this.getOrCreateNextTransition().register(channelInstance);
            }
        }

        /**
         * Looks up a {@link ChannelInstance} in this instance. When peeking, only the next transition is consulted
         * (and created if it does not exist yet). Otherwise, the previous transition takes precedence over the
         * current {@link ExecutionStageLoopIterationContext}.
         *
         * @param channel                   whose {@link ChannelInstance} should be provided
         * @param isPeekingToNextTransition whether the next transition {@link ExecutionStageLoopIterationContext}
         *                                  should be accessed
         * @return the {@link ChannelInstance} or {@code null} if it cannot be found
         */
        public ChannelInstance getChannelInstance(Channel channel,
                                                  boolean isPeekingToNextTransition) {
            if (isPeekingToNextTransition) {
                return this.getOrCreateNextTransition().getChannelInstance(channel);
            }

            ChannelInstance found = null;
            if (this.prevTransition != null) {
                found = this.prevTransition.getChannelInstance(channel);
            }
            if (found == null && this.currentIteration != null) {
                found = this.currentIteration.getChannelInstance(channel);
            }
            return found;
        }

        /**
         * Drops the previous transition {@link ExecutionStageLoopIterationContext}. Its resources
         * will not be provided anymore by this instance.
         */
        public void scrapPreviousTransitionContext() {
            this.releaseIterationContext(this.prevTransition);
            this.prevTransition = null;
        }
    }

    /**
     * Keeps track of the {@link ChannelInstance}s that belong to a single iteration (or to a transition between two
     * iterations) of an {@link ExecutionStageLoop}. Instances are hosted by an {@link ExecutionStageLoopContext}.
     */
    private class ExecutionStageLoopIterationContext extends AbstractReferenceCountable {

        /**
         * The {@link ExecutionStageLoopContext} that hosts this instance.
         */
        private final ExecutionStageLoopContext loopContext;

        /**
         * The registered {@link ChannelInstance}s, indexed by their {@link Channel}; {@code null} once this instance
         * has been disposed.
         */
        private Map<Channel, ChannelInstance> channelInstances = new HashMap<>(8);

        /**
         * Creates a new instance.
         *
         * @param loopContext the hosting {@link ExecutionStageLoopContext}
         */
        private ExecutionStageLoopIterationContext(ExecutionStageLoopContext loopContext) {
            this.loopContext = loopContext;
        }

        @Override
        protected void disposeUnreferenced() {
            // Give up the references to all registered ChannelInstances, then drop the index itself.
            final Collection<ChannelInstance> registeredInstances = this.channelInstances.values();
            for (ChannelInstance registeredInstance : registeredInstances) {
                registeredInstance.noteDiscardedReference(true);
            }
            this.channelInstances = null;
        }

        /**
         * Registers the given {@link ChannelInstance} with this instance and obtains a reference to it. If another
         * {@link ChannelInstance} has been registered for the same {@link Channel}, it is replaced and its reference
         * is discarded.
         *
         * @param channelInstance that should be registered
         */
        public void register(ChannelInstance channelInstance) {
            // Obtain the new reference before discarding the old one.
            channelInstance.noteObtainedReference();
            final Channel channel = channelInstance.getChannel();
            final ChannelInstance replacedInstance = this.channelInstances.put(channel, channelInstance);
            if (replacedInstance == null) {
                return;
            }
            replacedInstance.noteDiscardedReference(true);
        }

        /**
         * Looks up the {@link ChannelInstance} registered for the given {@link Channel}.
         *
         * @param channel whose {@link ChannelInstance} is requested
         * @return the {@link ChannelInstance} or {@code null} if none is registered
         */
        public ChannelInstance getChannelInstance(Channel channel) {
            return this.channelInstances.get(channel);
        }
    }

    /**
     * Executes {@link ExecutionStage}s in parallel threads.
     * <p>An instance repeatedly polls a {@link StageActivator} from the activated ones of the
     * {@link CrossPlatformExecutor}, executes its {@link ExecutionStage}, and tries to activate the successor
     * stages. If more than one {@link ExecutionStage} is activated afterwards, it recursively creates further
     * {@link ParallelExecutionThread}s. It keeps going as long as there is an activated {@link ExecutionStage} and
     * more than one {@link ParallelExecutionThread} has not completed yet.</p>
     */
    private class ParallelExecutionThread implements Runnable {

        /**
         * Thread identifier of this {@link ParallelExecutionThread}; it is used in log messages and as suffix for
         * the identifiers of the threads created by this instance.
         */
        public String threadId;

        /**
         * If {@code true}, {@link ExecutionStage}s are executed without consulting the breakpoint.
         */
        private boolean thread_isBreakpointDisabled;

        /**
         * {@link CrossPlatformExecutor} initiating the running thread; also serves as monitor for the shared
         * bookkeeping that is updated by this class.
         */
        private final CrossPlatformExecutor crossPlatformExecutor;

        /**
         * Creates a new instance.
         *
         * @param isBreakpointsDisabled whether the breakpoint should be ignored
         * @param id                    the identifier of the new thread
         * @param cpe                   the {@link CrossPlatformExecutor} on whose behalf stages are executed
         */
        public ParallelExecutionThread(boolean isBreakpointsDisabled, String id, CrossPlatformExecutor cpe) {
            this.thread_isBreakpointDisabled = isBreakpointsDisabled;
            this.threadId = id;
            this.crossPlatformExecutor = cpe;
        }

        /**
         * Execution code of the thread.
         */
        @Override
        public void run() {

            StageActivator stageActivator;
            this.crossPlatformExecutor.logger.info("Thread " + this.threadId + " started");
            // Loop until there is no activated stage or only one thread running
            do {
                // Get the stageActivator for the stage to execute
                synchronized (this.crossPlatformExecutor) {
                    stageActivator = this.crossPlatformExecutor.activatedStageActivators.poll();
                    if (stageActivator == null) {
                        break;
                    }
                }
                this.crossPlatformExecutor.logger.info(this.threadId + " started executing Stage: {}:", stageActivator.getStage());

                // Check if #breakpoint permits the execution.
                // NOTE(review): this exit path does not count the thread as completed - confirm that this is intended.
                if (!this.thread_isBreakpointDisabled && this.crossPlatformExecutor.suspendIfBreakpointRequest(stageActivator)) {
                    return;
                }

                // Otherwise, execute the stage.

                final ExecutionStage stage = stageActivator.getStage();
                final OptimizationContext optimizationContext = stageActivator.getOptimizationContext();

                // Find parts of the stage to instrument.
                this.crossPlatformExecutor.instrumentationStrategy.applyTo(stage);

                // Obtain an Executor for the stage.
                final Executor executor = this.crossPlatformExecutor.getOrCreateExecutorFor(stage);

                // Have the execution done.
                CrossPlatformExecutor.this.logger.info("Having {} execute {}:\n{}", executor, stage, stage.getPlanAsString("> "));
                long startTime = System.currentTimeMillis();

                // Stages that share an Executor are executed one after another.
                // synchronized (this.crossPlatformExecutor) could be used here instead to avoid errors when two
                // stages run the same operators on the same platform but with different executors.
                synchronized (executor) {
                    executor.execute(stage, optimizationContext, this.crossPlatformExecutor);
                    long finishTime = System.currentTimeMillis();

                    CrossPlatformExecutor.this.logger.info("Executed {} in {}.", stage, Formats.formatDuration(finishTime - startTime, true));

                    // Remember that we have executed the stage.
                    this.crossPlatformExecutor.completedStages.add(stage);
                    if (stage.isLoopHead()) {
                        this.crossPlatformExecutor.getOrCreateLoopContext(stage.getLoop()).scrapPreviousTransitionContext();
                    }

                    // Try to activate the successor stages.
                    this.crossPlatformExecutor.tryToActivateSuccessors(stageActivator);
                }
                // We can now dispose the stageActivator that collected the input ChannelInstances.
                stageActivator.dispose();

                // Dispose obsolete ChannelInstances.
                // NOTE(review): this sweep is not guarded by a lock here - confirm that channelInstances tolerates
                // concurrent access from several ParallelExecutionThreads.
                final Iterator<Map.Entry<Channel, ChannelInstance>> iterator = this.crossPlatformExecutor.channelInstances.entrySet().iterator();
                while (iterator.hasNext()) {
                    final Map.Entry<Channel, ChannelInstance> channelInstanceEntry = iterator.next();
                    final ChannelInstance channelInstance = channelInstanceEntry.getValue();

                    // If the executor is the only one to still reference this ChannelInstance, discard it.
                    if (channelInstance.getNumReferences() == 1) {
                        channelInstance.noteDiscardedReference(true);
                        iterator.remove();
                    }

                }

                this.crossPlatformExecutor.logger.info(this.threadId + " completed executing Stage : {}:", stageActivator.getStage());

                // Create new threads for more than one activated stages recursively
                if (CrossPlatformExecutor.this.activatedStageActivators.size() > 1) {
                    // Create new threads other than the existing thread
                    for (int i = 1; i <= CrossPlatformExecutor.this.activatedStageActivators.size() - 1; i++) {
                        // TODO: Better use Java's ForkJoinPool to reduce thread creation overhead and control concurrency.
                        // Create parallel stage execution thread
                        // NOTE(review): the thread is started before it is registered, so it may complete before
                        // it is counted in parallelExecutionThreads - verify against the waiting logic.
                        Thread thread = new Thread(new ParallelExecutionThread(this.thread_isBreakpointDisabled, "T" + i + "@" + this.threadId, this.crossPlatformExecutor));
                        thread.start();
                        synchronized (this.crossPlatformExecutor) {
                            // Add the created thread to the parallelExecutionThreads of the executor.
                            CrossPlatformExecutor.this.parallelExecutionThreads.add(thread);
                        }
                    }
                }

                // TODO: Do not busy-wait (could be solved with the ForkJoinPool as well).
            } while (CrossPlatformExecutor.this.activatedStageActivators.size() >= 1 &&
                    CrossPlatformExecutor.this.parallelExecutionThreads.size() - CrossPlatformExecutor.this.completedThreads > 1);

            // Increment the global counter of completed threads.
            // The increment is a read-modify-write operation, which is not atomic even for a volatile variable:
            // two threads finishing at the same time could otherwise lose an update. Hence, it is guarded by the
            // same monitor as the other shared bookkeeping of this class.
            synchronized (this.crossPlatformExecutor) {
                CrossPlatformExecutor.this.completedThreads++;
            }

            // Notify thread ended
            CrossPlatformExecutor.this.logger.info(this.threadId + " ended");
        }
    }

}
