blob: 2961f1fbc7b378322e59d45960269d897b3c9efd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.core.plan.executionplan;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.wayang.core.optimizer.enumeration.ExecutionTaskFlow;
import org.apache.wayang.core.optimizer.enumeration.StageAssignmentTraversal;
import org.apache.wayang.core.util.Counter;
/**
* Represents an executable, cross-platform data flow. Consists of multiple {@link PlatformExecution}s.
*/
public class ExecutionPlan {

    private final Logger logger = LogManager.getLogger(this.getClass());

    /**
     * All {@link ExecutionStage}s without predecessors that need to be executed first.
     */
    private final Collection<ExecutionStage> startingStages = new LinkedList<>();

    /**
     * Registers an {@link ExecutionStage} as a starting stage of this instance, i.e., as an entry point of the
     * traversals performed by this class.
     *
     * @param executionStage the {@link ExecutionStage} to register
     */
    public void addStartingStage(ExecutionStage executionStage) {
        this.startingStages.add(executionStage);
    }

    /**
     * Provides the starting {@link ExecutionStage}s of this instance.
     *
     * @return the internal (mutable) collection of starting stages; modifications affect this instance
     */
    public Collection<ExecutionStage> getStartingStages() {
        return this.startingStages;
    }

    /**
     * Creates a {@link String} representation (not strictly ordered) of this instance.
     *
     * @return the {@link String} representation
     */
    public String toExtensiveString() {
        return this.toExtensiveString(false);
    }

    /**
     * Creates a {@link String} representation of this instance.
     *
     * @param isStriclyOrdering whether {@link ExecutionStage}s should be listed only after <i>all</i> their predecessors;
     *                          in that mode, stages whose predecessors are never all listed (and that are not
     *                          loop heads) are omitted
     * @return the {@link String} representation
     */
    public String toExtensiveString(boolean isStriclyOrdering) {
        StringBuilder sb = new StringBuilder();
        for (ExecutionStage stage : this.collectStagesInActivationOrder(isStriclyOrdering)) {
            sb.append(">>> ").append(stage).append(":\n");
            stage.getPlanAsString(sb, "> ");
            sb.append("\n");
        }
        return sb.toString();
    }

    /**
     * Creates a JSON-like representation of this instance: one map per {@link ExecutionStage}, listed in strict
     * activation order (a stage appears only after all its predecessors, loop heads excepted).
     *
     * @return the stage maps, each enriched with a {@code sequence_number} entry
     */
    public List<Map> toJsonList() {
        ArrayList<Map> allStages = new ArrayList<>();
        for (ExecutionStage stage : this.collectStagesInActivationOrder(true)) {
            Map stageMap = stage.toJsonMap();
            // The sequence number is the position of the stage within the strict activation order.
            stageMap.put("sequence_number", allStages.size());
            allStages.add(stageMap);
        }
        return allStages;
    }

    /**
     * Traverses the {@link ExecutionStage}s breadth-first, beginning with the starting stages, and reports them
     * in visiting order. Every stage is visited at most once.
     *
     * @param isStrictlyOrdering if {@code true}, a successor is enqueued only once all of its predecessors have
     *                           been visited or if it is a loop head; otherwise every successor is enqueued
     * @return the visited {@link ExecutionStage}s in visiting order
     */
    private List<ExecutionStage> collectStagesInActivationOrder(boolean isStrictlyOrdering) {
        final Counter<ExecutionStage> stageActivationCounter = new Counter<>();
        final Queue<ExecutionStage> activatedStages = new LinkedList<>(this.startingStages);
        final Set<ExecutionStage> seenStages = new HashSet<>();
        final List<ExecutionStage> visitOrder = new ArrayList<>();
        while (!activatedStages.isEmpty()) {
            final ExecutionStage stage = activatedStages.poll();
            if (!seenStages.add(stage)) {
                continue;
            }
            visitOrder.add(stage);
            for (ExecutionStage successor : stage.getSuccessors()) {
                // Count every activation, even in non-strict mode, to mirror the strict-mode bookkeeping.
                final int count = stageActivationCounter.add(successor, 1);
                if (!isStrictlyOrdering || count == successor.getPredecessors().size() || successor.isLoopHead()) {
                    activatedStages.add(successor);
                }
            }
        }
        return visitOrder;
    }

    /**
     * Scrap {@link Channel}s and {@link ExecutionTask}s that are not within the given {@link ExecutionStage}s.
     *
     * @param retainableStages the {@link ExecutionStage}s to keep
     * @return the outbound {@link Channel}s of the retained stages for which {@link Channel#retain(Set)} reported
     * {@code true}, i.e., {@link Channel}s from which consumer {@link ExecutionTask}s have been removed
     */
    public Set<Channel> retain(Set<ExecutionStage> retainableStages) {
        Set<Channel> openChannels = new HashSet<>();
        for (ExecutionStage stage : retainableStages) {
            for (Channel channel : stage.getOutboundChannels()) {
                if (channel.retain(retainableStages)) {
                    openChannels.add(channel);
                }
            }
            stage.retainSuccessors(retainableStages);
            stage.getPlatformExecution().retain(retainableStages);
        }
        return openChannels;
    }

    /**
     * Collects all {@link ExecutionStage}s in this instance, i.e., all stages reachable from the starting stages
     * via successor links (regardless of predecessor ordering).
     *
     * @return the {@link ExecutionStage}s
     */
    public Set<ExecutionStage> getStages() {
        Set<ExecutionStage> seenStages = new HashSet<>();
        Queue<ExecutionStage> openStages = new LinkedList<>(this.getStartingStages());
        while (!openStages.isEmpty()) {
            final ExecutionStage stage = openStages.poll();
            if (seenStages.add(stage)) {
                openStages.addAll(stage.getSuccessors());
            }
        }
        return seenStages;
    }

    /**
     * Collects all {@link ExecutionTask}s of this instance.
     *
     * @return the {@link ExecutionTask}s
     */
    public Set<ExecutionTask> collectAllTasks() {
        Set<ExecutionTask> allTasks = new HashSet<>();
        for (ExecutionStage stage : this.getStages()) {
            allTasks.addAll(stage.getAllTasks());
        }
        return allTasks;
    }

    /**
     * The given instance should build upon the open {@link Channel}s of this instance. Then, this instance will be
     * expanded with the content of the given instance.
     * <p>Each open (copied) input {@link Channel} of the expansion is merged into its original, and the producing
     * {@link ExecutionStage} is linked as predecessor to the stages of all consumers of that original.</p>
     *
     * @param expansion extends this instance, but they are not overlapping
     */
    public void expand(ExecutionPlan expansion) {
        for (Channel openChannel : expansion.getOpenInputChannels()) {
            openChannel.mergeIntoOriginal();
            final Channel original = openChannel.getOriginal();
            final ExecutionStage producerStage = original.getProducer().getStage();
            assert producerStage != null : String.format("No stage found for %s.", original.getProducer());
            for (ExecutionTask consumer : original.getConsumers()) {
                final ExecutionStage consumerStage = consumer.getStage();
                assert consumerStage != null : String.format("No stage found for %s.", consumer);
                // Equal stages possible on "partially open" Channels.
                if (producerStage != consumerStage) {
                    producerStage.addSuccessor(consumerStage);
                }
            }
        }
    }

    /**
     * Collects all {@link Channel}s that are copies. We recognize them as {@link Channel}s that are open.
     * <p>Note: the result is gathered from the input {@link Channel}s of every {@link ExecutionTask} and is not
     * deduplicated, so a copied {@link Channel} feeding several tasks may appear multiple times.</p>
     *
     * @return the copied input {@link Channel}s
     */
    public Collection<Channel> getOpenInputChannels() {
        return this.collectAllTasks().stream()
                .flatMap(task -> Arrays.stream(task.getInputChannels()))
                .filter(Channel::isCopy)
                .collect(Collectors.toList());
    }

    /**
     * Implements various sanity checks. Problems are logged.
     *
     * @return whether all checks passed: every {@link ExecutionTask} has a stage, no {@link Channel} is a copy,
     * and every sibling of a {@link Channel} is itself connected to some {@link ExecutionTask} of this instance
     */
    public boolean isSane() {
        // 1. Check if every ExecutionTask is assigned an ExecutionStage.
        final Set<ExecutionTask> allTasks = this.collectAllTasks();
        boolean isAllTasksAssigned = allTasks.stream().allMatch(task -> task.getStage() != null);
        if (!isAllTasksAssigned) {
            this.logger.error("There are tasks without stages.");
        }

        // 2. Check that no Channel is a copy.
        final Set<Channel> allChannels = allTasks.stream()
                .flatMap(task -> Stream.concat(Arrays.stream(task.getInputChannels()), Arrays.stream(task.getOutputChannels())))
                .collect(Collectors.toSet());
        boolean isAllChannelsOriginal = allChannels.stream().noneMatch(Channel::isCopy);
        if (!isAllChannelsOriginal) {
            this.logger.error("There are channels that are copies.");
        }

        // 3. Check that all siblings of a Channel are part of this instance as well.
        boolean isAllSiblingsConsistent = true;
        for (Channel channel : allChannels) {
            for (Channel sibling : channel.getSiblings()) {
                if (!allChannels.contains(sibling)) {
                    this.logger.error("A sibling of {}, namely {}, seems to be invalid.", channel, sibling);
                    isAllSiblingsConsistent = false;
                }
            }
        }

        return isAllTasksAssigned && isAllChannelsOriginal && isAllSiblingsConsistent;
    }

    /**
     * Creates a new instance from the given {@link ExecutionTaskFlow}.
     *
     * @param executionTaskFlow       should be converted into an {@link ExecutionPlan}
     * @param stageSplittingCriterion defines where to install {@link ExecutionStage} boundaries
     * @return the new instance
     */
    public static ExecutionPlan createFrom(ExecutionTaskFlow executionTaskFlow,
                                           StageAssignmentTraversal.StageSplittingCriterion stageSplittingCriterion) {
        return StageAssignmentTraversal.assignStages(executionTaskFlow, stageSplittingCriterion);
    }
}