blob: e00753c374a0a9561c10890e67689f905f18bcd1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.core.optimizer.enumeration;
import org.apache.wayang.commons.util.profiledb.model.measurement.TimeMeasurement;
import org.apache.wayang.core.api.Job;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.optimizer.channels.ChannelConversionGraph;
import org.apache.wayang.core.plan.executionplan.Channel;
import org.apache.wayang.core.plan.executionplan.ExecutionTask;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.plan.wayangplan.InputSlot;
import org.apache.wayang.core.plan.wayangplan.Operator;
import org.apache.wayang.core.plan.wayangplan.OperatorAlternative;
import org.apache.wayang.core.plan.wayangplan.OutputSlot;
import org.apache.wayang.core.plan.wayangplan.WayangPlan;
import org.apache.wayang.core.platform.Junction;
import org.apache.wayang.core.util.MultiMap;
import org.apache.wayang.core.util.WayangCollections;
import org.apache.wayang.core.util.Tuple;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
* Represents a collection of {@link PlanImplementation}s that all implement the same section of a {@link WayangPlan} (which
* is assumed to contain {@link OperatorAlternative}s in general).
* <p>Instances can be mutated and combined in algebraic manner. In particular, instances can be unioned if they implement
* the same part of the {@link WayangPlan}, concatenated if there are contact points, and pruned.</p>
*/
public class PlanEnumeration {
private static final Logger logger = LogManager.getLogger(PlanEnumeration.class);
/**
* The {@link OperatorAlternative}s for that an {@link OperatorAlternative.Alternative} has been picked.
*/
final Set<OperatorAlternative> scope;
/**
* {@link InputSlot}s that are not satisfied in this instance.
*/
final Set<InputSlot<?>> requestedInputSlots;
/**
* Combinations of {@link OutputSlot}s and {@link InputSlot}, where the former is served by this instance and the
* latter is not yet assigned in this instance. If there is no such {@link InputSlot} (because we are enumerating
* an {@link OperatorAlternative.Alternative}, then we put {@code null} instead of it.
*/
final Set<Tuple<OutputSlot<?>, InputSlot<?>>> servingOutputSlots;
/**
* {@link PlanImplementation}s contained in this instance.
*/
final Collection<PlanImplementation> planImplementations;
/**
* {@link ExecutionTask}s that have already been executed.
*/
final Map<ExecutionOperator, ExecutionTask> executedTasks;
/**
* Creates a new instance.
*/
public PlanEnumeration() {
this(new HashSet<>(), new HashSet<>(), new HashSet<>());
}
/**
* Creates a new instance.
*/
private PlanEnumeration(Set<OperatorAlternative> scope,
Set<InputSlot<?>> requestedInputSlots,
Set<Tuple<OutputSlot<?>, InputSlot<?>>> servingOutputSlots) {
this(scope, requestedInputSlots, servingOutputSlots, new LinkedList<>(), new HashMap<>());
}
/**
* Creates a new instance.
*/
private PlanEnumeration(Set<OperatorAlternative> scope,
Set<InputSlot<?>> requestedInputSlots,
Set<Tuple<OutputSlot<?>, InputSlot<?>>> servingOutputSlots,
Collection<PlanImplementation> planImplementations,
Map<ExecutionOperator, ExecutionTask> executedTasks) {
this.scope = scope;
this.requestedInputSlots = requestedInputSlots;
this.servingOutputSlots = servingOutputSlots;
this.planImplementations = planImplementations;
this.executedTasks = executedTasks;
}
/**
* Create an instance for a single {@link ExecutionOperator}.
*
* @param operator the mentioned {@link ExecutionOperator}
* @return the new instance
*/
static PlanEnumeration createSingleton(ExecutionOperator operator, OptimizationContext optimizationContext) {
final PlanEnumeration enumeration = createFor(operator, operator);
final PlanImplementation singletonPlanImplementation = enumeration.createSingletonPartialPlan(operator, optimizationContext);
enumeration.add(singletonPlanImplementation);
return enumeration;
}
/**
* Creates a new instance.
*
* @param inputOperator provides the requested {@link InputSlot}s
* @param outputOperator provides the requested {@link OutputSlot}s
* @return the new instance
*/
static PlanEnumeration createFor(Operator inputOperator, Operator outputOperator) {
return createFor(inputOperator, input -> true, outputOperator, output -> true);
}
/**
* Creates a new instance.
*
* @param inputOperator provides the requested {@link InputSlot}s
* @param inputSlotPredicate can narrow down the {@link InputSlot}s
* @param outputOperator provides the requested {@link OutputSlot}s
* @param outputSlotPredicate can narrow down the {@link OutputSlot}s
* @return the new instance
*/
static PlanEnumeration createFor(Operator inputOperator,
Predicate<InputSlot<?>> inputSlotPredicate,
Operator outputOperator,
Predicate<OutputSlot<?>> outputSlotPredicate) {
final PlanEnumeration instance = new PlanEnumeration();
for (InputSlot<?> inputSlot : inputOperator.getAllInputs()) {
if (inputSlotPredicate.test(inputSlot)) {
instance.requestedInputSlots.add(inputSlot);
}
}
for (OutputSlot outputSlot : outputOperator.getAllOutputs()) {
if (outputSlotPredicate.test(outputSlot)) {
List<InputSlot> inputSlots = outputSlot.getOccupiedSlots();
if (inputSlots.isEmpty()) {
inputSlots = Collections.singletonList(null); // InputSlot is probably in a surrounding plan.
}
for (InputSlot inputSlot : inputSlots) {
instance.servingOutputSlots.add(new Tuple<>(outputSlot, inputSlot));
}
}
}
return instance;
}
/**
* Asserts that two given instances enumerate the same part of a {@link WayangPlan}.
*/
private static void assertMatchingInterface(PlanEnumeration instance1, PlanEnumeration instance2) {
if (!instance1.requestedInputSlots.equals(instance2.requestedInputSlots)) {
throw new IllegalArgumentException("Input slots are not matching.");
}
if (!instance1.servingOutputSlots.equals(instance2.servingOutputSlots)) {
throw new IllegalArgumentException("Output slots are not matching.");
}
}
/**
* Concatenates the {@code baseEnumeration} via its {@code openOutputSlot} to the {@code targetEnumerations}.
* All {@link PlanEnumeration}s should be distinct.
*/
public PlanEnumeration concatenate(OutputSlot<?> openOutputSlot,
Collection<Channel> openChannels,
Map<InputSlot<?>, PlanEnumeration> targetEnumerations,
OptimizationContext optimizationContext,
TimeMeasurement enumerationMeasurement) {
// Check the parameters' validity.
assert this.getServingOutputSlots().stream()
.map(Tuple::getField0)
.anyMatch(openOutputSlot::equals)
: String.format("Cannot concatenate %s: it is not a served output.", openOutputSlot);
assert !targetEnumerations.isEmpty();
final TimeMeasurement concatenationMeasurement = enumerationMeasurement == null ?
null :
enumerationMeasurement.start("Concatenation");
if (logger.isInfoEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("Concatenating ").append(this.getPlanImplementations().size());
for (PlanEnumeration targetEnumeration : targetEnumerations.values()) {
sb.append("x").append(targetEnumeration.getPlanImplementations().size());
}
sb.append(" plan implementations.");
logger.debug(sb.toString());
}
// Prepare the result instance from this instance.
PlanEnumeration result = new PlanEnumeration();
result.scope.addAll(this.getScope());
result.requestedInputSlots.addAll(this.getRequestedInputSlots());
result.servingOutputSlots.addAll(this.getServingOutputSlots());
result.executedTasks.putAll(this.getExecutedTasks());
// Update the result instance from the target instances.
for (Map.Entry<InputSlot<?>, PlanEnumeration> entry : targetEnumerations.entrySet()) {
final InputSlot<?> openInputSlot = entry.getKey();
final PlanEnumeration targetEnumeration = entry.getValue();
result.scope.addAll(targetEnumeration.getScope());
result.requestedInputSlots.addAll(targetEnumeration.getRequestedInputSlots());
result.servingOutputSlots.addAll(targetEnumeration.getServingOutputSlots());
result.executedTasks.putAll(targetEnumeration.getExecutedTasks());
}
// NB: We need to store remove the InputSlots only here, because a single targetEnumeration
// might service multiple InputSlots. If this targetEnumeration is then also the baseEnumeration, it might
// re-request already serviced InputSlots, although already deleted.
result.requestedInputSlots.removeAll(targetEnumerations.keySet());
result.servingOutputSlots.removeIf(slotService -> slotService.getField0().equals(openOutputSlot));
// Create the PlanImplementations.
result.planImplementations.addAll(this.concatenatePartialPlans(
openOutputSlot,
openChannels,
targetEnumerations,
optimizationContext,
result,
concatenationMeasurement
));
logger.debug("Created {} plan implementations.", result.getPlanImplementations().size());
if (concatenationMeasurement != null) concatenationMeasurement.stop();
return result;
}
/**
* Concatenates all {@link PlanImplementation}s of the {@code baseEnumeration} via its {@code openOutputSlot}
* to the {@code targetEnumerations}' {@link PlanImplementation}s.
* All {@link PlanEnumeration}s should be distinct.
*/
private Collection<PlanImplementation> concatenatePartialPlans(OutputSlot<?> openOutputSlot,
Collection<Channel> openChannels,
Map<InputSlot<?>, PlanEnumeration> targetEnumerations,
OptimizationContext optimizationContext,
PlanEnumeration concatenationEnumeration,
TimeMeasurement concatenationMeasurement) {
final Job job = optimizationContext.getJob();
final OptimizationContext.OperatorContext operatorContext = optimizationContext.getOperatorContext(openOutputSlot.getOwner());
boolean isRequestBreakpoint = job.isRequestBreakpointFor(openOutputSlot, operatorContext);
return this.concatenatePartialPlansBatchwise(
openOutputSlot,
openChannels,
targetEnumerations,
optimizationContext,
isRequestBreakpoint,
concatenationEnumeration,
concatenationMeasurement
);
}
/**
* Concatenates {@link PlanEnumeration}s by batchwise processing of {@link PlanImplementation}s. All {@link PlanImplementation}s
* that share a certain implementation of the {@code openOutputSlot} or its fed {@link InputSlot}s are grouped
* into combinations so that we avoid to seek redundant {@link Junction}s.
*
* @param openOutputSlot of this instance to be concatenated
* @param targetEnumerations whose {@link InputSlot}s should be concatenated with the {@code openOutputSlot}
* @param optimizationContext provides concatenation information
* @param concatenationEnumeration to which the {@link PlanImplementation}s should be added
* @param concatenationMeasurement
* @param isRequestBreakpoint whether a breakpoint-capable {@link Channel} should be inserted
* @return the concatenated {@link PlanImplementation}s
*/
private Collection<PlanImplementation> concatenatePartialPlansBatchwise(
OutputSlot<?> openOutputSlot,
Collection<Channel> openChannels,
Map<InputSlot<?>, PlanEnumeration> targetEnumerations,
OptimizationContext optimizationContext,
boolean isRequestBreakpoint,
PlanEnumeration concatenationEnumeration,
TimeMeasurement concatenationMeasurement) {
// Preparatory initializations.
final ChannelConversionGraph channelConversionGraph = optimizationContext.getChannelConversionGraph();
// Allocate result collector.
Collection<PlanImplementation> result = new LinkedList<>();
// Bring the InputSlots to fixed order.
List<InputSlot<?>> inputs = new ArrayList<>(targetEnumerations.keySet());
// Identify identical PlanEnumerations among the targetEnumerations and baseEnumeration.
MultiMap<PlanEnumeration, InputSlot<?>> targetEnumerationGroups = new MultiMap<>();
for (Map.Entry<InputSlot<?>, PlanEnumeration> entry : targetEnumerations.entrySet()) {
targetEnumerationGroups.putSingle(entry.getValue(), entry.getKey());
}
// Group the PlanImplementations within each enumeration group.
MultiMap<PlanEnumeration, PlanImplementation.ConcatenationGroupDescriptor> enum2concatGroup = new MultiMap<>();
MultiMap<PlanImplementation.ConcatenationGroupDescriptor, PlanImplementation.ConcatenationDescriptor>
concatGroup2concatDescriptor = new MultiMap<>();
for (Map.Entry<PlanEnumeration, Set<InputSlot<?>>> entry : targetEnumerationGroups.entrySet()) {
PlanEnumeration planEnumeration = entry.getKey();
OutputSlot<?> groupOutput = planEnumeration == this ? openOutputSlot : null;
Set<InputSlot<?>> groupInputSet = entry.getValue();
List<InputSlot<?>> groupInputs = new ArrayList<>(inputs.size());
for (InputSlot<?> input : inputs) {
groupInputs.add(groupInputSet.contains(input) ? input : null);
}
for (PlanImplementation planImplementation : planEnumeration.getPlanImplementations()) {
PlanImplementation.ConcatenationDescriptor concatDescriptor =
planImplementation.createConcatenationDescriptor(groupOutput, groupInputs);
concatGroup2concatDescriptor.putSingle(concatDescriptor.groupDescriptor, concatDescriptor);
enum2concatGroup.putSingle(planImplementation.getPlanEnumeration(), concatDescriptor.groupDescriptor);
}
}
// Handle cases where this instance is not a target enumeration.
if (!targetEnumerationGroups.containsKey(this)) {
List<InputSlot<?>> emptyGroupInputs = WayangCollections.createNullFilledArrayList(inputs.size());
for (PlanImplementation planImplementation : this.getPlanImplementations()) {
PlanImplementation.ConcatenationDescriptor concatDescriptor =
planImplementation.createConcatenationDescriptor(openOutputSlot, emptyGroupInputs);
concatGroup2concatDescriptor.putSingle(concatDescriptor.groupDescriptor, concatDescriptor);
enum2concatGroup.putSingle(planImplementation.getPlanEnumeration(), concatDescriptor.groupDescriptor);
}
}
if (logger.isInfoEnabled()) {
logger.info("Concatenating {}={} concatenation groups ({} -> {} inputs).",
enum2concatGroup.values().stream().map(groups -> String.valueOf(groups.size())).collect(Collectors.joining("*")),
enum2concatGroup.values().stream().mapToInt(Set::size).reduce(1, (a, b) -> a * b),
openOutputSlot,
targetEnumerations.size()
);
}
// Enumerate all combinations of the PlanEnumerations.
List<PlanEnumeration> orderedEnumerations = new ArrayList<>(enum2concatGroup.keySet());
orderedEnumerations.remove(this);
orderedEnumerations.add(0, this); // Make sure that the base enumeration is in the beginning.
List<Set<PlanImplementation.ConcatenationGroupDescriptor>> orderedConcatGroups = new ArrayList<>(orderedEnumerations.size());
for (PlanEnumeration enumeration : orderedEnumerations) {
orderedConcatGroups.add(enum2concatGroup.get(enumeration));
}
for (List<PlanImplementation.ConcatenationGroupDescriptor> concatGroupCombo : WayangCollections.streamedCrossProduct(orderedConcatGroups)) {
// Determine the execution output along with its OptimizationContext.
PlanImplementation.ConcatenationGroupDescriptor baseConcatGroup = concatGroupCombo.get(0);
final OutputSlot<?> execOutput = baseConcatGroup.execOutput;
Set<PlanImplementation.ConcatenationDescriptor> baseConcatDescriptors = concatGroup2concatDescriptor.get(baseConcatGroup);
final PlanImplementation innerPlanImplementation = WayangCollections.getAny(baseConcatDescriptors).execOutputPlanImplementation;
// The output should reside in the same OptimizationContext in all PlanImplementations.
assert baseConcatDescriptors.stream()
.map(cd -> cd.execOutputPlanImplementation)
.map(PlanImplementation::getOptimizationContext)
.collect(Collectors.toSet()).size() == 1;
// Determine the execution OutputSlots.
List<InputSlot<?>> execInputs = new ArrayList<>(inputs.size());
for (PlanImplementation.ConcatenationGroupDescriptor concatGroup : concatGroupCombo) {
for (Set<InputSlot<?>> execInputSet : concatGroup.execInputs) {
if (execInputSet != null) execInputs.addAll(execInputSet);
}
}
// Construct a Junction between the ExecutionOperators.
final Operator outputOperator = execOutput.getOwner();
assert outputOperator.isExecutionOperator() : String.format("Expected execution operator, found %s.", outputOperator);
TimeMeasurement channelConversionMeasurement = concatenationMeasurement == null ?
null : concatenationMeasurement.start("Channel Conversion");
final Junction junction = openChannels == null || openChannels.isEmpty() ?
channelConversionGraph.findMinimumCostJunction(
execOutput,
execInputs,
innerPlanImplementation.getOptimizationContext(),
isRequestBreakpoint
) :
channelConversionGraph.findMinimumCostJunction(
execOutput,
openChannels,
execInputs,
innerPlanImplementation.getOptimizationContext());
if (channelConversionMeasurement != null) channelConversionMeasurement.stop();
if (junction == null) continue;
// If we found a junction, then we can enumerate all PlanImplementation combinations.
final List<Set<PlanImplementation>> groupPlans = WayangCollections.map(
concatGroupCombo,
concatGroup -> {
Set<PlanImplementation.ConcatenationDescriptor> concatDescriptors = concatGroup2concatDescriptor.get(concatGroup);
Set<PlanImplementation> planImplementations = new HashSet<>(concatDescriptors.size());
for (PlanImplementation.ConcatenationDescriptor concatDescriptor : concatDescriptors) {
planImplementations.add(concatDescriptor.getPlanImplementation());
}
return planImplementations;
});
for (List<PlanImplementation> planCombo : WayangCollections.streamedCrossProduct(groupPlans)) {
PlanImplementation basePlan = planCombo.get(0);
List<PlanImplementation> targetPlans = planCombo.subList(0, planCombo.size());
PlanImplementation concatenatedPlan = basePlan.concatenate(targetPlans, junction, basePlan, concatenationEnumeration);
if (concatenatedPlan != null) {
result.add(concatenatedPlan);
}
}
}
return result;
}
/**
* Groups all {@link #planImplementations} by their {@link ExecutionOperator}s' {@link OutputSlot}s for the
* {@code output}. Additionally preserves the very (nested) {@link PlanImplementation} in that {@code output} resides.
*
* @param output a (possibly top-level) {@link OutputSlot} that should be connected
* @return a mapping that represents each element {@link #planImplementations} by a key value pair
* {@code (implementing OutputSlots -> (PlanImplementation, nested PlanImplementation)}
*/
private MultiMap<OutputSlot<?>, Tuple<PlanImplementation, PlanImplementation>>
groupImplementationsByOutput(OutputSlot<?> output) {
// Sort the PlanEnumerations by their respective open InputSlot or OutputSlot.
final MultiMap<OutputSlot<?>, Tuple<PlanImplementation, PlanImplementation>> basePlanGroups =
new MultiMap<>();
// Find and validate implementing OutputSlots.
for (PlanImplementation basePlanImplementation : this.getPlanImplementations()) {
final Collection<Tuple<OutputSlot<?>, PlanImplementation>> execOpOutputsWithContext =
basePlanImplementation.findExecutionOperatorOutputWithContext(output);
final Tuple<OutputSlot<?>, PlanImplementation> execOpOutputWithCtx =
WayangCollections.getSingleOrNull(execOpOutputsWithContext);
assert execOpOutputsWithContext != null && !execOpOutputsWithContext.isEmpty()
: String.format("No outputs found for %s.", output);
basePlanGroups.putSingle(
execOpOutputWithCtx.getField0(),
new Tuple<>(basePlanImplementation, execOpOutputWithCtx.getField1())
);
}
return basePlanGroups;
}
/**
* For each given instance, group the {@link #planImplementations} by their {@link ExecutionOperator} for the
* associated {@link InputSlot}.
*
* @param enumerations a mapping from {@link InputSlot}s to {@link PlanEnumeration}s that request this input
* @return a {@link List} with an element for each {@code enumerations} entry; each entry groups the
* {@link PlanImplementation}s of the {@link PlanEnumeration} that share the same {@link ExecutionOperator}s for
* the requested {@link InputSlot}
*/
private static List<MultiMap<Set<InputSlot<?>>, PlanImplementation>> groupImplementationsByInput(
Map<InputSlot<?>, PlanEnumeration> enumerations) {
// Prepare a collector for the results.
List<MultiMap<Set<InputSlot<?>>, PlanImplementation>> targetPlanGroupList = new ArrayList<>(enumerations.size());
// Go over all PlanEnumerations.
for (Map.Entry<InputSlot<?>, PlanEnumeration> entry : enumerations.entrySet()) {
// Extract the requested InputSlot and the associated PlanEnumeration requesting it.
final InputSlot<?> requestedInput = entry.getKey();
final PlanEnumeration targetEnumeration = entry.getValue();
MultiMap<Set<InputSlot<?>>, PlanImplementation> targetPlanGroups = new MultiMap<>();
for (PlanImplementation planImpl : targetEnumeration.getPlanImplementations()) {
final Collection<InputSlot<?>> openInput = planImpl.findExecutionOperatorInputs(requestedInput);
targetPlanGroups.putSingle(WayangCollections.asSet(openInput), planImpl);
}
targetPlanGroupList.add(targetPlanGroups);
}
return targetPlanGroupList;
}
/**
* Add a {@link PlanImplementation} to this instance.
*
* @param planImplementation to be added
*/
public void add(PlanImplementation planImplementation) {
// TODO: Check if the plan conforms to this instance.
this.planImplementations.add(planImplementation);
assert planImplementation.getTimeEstimate() != null;
planImplementation.setPlanEnumeration(this);
}
/**
* Creates a new instance for exactly one {@link ExecutionOperator}.
*
* @param executionOperator will be wrapped in the new instance
* @param optimizationContext
* @return the new instance
*/
private PlanImplementation createSingletonPartialPlan(ExecutionOperator executionOperator, OptimizationContext optimizationContext) {
return new PlanImplementation(
this,
new HashMap<>(0),
Collections.singletonList(executionOperator),
optimizationContext
);
}
/**
* Unions the {@link PlanImplementation}s of this and {@code that} instance. The operation is in-place, i.e., this instance
* is modified to form the result.
*
* @param that the instance to compute the union with
*/
public void unionInPlace(PlanEnumeration that) {
assertMatchingInterface(this, that);
this.scope.addAll(that.scope);
that.planImplementations.forEach(partialPlan -> {
this.planImplementations.add(partialPlan);
partialPlan.setPlanEnumeration(this);
});
that.planImplementations.clear();
}
/**
* Create a new instance that equals this instance but redirects via
* {@link OperatorAlternative.Alternative#getSlotMapping()}.
*
* @param alternative the alternative to escape or {@code null} if none (in that case, this method returns the
* this instance)
*/
public PlanEnumeration escape(OperatorAlternative.Alternative alternative) {
if (alternative == null) return this;
PlanEnumeration escapedInstance = new PlanEnumeration();
final OperatorAlternative operatorAlternative = alternative.getOperatorAlternative();
// Copy and widen the scope.
escapedInstance.scope.addAll(this.scope);
escapedInstance.scope.add(operatorAlternative);
// Escape the input slots.
for (InputSlot inputSlot : this.requestedInputSlots) {
final InputSlot escapedInput = alternative.getSlotMapping().resolveUpstream(inputSlot);
if (escapedInput != null) {
escapedInstance.requestedInputSlots.add(escapedInput);
}
}
// Escape the output slots.
for (Tuple<OutputSlot<?>, InputSlot<?>> link : this.servingOutputSlots) {
if (link.field1 != null) {
throw new IllegalStateException("Cannot escape a connected output slot.");
}
final Collection<OutputSlot<Object>> resolvedOutputSlots =
alternative.getSlotMapping().resolveDownstream(link.field0.unchecked());
for (OutputSlot escapedOutput : resolvedOutputSlots) {
final List<InputSlot<?>> occupiedInputs = escapedOutput.getOccupiedSlots();
if (occupiedInputs.isEmpty()) {
escapedInstance.servingOutputSlots.add(new Tuple<>(escapedOutput, null));
} else {
for (InputSlot inputSlot : occupiedInputs) {
escapedInstance.servingOutputSlots.add(new Tuple<>(escapedOutput, inputSlot));
}
}
}
}
// Escape the PlanImplementation instances.
for (PlanImplementation planImplementation : this.planImplementations) {
escapedInstance.planImplementations.add(planImplementation.escape(alternative, escapedInstance));
}
return escapedInstance;
}
public Collection<PlanImplementation> getPlanImplementations() {
return this.planImplementations;
}
public Set<InputSlot<?>> getRequestedInputSlots() {
return this.requestedInputSlots;
}
public Set<Tuple<OutputSlot<?>, InputSlot<?>>> getServingOutputSlots() {
return this.servingOutputSlots;
}
public Set<OperatorAlternative> getScope() {
return this.scope;
}
public Map<ExecutionOperator, ExecutionTask> getExecutedTasks() {
return this.executedTasks;
}
@Override
public String toString() {
return this.toIOString();
}
@SuppressWarnings("unused")
private String toIOString() {
return String.format("%s[%dx, inputs=%s, outputs=%s]", this.getClass().getSimpleName(),
this.getPlanImplementations().size(),
this.requestedInputSlots, this.servingOutputSlots.stream()
.map(Tuple::getField0)
.distinct()
.collect(Collectors.toList())
);
}
@SuppressWarnings("unused")
private String toScopeString() {
return String.format("%s[%dx %s]", this.getClass().getSimpleName(),
this.getPlanImplementations().size(),
this.scope
);
}
}