blob: 38e86ddd44f2beecd98641b41a83aa3d4273c2c5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.core.plan.wayangplan;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
import org.apache.wayang.core.util.WayangCollections;
import org.apache.logging.log4j.LogManager;
import java.util.Collection;
import java.util.Collections;
import java.util.stream.Collectors;
/**
* This is not an {@link Operator} in its own right. However, it contains a set of operators and can redirect
* to those.
*/
public interface OperatorContainer {
/**
* Provide the {@link SlotMapping} that translates between the contained {@link Operator}s and the containing
* {@link CompositeOperator} as defined by {@link #toOperator()}.
*
* @return the above explained {@link SlotMapping}
*/
SlotMapping getSlotMapping();
/**
* Enter this container. This container's {@link CompositeOperator} needs to be a source.
*
* @return the source operator within this container
*/
Operator getSource();
/**
* Set the source. This container's {@link CompositeOperator} needs to be a source itself.
*
* @param innerSource the source operator within this container
*/
void setSource(Operator innerSource);
/**
* Tells whether this instance represents a sink.
*
* @return whether this container corresponds to a source
*/
default boolean isSource() {
return this.toOperator().isSource();
}
/**
* Enter the encased plan by following an {@code inputSlot} of the encasing {@link CompositeOperator}.
*
* @param inputSlot an {@link InputSlot} of the encasing {@link CompositeOperator}
* @return the {@link InputSlot}s within the encased plan that are connected to the given {@code inputSlot}
*/
<T> Collection<InputSlot<T>> followInput(InputSlot<T> inputSlot);
/**
* @see #followInput(InputSlot)
*/
@SuppressWarnings("unchecked")
default Collection<InputSlot<?>> followInputUnchecked(InputSlot<?> inputSlot) {
return (Collection<InputSlot<?>>) (Collection) this.followInput(inputSlot);
}
/**
* Enter this container. This container's {@link CompositeOperator} needs to be a sink.
*
* @return the sink operator within this subplan
*/
Operator getSink();
/**
* Set the sink. This container's {@link CompositeOperator} needs to be a sink itself.
*
* @param innerSink the sink operator within this container
*/
void setSink(Operator innerSink);
/**
* @return whether this container corresponds to a sink
*/
default boolean isSink() {
return this.toOperator().isSink();
}
/**
* Enter the encased plan by following an {@code outputSlot} of the encasing {@link CompositeOperator}.
*
* @param outputSlot an {@link OutputSlot} of the encasing {@link CompositeOperator}
* @return the {@link OutputSlot} within the encased plan that are connected to the given {@code outputSlot}
*/
<T> OutputSlot<T> traceOutput(OutputSlot<T> outputSlot);
/**
* @return the {@link CompositeOperator} that corresponds to this instance, <i>not</i> any of the contained
* {@link Operator}s
*/
CompositeOperator toOperator();
/**
* Return the {@link InputSlot} that represents the given {@code inputSlot}. NB: This method assumes the given
* {@code inputSlot} has no occupant (see {@link InputSlot#getOccupant()}).
*
* @param inputSlot the {@link InputSlot} of a contained {@link Operator} that is to be resolved
* @return the {@link InputSlot} that represents the given {@code inputSlot} or {@code null} if there is none
*/
<T> InputSlot<T> traceInput(InputSlot<T> inputSlot);
/**
* Exit the encased plan by following an {@code outputSlot} of an encased, terminal {@link Operator}.
*
* @param outputSlot the {@link OutputSlot} to follow
* @return the {@link OutputSlot}s of the encasing {@link CompositeOperator} (see {@link #toOperator()}) that
* represent the given {@code outputSlot} or {@code null} if none
*/
<T> Collection<OutputSlot<T>> followOutput(OutputSlot<T> outputSlot);
/**
* Propagates the {@link CardinalityEstimate} of the given {@link InputSlot} to inner, mapped {@link InputSlot}s
*
* @see Operator#propagateInputCardinality(int, OptimizationContext.OperatorContext)
*/
default void propagateInputCardinality(int inputIndex,
OptimizationContext.OperatorContext operatorContext) {
final CompositeOperator compositeOperator = this.toOperator();
assert operatorContext.getOperator() == compositeOperator;
assert 0 <= inputIndex && inputIndex <= compositeOperator.getNumInputs();
final Collection<? extends InputSlot<?>> innerInputs = this.followInput(this.toOperator().getInput(inputIndex));
for (InputSlot<?> innerInput : innerInputs) {
Collection<OptimizationContext> innerOptimizationCtxs =
this.getInnerInputOptimizationContext(innerInput, operatorContext.getOptimizationContext());
for (OptimizationContext innerOptimizationCtx : innerOptimizationCtxs) {
// Identify the appropriate OperatorContext.
OptimizationContext.OperatorContext innerOperatorCtx = innerOptimizationCtx.getOperatorContext(innerInput.getOwner());
// Update the CardinalityEstimate.
final CardinalityEstimate cardinality = operatorContext.getInputCardinality(inputIndex);
innerOperatorCtx.setInputCardinality(innerInput.getIndex(), cardinality);
// Continue the propagation.
innerInput.getOwner().propagateInputCardinality(innerInput.getIndex(), innerOperatorCtx);
}
}
}
/**
* Propagates the {@link CardinalityEstimate} of the given {@link OutputSlot} to inner, mapped {@link OutputSlot}s
*
* @see Operator#propagateOutputCardinality(int, OptimizationContext.OperatorContext)
*/
default void propagateOutputCardinality(int outputIndex, OptimizationContext.OperatorContext operatorCtx) {
final CompositeOperator compositeOperator = this.toOperator();
assert operatorCtx.getOperator() == compositeOperator;
final OutputSlot<?> innerOutput = this.traceOutput(compositeOperator.getOutput(outputIndex));
if (innerOutput != null) {
if (compositeOperator.isLoopSubplan()) {
LogManager.getLogger(this.getClass()).warn(
"Will not propagate cardinality of {} back to {}.",
compositeOperator.getOutput(outputIndex),
innerOutput
);
return;
}
// Identify the appropriate OperatorContext.
OptimizationContext innerOptimizationCtx = operatorCtx.getOptimizationContext();
OptimizationContext.OperatorContext innerOperatorCtx = innerOptimizationCtx.getOperatorContext(innerOutput.getOwner());
assert innerOperatorCtx != null : String.format("No OperatorContext for %s's owner.", innerOutput);
// Update the CardinalityEstimate.
final CardinalityEstimate cardinality = operatorCtx.getOutputCardinality(outputIndex);
innerOperatorCtx.setOutputCardinality(innerOutput.getIndex(), cardinality);
// Continue the propagation.
innerOutput.getOwner().propagateOutputCardinality(innerOutput.getIndex(), innerOperatorCtx);
}
}
/**
* Retrieve those {@link OptimizationContext}s that represent the state when this instance is entered.
*
* @param innerInput the inner {@link InputSlot} whose {@link OptimizationContext}s are requested
* @param outerOptimizationContext the {@link OptimizationContext} in that this instance resides
* @return the inner {@link OptimizationContext}s
*/
default Collection<OptimizationContext> getInnerInputOptimizationContext(
InputSlot<?> innerInput,
OptimizationContext outerOptimizationContext) {
// Usually the same.
return Collections.singleton(outerOptimizationContext);
}
/**
* Retrieve that {@link OptimizationContext} that represents the state when this instance is exited.
*
* @param outerOptimizationContext the {@link OptimizationContext} in that this instance resides
* @return the inner {@link OptimizationContext}
*/
default OptimizationContext getInnerOutputOptimizationContext(OptimizationContext outerOptimizationContext) {
// Usually the same.
return outerOptimizationContext;
}
/**
* Acknowledge that the given old {@link Operator} has been replaced with an {@link OperatorContainer}.
*
* @param operator that has been replaced
* @param newContainer with that the {@code operator} has been replaced
*/
default void noteReplaced(Operator operator, OperatorContainer newContainer) {
final CompositeOperator newOperator = newContainer.toOperator();
final SlotMapping slotMapping = this.getSlotMapping();
slotMapping.replaceInputSlotMappings(operator, newOperator);
slotMapping.replaceOutputSlotMappings(operator, newOperator);
this.toOperator().noteReplaced(operator, newOperator);
}
/**
* Retrieves all {@link Operator}s that are <i>immediately</i> encased by this instance.
*
* @return the encased {@link Operator}s
*/
default Collection<Operator> getContainedOperators() {
Operator seed = this.isSink() ?
this.getSink() :
this.traceOutput(this.toOperator().getOutput(0)).getOwner();
return PlanTraversal.fanOut().traverse(seed).getTraversedNodes();
}
/**
* Retrieves the single(!) {@link Operator} that is <i>immediately</i> encased by this instance.
*
* @return the encased {@link Operator}s
*/
default Operator getContainedOperator() {
return WayangCollections.getSingleOrNull(this.getContainedOperators());
}
/**
* Traverses the encased subplan.
*/
default void traverse(PlanTraversal.Callback callback) {
Operator seed = this.isSink() ?
this.getSink() :
this.traceOutput(this.toOperator().getOutput(0)).getOwner();
PlanTraversal.fanOut().withCallback(callback).traverse(seed);
}
/**
* Get those {@link InputSlot}s within this instance, that are mapped via the {@link SlotMapping}.
*
* @return the {@link InputSlot}s
*/
default Collection<InputSlot<?>> getMappedInputs() {
return this.getSlotMapping().getUpstreamMapping().keySet().stream()
.filter(Slot::isInputSlot)
.map(slot -> (InputSlot<?>) slot)
.collect(Collectors.toList());
}
/**
* Get those {@link OutputSlot}s within this instance, that are mapped via the {@link SlotMapping}.
*
* @return the {@link OutputSlot}s
*/
default Collection<OutputSlot<?>> getMappedOutputs() {
return this.getSlotMapping().getUpstreamMapping().values().stream()
.filter(Slot::isOutputSlot)
.map(slot -> (OutputSlot<?>) slot)
.collect(Collectors.toList());
}
}