| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.wayang.core.optimizer.enumeration; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.Map; |
| import java.util.Queue; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| import org.apache.commons.lang3.Validate; |
| import org.apache.logging.log4j.LogManager; |
| import org.apache.logging.log4j.Logger; |
| import org.apache.wayang.core.plan.executionplan.Channel; |
| import org.apache.wayang.core.plan.executionplan.ExecutionPlan; |
| import org.apache.wayang.core.plan.executionplan.ExecutionStage; |
| import org.apache.wayang.core.plan.executionplan.ExecutionStageLoop; |
| import org.apache.wayang.core.plan.executionplan.ExecutionTask; |
| import org.apache.wayang.core.plan.executionplan.PlatformExecution; |
| import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; |
| import org.apache.wayang.core.plan.wayangplan.InputSlot; |
| import org.apache.wayang.core.plan.wayangplan.LoopHeadOperator; |
| import org.apache.wayang.core.plan.wayangplan.LoopSubplan; |
| import org.apache.wayang.core.plan.wayangplan.OutputSlot; |
| import org.apache.wayang.core.platform.Platform; |
| import org.apache.wayang.core.util.Iterators; |
| import org.apache.wayang.core.util.OneTimeExecutable; |
| import org.apache.wayang.core.util.Tuple; |
| |
| /** |
| * Builds an {@link ExecutionPlan} from a {@link ExecutionTaskFlow}. |
| * <p>Specifically, subdivides the {@link ExecutionTask}s into {@link PlatformExecution}s and {@link ExecutionStage}s, |
| * thereby discarding already executed {@link ExecutionTask}s.</p> |
| * <p>As of now, these are recognized as producers of |
| * {@link Channel}s that are copied (see {@link Channel#isCopy()}). This is because of {@link ExecutionTaskFlowCompiler} that copies {@link Channel}s |
| * to different alternative {@link ExecutionPlan}s on top of existing fixed {@link ExecutionTask}s.</p> |
| */ |
| public class StageAssignmentTraversal extends OneTimeExecutable { |
| |
| private static final Logger logger = LogManager.getLogger(StageAssignmentTraversal.class); |
| |
| /** |
| * The {@link ExecutionTaskFlow} that this instance converts into an {@link ExecutionPlan}. |
| */ |
| private final ExecutionTaskFlow executionTaskFlow; |
| |
| /** |
| * Maps each {@link ExecutionTask} to the {@link InterimStage} it currently belongs to. Entries are |
| * overwritten whenever a task is moved to a split-off {@link InterimStage}. |
| */ |
| private final Map<ExecutionTask, InterimStage> assignedInterimStages = new HashMap<>(); |
| |
| /** |
| * Keeps track of {@link InterimStage}s that must be executed before executing a certain {@link ExecutionTask}. |
| * Populated and updated during the precedence-based splitting. |
| */ |
| private final Map<ExecutionTask, Set<InterimStage>> requiredStages = new HashMap<>(); |
| |
| /** |
| * Zero or more {@link StageSplittingCriterion}s to further refine {@link ExecutionStage}s. The constructor |
| * registers the built-in criteria first and appends the caller-provided ones afterwards. |
| */ |
| private final Collection<StageSplittingCriterion> splittingCriteria = new LinkedList<>(); |
| |
| /** |
| * All {@link InterimStage}s created by this instance. |
| */ |
| private final Collection<InterimStage> allStages = new LinkedList<>(); |
| |
| /** |
| * Newly created {@link InterimStage}s that might be subject to refinement still. This collection is cleared |
| * in every round of the precedence-based splitting and refilled by the splits of that round. |
| */ |
| private final Collection<InterimStage> newStages = new LinkedList<>(); |
| |
| /** |
| * Maintains {@link ExecutionStageLoop}s that are being created. |
| * NOTE(review): not referenced in the part of the class reviewed here - confirm that it is still used. |
| */ |
| private Map<LoopSubplan, ExecutionStageLoop> stageLoops = new HashMap<>(); |
| |
| /** |
| * Accepts the result of this instance after execution; stays {@code null} until {@link #doExecute()} ran. |
| */ |
| private ExecutionPlan result; |
| |
/**
 * Creates a new instance.
 * <p>Logs a warning for every missing (i.e., {@code null}) input or output {@link Channel} of the
 * {@link ExecutionTask}s in the {@code executionTaskFlow} and registers the built-in
 * {@link StageSplittingCriterion}s before the caller-provided ones.</p>
 *
 * @param executionTaskFlow should be converted into an {@link ExecutionPlan}
 * @param splittingCriteria to create splits beside the precedence-based splitting
 */
private StageAssignmentTraversal(ExecutionTaskFlow executionTaskFlow,
                                 StageSplittingCriterion... splittingCriteria) {
    // Sanity checks: report all channel slots that have not been filled.
    for (ExecutionTask candidate : executionTaskFlow.collectAllTasks()) {
        final Channel[] inputs = candidate.getInputChannels();
        for (int index = 0; index < inputs.length; index++) {
            if (inputs[index] == null) {
                logger.warn("{} does not have an input channel @{}.", candidate, index);
            }
        }
        final Channel[] outputs = candidate.getOutputChannels();
        for (int index = 0; index < outputs.length; index++) {
            if (outputs[index] == null) {
                logger.warn("{} does not have an output channel @{}.", candidate, index);
            }
        }
    }
    assert executionTaskFlow.isComplete();

    // Initialization.
    this.executionTaskFlow = executionTaskFlow;

    // Built-in criteria come first, ...
    this.splittingCriteria.add(StageAssignmentTraversal::isSuitableForBreakpoint);
    // TODO: The following criterion isolates LoopHeadOperators into own ExecutionStages, so as to avoid problems
    // connected to circular dependencies. But this might not be as performant as it gets.
    this.splittingCriteria.add(StageAssignmentTraversal::isLoopHeadInvolved);
    // Loop borders always need to be split.
    this.splittingCriteria.add(StageAssignmentTraversal::isLoopBoarder);

    // ... followed by the additional criteria in the given order.
    for (StageSplittingCriterion additionalCriterion : splittingCriteria) {
        this.splittingCriteria.add(additionalCriterion);
    }
}
| |
| /** |
| * Convert an {@link ExecutionTaskFlow} into an {@link ExecutionPlan} by introducing {@link ExecutionStage}s. |
| * |
| * @param executionTaskFlow should be converted |
| * @param additionalSplittingCriteria will be employed to split {@link ExecutionStage}s that are not split necessarily |
| * @return the {@link ExecutionPlan} |
| */ |
| public static ExecutionPlan assignStages(ExecutionTaskFlow executionTaskFlow, |
| StageSplittingCriterion... additionalSplittingCriteria) { |
| final StageAssignmentTraversal instance = new StageAssignmentTraversal(executionTaskFlow, additionalSplittingCriteria); |
| return instance.buildExecutionPlan(); |
| } |
| |
| /** |
| * Tells whether the given {@link Channel} lends itself to a {@link org.apache.wayang.core.platform.Breakpoint}. In |
| * that case, we might want to split an {@link ExecutionStage} here. |
| * |
| * @see StageSplittingCriterion#shouldSplit(ExecutionTask, Channel, ExecutionTask) |
| */ |
| private static boolean isSuitableForBreakpoint(ExecutionTask producer, Channel channel, ExecutionTask consumer) { |
| return channel.isSuitableForBreakpoint(); |
| } |
| |
| /** |
| * Tells whether the given {@link Channel} leaves or enters a loop. |
| * |
| * @see StageSplittingCriterion#shouldSplit(ExecutionTask, Channel, ExecutionTask) |
| */ |
| private static boolean isLoopBoarder(ExecutionTask producer, Channel channel, ExecutionTask consumer) { |
| final ExecutionOperator producerOperator = producer.getOperator(); |
| final LinkedList<LoopSubplan> producerLoopStack = producerOperator.getLoopStack(); |
| |
| final ExecutionOperator consumerOperator = consumer.getOperator(); |
| final LinkedList<LoopSubplan> consumerLoopStack = consumerOperator.getLoopStack(); |
| |
| return !producerLoopStack.equals(consumerLoopStack); |
| } |
| |
| /** |
| * Tells whether the given {@link Channel} connects a {@link LoopHeadOperator}. |
| * |
| * @see StageSplittingCriterion#shouldSplit(ExecutionTask, Channel, ExecutionTask) |
| */ |
| private static boolean isLoopHeadInvolved(ExecutionTask producer, Channel channel, ExecutionTask consumer) { |
| return producer.getOperator().isLoopHead() || consumer.getOperator().isLoopHead(); |
| } |
| |
| /** |
| * Perform the assignment. |
| * |
| * @return the {@link ExecutionPlan} for the {@link ExecutionTaskFlow} specified in the constructor |
| */ |
| private ExecutionPlan buildExecutionPlan() { |
| this.tryExecute(); |
| return this.result; |
| } |
| |
| @Override |
| protected void doExecute() { |
| // Create initial stages. |
| this.discoverInitialStages(); |
| |
| // Refine stages as much as necessary |
| this.refineStages(); |
| if (logger.isDebugEnabled()) { |
| for (InterimStage stage : this.allStages) { |
| logger.debug("Final stage {}: {}", stage, stage.getTasks()); |
| } |
| } |
| |
| // Assemble the ExecutionPlan. |
| this.result = this.assembleExecutionPlan(); |
| } |
| |
| /** |
| * Create an initial assignment for each {@link ExecutionTask}, thereby keeping track of existing |
| * {@link PlatformExecution}s. |
| */ |
| private void discoverInitialStages() { |
| // ExecutionTasks which have to be assigned an InterimStage. |
| final Set<ExecutionTask> relevantTasks = new HashSet<>(); |
| |
| // ExecutionTasks that are staged for exploration. |
| final Queue<ExecutionTask> stagedTasks = new LinkedList<>(this.executionTaskFlow.getSinkTasks()); |
| |
| // Run until all ExecutionTasks have been checked. |
| while (!stagedTasks.isEmpty()) { |
| final ExecutionTask task = stagedTasks.poll(); |
| |
| // Collect the task and make sure we have not seen it yet. |
| if (!relevantTasks.add(task)) continue; |
| |
| for (Channel inputChannel : task.getInputChannels()) { |
| if (!this.shouldVisitProducerOf(inputChannel)) { // Barrier. |
| // At this point, we know that we are re-optimizing because the producer was already executed and |
| // is therefore not visited. |
| final ExecutionTask producer = inputChannel.getProducer(); |
| |
| // We need to see, if we must re-use the PlatformExecution of the producer. |
| // Most important seeds are those that might need to use an existing PlatformExecution: |
| // We need to create their InterimStages immediately to reuse the PlatformExecution before |
| // ExecutionTasks are assigned to other InterimStages. |
| if (this.checkIfShouldReusePlatformExecution(producer, inputChannel, task)) { |
| this.createStageFor(task, producer.getStage().getPlatformExecution()); |
| } |
| } else { |
| // Otherwise, just stage. |
| assert inputChannel.getProducer() != null; |
| stagedTasks.add(inputChannel.getProducer()); |
| } |
| } |
| } |
| |
| // Now, we can assign all InterimStages with new PlatformExecutions. |
| Queue<ExecutionTask> seedTasks = new LinkedList<>(relevantTasks); |
| while (!seedTasks.isEmpty()) { |
| final ExecutionTask task = seedTasks.poll(); |
| this.createStageFor(task, null); |
| } |
| |
| if (logger.isDebugEnabled()) { |
| this.assignedInterimStages.values().stream().distinct().forEach( |
| stage -> logger.debug("Established initial stage with {}.", stage.getTasks()) |
| ); |
| } |
| |
| // Make sure that we didn't blunder. |
| assert relevantTasks.stream().allMatch(this.assignedInterimStages::containsKey); |
| } |
| |
| /** |
| * Determines whether the {@code task} should reuse the {@link PlatformExecution} of the {@code producer}. |
| * |
| * @param producer whose {@link PlatformExecution} might be reused |
| * @param inputChannel connects the {@code producer} to the {@code task} |
| * @param task might reuse the {@link PlatformExecution} of {@code producer} |
| * @return whether to reuse |
| */ |
| private boolean checkIfShouldReusePlatformExecution(ExecutionTask producer, Channel inputChannel, ExecutionTask task) { |
| final Platform producerPlatform = producer.getPlatform(); |
| return producerPlatform.equals(task.getPlatform()) && |
| producerPlatform.isSinglePlatformExecutionPossible(producer, inputChannel, task); |
| } |
| |
| /** |
| * Starts building an {@link InterimStage} starting from the given {@link ExecutionTask} unless there is one already. |
| * If a {@link PlatformExecution} is provided, the {@link InterimStage} will be associated with it. |
| */ |
| private void createStageFor(ExecutionTask task, PlatformExecution platformExecution) { |
| assert task.getStage() == null : String.format("%s has already stage %s.", task, task.getStage()); |
| |
| // See if there is already an InterimStage. |
| if (this.assignedInterimStages.containsKey(task)) { |
| return; |
| } |
| |
| // Create a new PlatformExecution if none. |
| if (platformExecution == null) { |
| Platform platform = task.getOperator().getPlatform(); |
| platformExecution = new PlatformExecution(platform); |
| } |
| |
| // Create the InterimStage and expand it. |
| InterimStage initialStage = new InterimStageImpl(platformExecution); |
| this.addStage(initialStage); |
| this.assignTaskAndExpand(task, initialStage); |
| } |
| |
| /** |
| * Adds an {@link InterimStage} byt putting it on {@link #allStages} and {@link #newStages}. |
| */ |
| private void addStage(InterimStage stage) { |
| this.newStages.add(stage); |
| this.allStages.add(stage); |
| } |
| |
| /** |
| * Assign the given {@link ExecutionTask} to the given {@link InterimStage}s, then expands it to adjacent {@link ExecutionTask}s. |
| */ |
| private void assignTaskAndExpand(ExecutionTask task, InterimStage interimStage) { |
| this.assign(task, interimStage); |
| this.expandDownstream(task, interimStage); |
| this.expandUpstream(task, interimStage); |
| } |
| |
| /** |
| * Assign the given {@link ExecutionTask} to the given {@link InterimStage}s and set up {@link #requiredStages} |
| * for it. |
| */ |
| private void assign(ExecutionTask task, InterimStage newStage) { |
| assert task.getOperator().getPlatform().equals(newStage.getPlatform()); |
| newStage.addTask(task); |
| final InterimStage oldStage = this.assignedInterimStages.put(task, newStage); |
| logger.trace("Reassigned {} from {} to {}.", task, oldStage, newStage); |
| } |
| |
| /** |
| * Handle the upstream neighbors of the given {@code task} by expanding the {@code expandableStage}. |
| */ |
| private void expandDownstream(ExecutionTask task, InterimStage expandableStage) { |
| for (Channel channel : task.getOutputChannels()) { |
| assert channel != null : String.format("%s has null output channels.", task); |
| if (channel.isExecutionBreaker()) { |
| expandableStage.setOutbound(task); |
| } |
| for (ExecutionTask consumer : channel.getConsumers()) { |
| final InterimStage assignedStage = this.assignedInterimStages.get(consumer); |
| if (assignedStage == null) { |
| this.handleTaskWithoutPlatformExecution(consumer, /*channel.isExecutionBreaker() ? null : */ expandableStage); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Handle the upstream neighbors of the given {@code task} by expanding the {@code expandableStage} if possible. |
| */ |
| private void expandUpstream(ExecutionTask task, InterimStage expandableStage) { |
| for (Channel channel : task.getInputChannels()) { |
| if (!this.shouldVisitProducerOf(channel)) continue; |
| final ExecutionTask producer = channel.getProducer(); |
| assert producer != null; |
| final InterimStage assignedStage = this.assignedInterimStages.get(producer); |
| if (assignedStage == null) { |
| this.handleTaskWithoutPlatformExecution(producer, /*channel.isExecutionBreaker() ? null : */expandableStage); |
| } |
| } |
| } |
| |
| /** |
| * Handle a given {@code task} by expanding the {@code expandableStage} if possible. |
| */ |
| private void handleTaskWithoutPlatformExecution(ExecutionTask task, InterimStage expandableStage) { |
| final Platform operatorPlatform = task.getOperator().getPlatform(); |
| if (expandableStage != null && operatorPlatform.equals(expandableStage.getPlatform())) { |
| this.assignTaskAndExpand(task, expandableStage); |
| } |
| } |
| |
| /** |
| * Refine the current {@link #newStages} and put the result to {@link #allStages}. |
| */ |
| private void refineStages() { |
| // Apply the #splittingCriteria at first. |
| new ArrayList<>(this.newStages).forEach(this::applySplittingCriteria); |
| |
| this.splitStagesByPrecedence(); |
| } |
| |
| /** |
| * Applies all {@link #splittingCriteria} to the given {@code stage}, thereby refining it. |
| */ |
| private void applySplittingCriteria(InterimStage stage) { |
| // TODO: This splitting mechanism can cause unnecessary fragmentation of stages. Most likely, because "willTaskBeSeparated" depends on the traversal order of the stage DAG. |
| |
| // Keeps track of ExecutionTasks that should be separated from those that are not in this Set. |
| Set<ExecutionTask> tasksToSeparate = new HashSet<>(); |
| |
| // Maintains ExecutionTasks whose outgoing Channels have been visited. |
| Set<ExecutionTask> seenTasks = new HashSet<>(); |
| |
| // Maintains ExecutionTasks to be visited and checked for split criteria. |
| Queue<ExecutionTask> taskQueue = new LinkedList<>(stage.getStartTasks()); |
| while (!taskQueue.isEmpty()) { |
| final ExecutionTask task = taskQueue.poll(); |
| |
| // Avoid visiting the task twice. |
| if (seenTasks.add(task)) { |
| |
| // Check if the task is already marked for separation. |
| boolean willTaskBeSeparated = tasksToSeparate.contains(task); |
| |
| // Visit all successor tasks and check whether they should be separated. |
| for (Channel channel : task.getOutputChannels()) { |
| for (ExecutionTask consumerTask : channel.getConsumers()) { |
| // If the consumerTask is in other stage, there is no need to split. |
| if (this.assignedInterimStages.get(consumerTask) != stage) { |
| continue; |
| } |
| |
| if (willTaskBeSeparated || this.splittingCriteria.stream().anyMatch( |
| criterion -> criterion.shouldSplit(task, channel, consumerTask) |
| )) { |
| if (consumerTask.isFeedbackInput(channel)) { |
| // TODO: Use marks to implement same-stage splits. |
| // channel.setStageExecutionBarrier(true); |
| continue; |
| } |
| tasksToSeparate.add(consumerTask); |
| } |
| taskQueue.add(consumerTask); |
| } |
| } |
| } |
| } |
| |
| if (!tasksToSeparate.isEmpty()) { |
| assert tasksToSeparate.size() < stage.getTasks().size() : String.format( |
| "Cannot separate all tasks from stage with tasks %s.", tasksToSeparate |
| ); |
| // Prepare to split the ExecutionTasks that are not separated. |
| final HashSet<ExecutionTask> tasksToKeep = new HashSet<>(stage.getTasks()); |
| tasksToKeep.removeAll(tasksToSeparate); |
| |
| // Separate the ExecutionTasks and create stages for each connected component. |
| do { |
| Set<ExecutionTask> component = this.separateConnectedComponent(tasksToSeparate); |
| final InterimStage separatedStage = this.splitStage(stage, component); |
| this.applySplittingCriteria(separatedStage); |
| } while (!tasksToSeparate.isEmpty()); |
| |
| // Also split the remainder into connected components. |
| while (true) { |
| Set<ExecutionTask> component = this.separateConnectedComponent(tasksToKeep); |
| // Avoid "splitting" if the tasksToKeep are already a connected component. |
| if (tasksToKeep.isEmpty()) break; |
| final InterimStage separatedStage = this.splitStage(stage, component); |
| this.applySplittingCriteria(separatedStage); |
| } |
| } |
| } |
| |
| /** |
| * Removes a connected component of {@link ExecutionTask}s. |
| * |
| * @param tasks from that a connected component should be removed |
| * @return the connected component |
| */ |
| private Set<ExecutionTask> separateConnectedComponent(Set<ExecutionTask> tasks) { |
| assert !tasks.isEmpty(); |
| |
| // Prepare data structures. |
| Queue<ExecutionTask> stagedTasks = new LinkedList<>(); |
| Set<ExecutionTask> connectedComponent = new HashSet<>(tasks.size()); |
| |
| // Remove any element from the tasks. |
| final Iterator<ExecutionTask> iterator = tasks.iterator(); |
| final ExecutionTask seed = iterator.next(); |
| stagedTasks.add(seed); |
| iterator.remove(); |
| |
| // Expand the connected component. |
| ExecutionTask task; |
| while ((task = stagedTasks.poll()) != null) { |
| connectedComponent.add(task); |
| for (Channel channel : task.getInputChannels()) { |
| if (task.isFeedbackInput(channel)) continue; |
| final ExecutionTask producer = channel.getProducer(); |
| if (tasks.remove(producer)) { |
| stagedTasks.add(producer); |
| } |
| } |
| for (Channel channel : task.getOutputChannels()) { |
| for (ExecutionTask consumer : channel.getConsumers()) { |
| if (!consumer.isFeedbackInput(channel) && tasks.remove(consumer)) { |
| stagedTasks.add(consumer); |
| } |
| } |
| } |
| } |
| |
| // Return the connected component. |
| return connectedComponent; |
| } |
| |
| /** |
| * Spans a precedence graph between {@link #newStages} and splits them where necessary. |
| */ |
| private void splitStagesByPrecedence() { |
| // Assign the required stages for each ExecutionTask: Each one requires its very own stage. |
| for (InterimStage stage : this.newStages) { |
| for (ExecutionTask task : stage.getTasks()) { |
| this.requiredStages.computeIfAbsent(task, key -> new HashSet<>(4)).add(stage); |
| } |
| } |
| |
| // Update the precedence graph and split until we reach a stable state. |
| while (!this.newStages.isEmpty()) { |
| |
| // Update the precedence graph. |
| for (InterimStage currentStage : this.newStages) { |
| |
| // We start from the outbound ExecutionTasks of each stage, because within each stage we will not create new precedences. |
| for (ExecutionTask outboundTask : currentStage.getOutboundTasks()) { |
| |
| // Start with the currently required stages. |
| final HashSet<InterimStage> requiredStages = new HashSet<>(this.requiredStages.get(outboundTask)); |
| |
| // Propagate these stages to all follow-up tasks. |
| for (Channel channel : outboundTask.getOutputChannels()) { |
| for (ExecutionTask consumer : channel.getConsumers()) { |
| if (!consumer.isFeedbackInput(channel)) { |
| this.updateRequiredStages(consumer, requiredStages); |
| } |
| } |
| } |
| } |
| } |
| |
| // Partition stages. Might yield new #newStages. |
| this.newStages.clear(); |
| new ArrayList<>(this.allStages).forEach(this::partitionStage); |
| } |
| |
| } |
| |
| /** |
| * Update the {@link #requiredStages} while recursively traversing downstream over {@link ExecutionTask}s. |
| * Also, mark its {@link InterimStage} ({@link InterimStage#markDependenciesUpdated()}) |
| * to keep track of dependencies between the {@link InterimStage}s that we have been unaware of so far. |
| * |
| * @param task which is to be traversed next |
| * @param requiredStages the required {@link InterimStage}s |
| */ |
| private void updateRequiredStages(ExecutionTask task, |
| Set<InterimStage> requiredStages) { |
| // Find the InterimStage assigned to the task. |
| final InterimStage currentStage = this.assignedInterimStages.get(task); |
| |
| // Update the requiredStages by the InterimStage of the task. |
| boolean isCurrentStageAdded = requiredStages.add(currentStage); |
| |
| // Try to update the #requiredStages of our task. |
| final Set<InterimStage> currentlyRequiredStages = this.requiredStages.get(task); |
| if (currentlyRequiredStages.addAll(requiredStages)) { |
| // If there is a new required stage, mark the stage. |
| logger.debug("Updated required stages of {} to {}.", task, currentlyRequiredStages); |
| currentStage.markDependenciesUpdated(); |
| |
| // And propagate the dependencies downstream. We assume all downstream dependencies to be supersets of |
| // the current requiredStage, which should be true by construction. |
| for (Channel channel : task.getOutputChannels()) { |
| for (ExecutionTask consumingTask : channel.getConsumers()) { |
| if (!consumingTask.isFeedbackInput(channel)) { |
| this.updateRequiredStages(consumingTask, requiredStages); |
| } |
| } |
| } |
| } |
| |
| if (isCurrentStageAdded) { |
| requiredStages.remove(currentStage); |
| } |
| } |
| |
| |
| /** |
| * Partition the {@code stage} into two halves. All {@link ExecutionTask}s that do not have the minimum count of |
| * required {@link InterimStage}s will be put into a new {@link InterimStage}. |
| * |
| * @return whether a split occurred |
| */ |
| private boolean partitionStage(InterimStage stage) { |
| // Short-cut: if the stage has not been marked, its required stages did not change. |
| if (!stage.getAndResetSplitMark()) { |
| return false; |
| } |
| int minRequiredStages = -1; |
| final Collection<ExecutionTask> initialTasks = new LinkedList<>(); |
| final Set<ExecutionTask> tasksToSeparate = new HashSet<>(); |
| for (ExecutionTask task : stage.getTasks()) { |
| final Set<InterimStage> requiredStages = this.requiredStages.get(task); |
| if (minRequiredStages == -1 || requiredStages.size() < minRequiredStages) { |
| tasksToSeparate.addAll(initialTasks); |
| initialTasks.clear(); |
| minRequiredStages = requiredStages.size(); |
| } |
| (minRequiredStages == requiredStages.size() ? initialTasks : tasksToSeparate).add(task); |
| } |
| |
| if (tasksToSeparate.isEmpty()) { |
| logger.debug("No separable tasks found in marked stage {}.", stage); |
| return false; |
| } else { |
| // Prepare to split the ExecutionTasks that are not separated. |
| final HashSet<ExecutionTask> tasksToKeep = new HashSet<>(stage.getTasks()); |
| tasksToKeep.removeAll(tasksToSeparate); |
| |
| // Separate the ExecutionTasks and create stages for each connected component. |
| do { |
| Set<ExecutionTask> component = this.separateConnectedComponent(tasksToSeparate); |
| this.splitStage(stage, component); |
| } while (!tasksToSeparate.isEmpty()); |
| |
| // Also split the remainder into connected components. |
| while (true) { |
| Set<ExecutionTask> component = this.separateConnectedComponent(tasksToKeep); |
| // Avoid "splitting" if the tasksToKeep are already a connected component. |
| if (tasksToKeep.isEmpty()) break; |
| this.splitStage(stage, component); |
| } |
| return true; |
| } |
| } |
| |
| /** |
| * Splits the given {@link ExecutionTask}s from the {@link InterimStage} to form a new {@link InterimStage}, |
| * which will be added to {@link #newStages}. Also. {@link #assignedInterimStages} will be updated. |
| * |
| * @return the new {@link InterimStage} |
| */ |
| private InterimStage splitStage(InterimStage stage, Set<ExecutionTask> separableTasks) { |
| if (logger.isDebugEnabled()) { |
| Set<ExecutionTask> residualTasks = new HashSet<>(stage.getTasks()); |
| residualTasks.removeAll(separableTasks); |
| logger.debug("Separating " + separableTasks + " from " + residualTasks + "..."); |
| |
| } |
| InterimStage newStage = stage.separate(separableTasks); |
| this.addStage(newStage); |
| for (ExecutionTask separatedTask : newStage.getTasks()) { |
| this.assign(separatedTask, newStage); |
| } |
| return newStage; |
| } |
| |
| private ExecutionPlan assembleExecutionPlan() { |
| final Map<InterimStage, ExecutionStage> finalStages = new HashMap<>(this.allStages.size()); |
| for (ExecutionTask sinkTask : this.executionTaskFlow.getSinkTasks()) { |
| this.assembleExecutionPlan(finalStages, null, sinkTask, new HashSet<>()); |
| } |
| final ExecutionPlan executionPlan = new ExecutionPlan(); |
| finalStages.values().stream().filter(ExecutionStage::isStartingStage).forEach(executionPlan::addStartingStage); |
| return executionPlan; |
| } |
| |
| /** |
| * Creates {@link ExecutionStage}s and connects them. |
| * |
| * @param finalStages collects the {@link ExecutionStage}s |
| * @param successorExecutionStage the {@link ExecutionStage} following the {@code currentExecutionTask} |
| * @param currentExecutionTask an {@link ExecutionTask} whose {@link InterimStage} is to be considered |
| * @param visitedTasks maintains already visited {@link ExecutionTask}s to avoid running into loops |
| */ |
| private void assembleExecutionPlan(Map<InterimStage, ExecutionStage> finalStages, |
| ExecutionStage successorExecutionStage, |
| ExecutionTask currentExecutionTask, |
| HashSet<Object> visitedTasks) { |
| |
| // Get or create the final ExecutionStage. |
| final InterimStage interimStage = this.assignedInterimStages.get(currentExecutionTask); |
| final ExecutionStage executionStage = finalStages.computeIfAbsent(interimStage, InterimStage::toExecutionStage); |
| |
| if (successorExecutionStage != null |
| && !executionStage.equals(successorExecutionStage) |
| && !executionStage.getSuccessors().contains(successorExecutionStage)) { |
| executionStage.addSuccessor(successorExecutionStage); |
| } |
| |
| // Avoid running into loops. However, we must not do this check earlier because we might visit ExecutionTasks |
| // from several different predecessor InterimStages. |
| if (!visitedTasks.add(currentExecutionTask)) { |
| return; |
| } |
| |
| for (Channel channel : currentExecutionTask.getInputChannels()) { |
| if (this.shouldVisitProducerOf(channel)) { |
| final ExecutionTask predecessor = channel.getProducer(); |
| this.assembleExecutionPlan(finalStages, executionStage, predecessor, visitedTasks); |
| } |
| } |
| |
| } |
| |
| /** |
| * Tells whether the producer of the given {@link Channel} (and its producers in turn) should be considered. |
| */ |
| private boolean shouldVisitProducerOf(Channel channel) { |
| // We do not follow copied Channels, because they mark the border to already executed ExecutionTasks. |
| return !channel.isCopy(); |
| } |
| |
| /** |
| * Intermediate step towards {@link ExecutionStage}s. Supports some build utilites and can be split. |
| */ |
| private interface InterimStage { |
| |
| Set<ExecutionTask> getTasks(); |
| |
| Platform getPlatform(); |
| |
| void addTask(ExecutionTask task); |
| |
| void setOutbound(ExecutionTask task); |
| |
| ExecutionStage toExecutionStage(); |
| |
| InterimStage separate(Set<ExecutionTask> separableTasks); |
| |
| /** |
| * Check whether this instance is marked to have new dependencies. If so, reset the mark. |
| * |
| * @return whether this instance was marked |
| * @see #markDependenciesUpdated() |
| */ |
| boolean getAndResetSplitMark(); |
| |
| /** |
| * Mark this instance. We use it, to mark instances that have gotten new {@link #requiredStages} during |
| * splitting of stages. |
| */ |
| void markDependenciesUpdated(); |
| |
| Set<ExecutionTask> getOutboundTasks(); |
| |
| Collection<ExecutionTask> getStartTasks(); |
| } |
| |
| private class InterimStageImpl implements InterimStage { |
| |
| /** |
| * The {@link PlatformExecution} that this instance belongs to. Instances split off from this one |
| * share the same {@link PlatformExecution}. |
| */ |
| private final PlatformExecution platformExecution; |
| |
| /** |
| * All tasks being in this instance. |
| */ |
| private final Set<ExecutionTask> allTasks = new HashSet<>(); |
| |
| /** |
| * The tasks of this instance that are marked as outbound. Tasks are marked when they feed an |
| * execution-breaking {@link Channel} or, after a split, when one of their {@link Channel}s has a consumer |
| * that is not part of this instance. |
| */ |
| private final Set<ExecutionTask> outboundTasks = new HashSet<>(); |
| |
| /** |
| * Use for mark-and-sweep algorithms. (Specifically: mark changed stages) |
| */ |
| private boolean isMarked; |
| |
| /** |
| * For debugging purposes only. Starts at 0; each split-off instance gets the number of its origin plus one. |
| */ |
| private final int sequenceNumber; |
| |
/**
 * Creates a new instance with the initial sequence number {@code 0}.
 *
 * @param platformExecution the {@link PlatformExecution} that the new instance belongs to
 */
public InterimStageImpl(PlatformExecution platformExecution) {
    this(platformExecution, 0);
}
| |
/**
 * Creates a new instance with an explicit sequence number.
 *
 * @param platformExecution the {@link PlatformExecution} that the new instance belongs to
 * @param sequenceNumber    distinguishes split-off instances; for debugging purposes only
 */
private InterimStageImpl(PlatformExecution platformExecution, int sequenceNumber) {
    this.sequenceNumber = sequenceNumber;
    this.platformExecution = platformExecution;
}
| |
| @Override |
| public Platform getPlatform() { |
| return this.platformExecution.getPlatform(); |
| } |
| |
| @Override |
| public void addTask(ExecutionTask task) { |
| this.allTasks.add(task); |
| } |
| |
| @Override |
| public void setOutbound(ExecutionTask task) { |
| Validate.isTrue(this.allTasks.contains(task)); |
| this.outboundTasks.add(task); |
| } |
| |
| @Override |
| public Set<ExecutionTask> getOutboundTasks() { |
| return this.outboundTasks; |
| } |
| |
| @Override |
| public Collection<ExecutionTask> getStartTasks() { |
| return this.getTasks().stream().filter(this::checkIfStartTask).collect(Collectors.toList()); |
| } |
| |
| @Override |
| public Set<ExecutionTask> getTasks() { |
| return this.allTasks; |
| } |
| |
| @Override |
| public InterimStage separate(Set<ExecutionTask> separableTasks) { |
| InterimStage newStage = this.createSplit(); |
| for (Iterator<ExecutionTask> i = this.allTasks.iterator(); i.hasNext(); ) { |
| final ExecutionTask task = i.next(); |
| if (separableTasks.contains(task)) { |
| i.remove(); |
| newStage.addTask(task); |
| if (this.outboundTasks.remove(task)) { |
| newStage.setOutbound(task); |
| } |
| } |
| } |
| // Exchange Channels where necessary. |
| for (ExecutionTask task : this.allTasks) { |
| for (int outputIndex = 0; outputIndex < task.getNumOuputChannels(); outputIndex++) { |
| Channel outputChannel = task.getOutputChannels()[outputIndex]; |
| boolean isInterStageRequired = outputChannel.getConsumers().stream() |
| .anyMatch(consumer -> !this.allTasks.contains(consumer)); |
| if (!isInterStageRequired) continue; |
| this.outboundTasks.add(task); |
| // if (outputChannel.isInterStageCapable()) continue; |
| // TODO: We cannot "exchange" Channels so easily any more. |
| // if (!task.getOperator().getPlatform().getChannelManager() |
| // .exchangeWithInterstageCapable(outputChannel)) { |
| // StageAssignmentTraversal.logger.warn("Could not exchange {} with an interstage-capable channel.", |
| // outputChannel); |
| // } |
| } |
| } |
| |
| return newStage; |
| } |
| |
| public InterimStage createSplit() { |
| return new InterimStageImpl(this.platformExecution, this.sequenceNumber + 1); |
| } |
| |
| @Override |
| public void markDependenciesUpdated() { |
| this.isMarked = true; |
| } |
| |
| @Override |
| public boolean getAndResetSplitMark() { |
| final boolean value = this.isMarked; |
| this.isMarked = false; |
| return value; |
| } |
| |
| @Override |
| public String toString() { |
| return String.format("InterimStage%s", this.getStartTasks()); |
| // return String.format("InterimStage[%s:%d]", this.getPlatform().getName(), this.sequenceNumber); |
| } |
| |
| @Override |
| public ExecutionStage toExecutionStage() { |
| final Iterator<ExecutionTask> iterator = this.allTasks.iterator(); |
| final LoopSubplan loop = iterator.next().getOperator().getInnermostLoop(); |
| assert Iterators.allMatch(iterator, |
| task -> task.getOperator().getInnermostLoop() == loop, |
| true |
| ) : String.format("There are different loops in the stage with the tasks %s.", |
| this.allTasks.stream() |
| .map(task -> new Tuple<>(task, task.getOperator().getInnermostLoop())) |
| .collect(Collectors.toList()) |
| ); |
| |
| ExecutionStageLoop executionStageLoop = null; |
| if (loop != null) { |
| executionStageLoop = StageAssignmentTraversal.this.stageLoops.computeIfAbsent(loop, ExecutionStageLoop::new); |
| } |
| |
| final ExecutionStage executionStage = this.platformExecution.createStage(executionStageLoop, this.sequenceNumber); |
| for (ExecutionTask task : this.allTasks) { |
| executionStage.addTask(task); |
| if (this.checkIfStartTask(task)) { |
| executionStage.markAsStartTask(task); |
| } |
| if (this.checkIfTerminalTask(task)) { |
| executionStage.markAsTerminalTask(task); |
| } |
| } |
| assert !executionStage.getTerminalTasks().isEmpty() : |
| String.format("No terminal tasks among %s.", this.allTasks); |
| return executionStage; |
| } |
| |
| /** |
| * Checks if <i>all</i> input {@link Channel}s of the given {@code task} are outbound w.r.t. to the |
| * {@link InterimStage}. |
| */ |
| private boolean checkIfStartTask(ExecutionTask task) { |
| for (Channel channel : task.getInputChannels()) { |
| if (this.checkIfFeedbackChannel(task, channel)) continue; |
| final ExecutionTask producer = channel.getProducer(); |
| if (this.equals(StageAssignmentTraversal.this.assignedInterimStages.get(producer))) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * Checks if the given {@code channel} is a feedback to {@code task} (i.e., it closes a data flow cycle). |
| */ |
| private boolean checkIfFeedbackChannel(ExecutionTask task, Channel channel) { |
| if (!task.getOperator().isLoopHead()) return false; |
| final InputSlot<?> input = task.getInputSlotFor(channel); |
| return input != null && input.isFeedback(); |
| } |
| |
| /** |
| * Checks if <i>all</i> output {@link Channel}s of the given {@code task} are outbound w.r.t. to the |
| * {@link InterimStage}. |
| */ |
| private boolean checkIfTerminalTask(ExecutionTask task) { |
| for (Channel channel : task.getOutputChannels()) { |
| if (this.checkIfFeedforwardChannel(task, channel)) continue; |
| for (ExecutionTask consumer : channel.getConsumers()) { |
| if (this.equals(StageAssignmentTraversal.this.assignedInterimStages.get(consumer))) { |
| return false; |
| } |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * Checks if the given {@code channel} is a feedforward to {@code task} (i.e., it constitutes the beginning of a data flow cycle). |
| */ |
| private boolean checkIfFeedforwardChannel(ExecutionTask task, Channel channel) { |
| if (!task.getOperator().isLoopHead()) return false; |
| final OutputSlot<?> output = task.getOutputSlotFor(channel); |
| return output != null && output.isFeedforward(); |
| } |
| } |
| |
| // private static class ExecutionStageAdapter implements InterimStage { |
| // |
| // private final ExecutionStage executionStage; |
| // |
| // private boolean isMarked = false; |
| // |
| // public ExecutionStageAdapter(ExecutionStage executionStage) { |
| // this.executionStage = executionStage; |
| // } |
| // |
| // @Override |
| // public Set<ExecutionTask> getTasks() { |
| // return this.executionStage.getAllTasks(); |
| // } |
| // |
| // @Override |
| // public Platform getPlatform() { |
| // return this.executionStage.getPlatformExecution().getPlatform(); |
| // } |
| // |
| // @Override |
| // public void addTask(ExecutionTask task) { |
| // throw new RuntimeException("Unmodifiable."); |
| // } |
| // |
| // @Override |
| // public void setOutbound(ExecutionTask task) { |
| // throw new RuntimeException("Unmodifiable."); |
| // } |
| // |
| // @Override |
| // public ExecutionStage toExecutionStage() { |
| // return this.executionStage; |
| // } |
| // |
| // @Override |
| // public InterimStage separate(Set<ExecutionTask> separableTasks) { |
| // throw new RuntimeException("Unmodifiable."); |
| // } |
| // |
| // @Override |
| // public boolean getAndResetSplitMark() { |
| // boolean wasMarked = this.isMarked; |
| // this.isMarked = false; |
| // return wasMarked; |
| // } |
| // |
| // @Override |
| // public void mark() { |
| // this.isMarked = true; |
| // } |
| // |
| // @Override |
| // public Set<ExecutionTask> getOutboundTasks() { |
| // return new HashSet<>(this.executionStage.getTerminalTasks()); |
| // } |
| // } |
| // |
| |
| /** |
| * Criterion to futher split {@link InterimStage} besides precedence. |
| */ |
| @FunctionalInterface |
| public interface StageSplittingCriterion { |
| |
| /** |
| * Tells whether two adjacent {@link ExecutionTask}s of the same current {@link InterimStage} should be placed |
| * in different {@link ExecutionStage}s. |
| * |
| * @return {@code true} if the {@link ExecutionTask}s must be separated, {@code false} if it is not necessary |
| */ |
| boolean shouldSplit(ExecutionTask producerTask, Channel channel, ExecutionTask consumerTask); |
| |
| } |
| |
| } |