blob: f289e5eae43beda8f864140fb44f215cdae341e8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.core.plan.wayangplan;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Traverse a plan. In each instance, every operator will be traversed only once.
*/
public class PlanTraversal {
private static final Logger logger = LogManager.getLogger(PlanTraversal.class);
public Set<Operator> visitedRelevantOperators = new HashSet<>(), visitedIrrelevantOperators = new HashSet<>();
private final boolean isFollowInputs;
private final boolean isFollowOutputs;
/**
* Whether to visit the {@link Operator}s contained in {@link CompositeOperator}s.
*/
private Predicate<OperatorContainer> containerEnterCondition = container -> false;
/**
* If a {@link CompositeOperator} has been entered as permitted by {@link #containerEnterCondition}, then this
* field tells whether {@link CompositeOperator}s should still be treated like {@link ElementaryOperator}s,
* e.g., w.rt. the {@link #traversalCallback}.
*/
private Predicate<CompositeOperator> compositeRelevanceCondition = compositeOperator -> false;
private Callback traversalCallback = null;
private Predicate<OutputSlot<?>> outputFollowPredicate = outputSlot -> true;
private Predicate<InputSlot<?>> inputFollowPredicate = inputSlot -> true;
private Predicate<InputSlot<?>> inputFollowDownstreamPredicate = inputSlot -> true;
@Deprecated
public PlanTraversal(boolean isFollowInputs, boolean isFollowOutputs) {
this.isFollowInputs = isFollowInputs;
this.isFollowOutputs = isFollowOutputs;
}
/**
* @return new instance that follows any reachable {@link Operator}
*/
public static PlanTraversal fanOut() {
return new PlanTraversal(true, true);
}
/**
* @return new instance traverses downstream only (i.e., follows {@link OutputSlot}s only)
*/
public static PlanTraversal downstream() {
return new PlanTraversal(false, true);
}
/**
* @return new instance traverses downstream only (i.e., follows {@link OutputSlot}s only)
*/
public static PlanTraversal upstream() {
return new PlanTraversal(true, false);
}
public PlanTraversal withCallback(Callback traversalCallback) {
this.traversalCallback = traversalCallback;
return this;
}
public PlanTraversal withCallback(SimpleCallback traversalCallback) {
this.traversalCallback = (operator, fromInputSlot, fromOutputSlot) -> traversalCallback.traverse(operator);
return this;
}
public PlanTraversal followingInputsIf(Predicate<InputSlot<?>> inputFollowPredicate) {
this.inputFollowPredicate = inputFollowPredicate;
return this;
}
public PlanTraversal followingInputsDownstreamIf(Predicate<InputSlot<?>> inputFollowPredicate) {
this.inputFollowDownstreamPredicate = inputFollowPredicate;
return this;
}
public PlanTraversal followingOutputsIf(Predicate<OutputSlot<?>> outputFollowPredicate) {
this.outputFollowPredicate = outputFollowPredicate;
return this;
}
/**
* Criterion to control when to visit the {@link Operator}s contained in {@link CompositeOperator}s.
*/
public PlanTraversal enteringContainersIf(Predicate<OperatorContainer> containerEnterCondition) {
this.containerEnterCondition = containerEnterCondition;
return this;
}
/**
* Criterion when to consider an entered {@link CompositeOperator} as normal {@link Operator} as well.
*/
public PlanTraversal consideringEnteredOperatorsIf(Predicate<CompositeOperator> compositeRelevanceCondition) {
this.compositeRelevanceCondition = compositeRelevanceCondition;
return this;
}
/**
* If this method is invoked, this instance will not treat {@link Subplan}s and {@link OperatorAlternative}s as
* normal traversed {@link Operator}s but will rather enter them and traverse their contained {@link Operator}s.
*
* @return this instance
*/
public PlanTraversal traversingHierarchically() {
return this.enteringContainersIf(operatorContainer -> true).consideringEnteredOperatorsIf(op -> false);
}
/**
* If this method is invoked, this instance will not enter {@link CompositeOperator}s and instead treat
* them just the same as {@link ElementaryOperator}s. This is the default.
*
* @return this instance
*/
public PlanTraversal traversingFlat() {
return this.enteringContainersIf(operatorContainer -> false);
}
/**
* Traversing as with {@link #traverse(Operator, InputSlot, OutputSlot)} for every operator.
*/
public PlanTraversal traverse(Collection<? extends Operator> operators) {
operators.forEach(this::traverse);
return this;
}
/**
* Traversing as with {@link #traverse(Operator, InputSlot, OutputSlot)} for every operator.
*/
public PlanTraversal traverse(Stream<? extends Operator> operators) {
operators.forEach(this::traverse);
return this;
}
/**
* Traverse the plan by following any connected operators.
*
* @param operator the start point of the traversal
* @return this instance
*/
public PlanTraversal traverse(Operator operator) {
return this.traverse(operator, null, null);
}
/**
* Traverses a plan in a focused manner.
*
* @param operator from that the traversal should be started (will not be traversed itself)
* @param focusedSlots {@link OutputSlot}s of the {@code operator} that should be followed
* @return this instance
*/
public PlanTraversal traverseFocused(Operator operator, Collection<OutputSlot<?>> focusedSlots) {
this.visitedRelevantOperators.add(operator);
assert focusedSlots.stream().allMatch(slot -> slot.getOwner() == operator);
this.followOutputs(focusedSlots.stream());
return this;
}
/**
* Traverses a plan.
*
* @param operator from that the traversal should be started
* @param fromInputSlot {@link InputSlot} of the {@code operator} that has been followed
* @param fromOutputSlot {@link OutputSlot} of the {@code operator} that has been followed
* @return this instance
*/
public PlanTraversal traverse(Operator operator, InputSlot<?> fromInputSlot, OutputSlot<?> fromOutputSlot) {
// Visit the operator.
boolean isUnseenOperator = this.visit(operator, fromInputSlot, fromOutputSlot);
// If it has not been seen yet, continue the traversal.
if (isUnseenOperator) {
if (this.isFollowInputs) this.followInputs(operator);
if (this.isFollowOutputs) this.followOutputs(operator);
}
return this;
}
/**
* Visit the given {@link Operator}: Route to contained {@link Operator}s if possible; note the existence of
* the {@link Operator} and whether it has been seen already; do the callback if requested.
*
* @param operator from that the traversal should be started
* @param fromInputSlot {@link InputSlot} of the {@code operator} that has been followed
* @param fromOutputSlot {@link OutputSlot} of the {@code operator} that has been followed
* @return whether the {@code operator} has not been seen yet
*/
private boolean visit(Operator operator, InputSlot<?> fromInputSlot, OutputSlot<?> fromOutputSlot) {
// Try to do a hierarchical traversal.
// Important: Don't add hierarchical Operators to the #visitedRelevantOperators, otherwise we might not traverse
// them completely (e.g., by not entering via all InputSlots).
boolean isOperatorEntered = false;
if (!operator.isElementary()) {
for (OperatorContainer operatorContainer : ((CompositeOperator) operator).getContainers()) {
if (this.containerEnterCondition.test(operatorContainer)) {
this.enter(operatorContainer, fromInputSlot, fromOutputSlot);
isOperatorEntered = true;
}
}
}
boolean isRelevantOperator = !isOperatorEntered || this.compositeRelevanceCondition.test((CompositeOperator) operator);
final Set<Operator> visitedOperators = isRelevantOperator ? this.visitedRelevantOperators : this.visitedIrrelevantOperators;
final boolean isUnseenOperator = visitedOperators.add(operator);
if (isUnseenOperator && isRelevantOperator && this.traversalCallback != null) {
this.traversalCallback.traverse(operator, fromInputSlot, fromOutputSlot);
}
return isUnseenOperator;
}
/**
* Try to enter the given {@link Operator} either via an {@link InputSlot} or {@link OutputSlot} or via its
* sink or source.
*
* @param operator a possibly non-elementary (composite) {@link Operator} to enter
* @param fromInput {@link InputSlot} via that the {@link Operator} should be entered
* @param fromOutput {@link OutputSlot} via that the {@link Operator} should be entered
* @return whether the {@link Operator} could be entered
*/
private boolean traverseHierarchical(Operator operator, InputSlot<?> fromInput, OutputSlot<?> fromOutput) {
if (operator.isSubplan()) {
this.enter((Subplan) operator, fromInput, fromOutput);
} else if (operator.isAlternative()) {
OperatorAlternative operatorAlternative = (OperatorAlternative) operator;
for (OperatorAlternative.Alternative alternative : operatorAlternative.getAlternatives()) {
this.enter(alternative, fromInput, fromOutput);
}
return true;
}
assert operator.isElementary() : String.format("Unknown composite operator: %s", operator);
return false;
}
/**
* Try to enter the given {@link OperatorContainer} either via an {@link InputSlot} or {@link OutputSlot} or via its
* sink or source.
*
* @param container the {@link OperatorContainer}
* @param fromInput {@link InputSlot} via that the {@link Operator} should be entered
* @param fromOutput {@link OutputSlot} via that the {@link Operator} should be entered
* @return whether the {@link Operator} could be entered
*/
private void enter(OperatorContainer container, InputSlot<?> fromInput, OutputSlot<?> fromOutput) {
if (fromInput != null) {
final Collection<InputSlot<Object>> innerInputs = container.followInput(fromInput.unchecked());
for (InputSlot<Object> innerInput : innerInputs) {
this.traverse(innerInput.getOwner(), innerInput, null);
}
} else if (fromOutput != null) {
final OutputSlot<Object> innerOutput = container.traceOutput(fromOutput.unchecked());
this.traverse(innerOutput.getOwner(), null, innerOutput);
} else if (container.isSink()) {
final Operator innerSink = container.getSink();
this.traverse(innerSink, null, null);
} else if (container.isSource()) {
final Operator innerSource = container.getSource();
this.traverse(innerSource, null, null);
} else {
logger.warn("Could not enter {} during hierarchical traversal.", container);
}
}
/**
* Override to control the traversal behavior.
*/
protected void followInputs(Operator operator) {
Arrays.stream(operator.getAllInputs())
.filter(this.inputFollowPredicate)
.map(InputSlot::getOccupant)
.filter(outputSlot -> outputSlot != null)
.forEach(outputSlot -> this.traverse(outputSlot.getOwner(), null, outputSlot));
}
/**
* Call {@link #followOutputs(Stream)} for all the {@link OutputSlot}s of the given {@code operator}.
*/
private void followOutputs(Operator operator) {
this.followOutputs(Arrays.stream(operator.getAllOutputs()));
}
/**
* Override to control the traversal behavior.
*/
protected void followOutputs(Stream<OutputSlot<?>> outputSlots) {
outputSlots
.filter(this.outputFollowPredicate)
.map(outputSlot -> ((OutputSlot<Object>) outputSlot).getOccupiedSlots())
.flatMap(Collection::stream)
.filter(inputSlot -> inputSlot != null)
.filter(this.inputFollowDownstreamPredicate)
.forEach(inputSlot -> this.traverse(inputSlot.getOwner(), inputSlot, null));
}
/**
* Retrieve all traversed operators that fulfill a predicate.
*
* @param operatorPredicate the predicate to filter desired operators
* @return previously traversed operators matching the predicated
*/
public Collection<Operator> getTraversedNodesWith(Predicate<Operator> operatorPredicate) {
return this.visitedRelevantOperators.stream().filter(operatorPredicate).collect(Collectors.toList());
}
/**
* Retrieve all traversed operators.
*
* @return previously traversed operators
*/
public Collection<Operator> getTraversedNodes() {
return this.getTraversedNodesWith(operator -> true);
}
/**
* A callback can be invoked during a plan traversal on each traversed node.
*/
@FunctionalInterface
public interface Callback {
/**
* Perform some action on a traversed operator.
*
* @param operator the operator that is being traversed
* @param fromInputSlot if the operator is being traversed via an input slot, this parameter is that slot, otherwise {@code null}
* @param fromOutputSlot if the operator is being traversed via an output slot, this parameter is that slot, otherwise {@code null}
*/
void traverse(Operator operator, InputSlot<?> fromInputSlot, OutputSlot<?> fromOutputSlot);
/**
* Does nothing.
*/
Callback NOP = (operator, fromInputSlot, fromOutputSlot) -> {
};
}
/**
* A callback can be invoked during a plan traversal on each traversed node.
*/
@FunctionalInterface
public interface SimpleCallback {
/**
* Perform some action on a traversed operator.
*
* @param operator the operator that is being traversed
*/
void traverse(Operator operator);
}
}