| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.wayang.core.plan.wayangplan; |
| |
| import org.apache.commons.lang3.Validate; |
| import org.apache.wayang.core.api.Configuration; |
| import org.apache.wayang.core.optimizer.OptimizationContext; |
| import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate; |
| import org.apache.wayang.core.optimizer.cardinality.CardinalityPusher; |
| import org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityPusher; |
| import org.apache.wayang.core.platform.Platform; |
| |
| import java.lang.reflect.Field; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.Objects; |
| import java.util.Queue; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| |
| /** |
| * An operator is any node that within a {@link WayangPlan}. |
| * <p>An operator is basically determined by |
| * <ul> |
| * <li>its type,</li> |
| * <li>its configuration,</li> |
| * <li>its {@link InputSlot}s, and</li> |
| * <li>its {@link OutputSlot}s.</li> |
| * </ul> |
| * The former two aspects are handled by subclassed, the latter two are basic features of every operator. |
| * <p>{@link Slot}s are typed input and output declarations of each operator and can be connected to each other |
| * to form a full {@link WayangPlan}. Moreover, we distinguish between two kinds of {@link InputSlot}s: |
| * <ol> |
| * <li><b>Regular.</b>Each operator will have set up these {@link InputSlot}s already during its creation. |
| * They are indexed from 0 to the number of {@link InputSlot}s - 1.</li> |
| * <li><b>Broadcast.</b>Some operators permit for broadcast {@link InputSlot}s. These are dynamically added and |
| * will be indexed after the regular ones. Also, their execution semantics differ: Broadcast input data will be |
| * provided <i>before</i> the regular data.</li> |
| * </ol> |
| */ |
| public interface Operator { |
| |
| /** |
| * @return the number of {@link InputSlot}s of this instance; inclusive of broadcast {@link InputSlot}s |
| */ |
| default int getNumInputs() { |
| return this.getAllInputs().length; |
| } |
| |
| /** |
| * @return the number of non-broadcast {@link InputSlot}s of this instance |
| */ |
| default int getNumRegularInputs() { |
| return this.getNumInputs() - this.getNumBroadcastInputs(); |
| } |
| |
| /** |
| * @return the number of broadcast {@link InputSlot}s of this instance |
| */ |
| default int getNumBroadcastInputs() { |
| return (int) Arrays.stream(this.getAllInputs()) |
| .filter(Objects::nonNull) |
| .filter(InputSlot::isBroadcast) |
| .count(); |
| } |
| |
| /** |
| * @return the number of {@link OutputSlot}s of this instance |
| */ |
| default int getNumOutputs() { |
| return this.getAllOutputs().length; |
| } |
| |
| /** |
| * @return the {@link InputSlot}s of this instance; inclusive of broadcast {@link InputSlot}s |
| */ |
| InputSlot<?>[] getAllInputs(); |
| |
| /** |
| * @return the {@link OutputSlot}s of this instance |
| */ |
| OutputSlot<?>[] getAllOutputs(); |
| |
| /** |
| * Sets the {@link InputSlot} of this instance. This method must only be invoked, when the input index is not |
| * yet filled. |
| * |
| * @param index at which the {@link InputSlot} should be placed |
| * @param input the new {@link InputSlot} |
| */ |
| default void setInput(int index, InputSlot<?> input) { |
| assert index < this.getNumRegularInputs() && this.getInput(index) == null; |
| assert input.getOwner() == this; |
| ((InputSlot[]) this.getAllInputs())[index] = input; |
| } |
| |
| /** |
| * Sets the {@link OutputSlot} of this instance. This method must only be invoked, when the output index is not |
| * yet filled. |
| * |
| * @param index at which the {@link OutputSlot} should be placed |
| * @param output the new {@link OutputSlot} |
| */ |
| default void setOutput(int index, OutputSlot<?> output) { |
| assert index < this.getNumOutputs() && this.getOutput(index) == null; |
| assert output.getOwner() == this; |
| ((OutputSlot[]) this.getAllOutputs())[index] = output; |
| } |
| |
| /** |
| * Retrieve an {@link InputSlot} of this instance using its index. |
| * |
| * @param index of the {@link InputSlot} |
| * @return the requested {@link InputSlot} |
| */ |
| default InputSlot<?> getInput(int index) { |
| final InputSlot[] allInputs = this.getAllInputs(); |
| Validate.inclusiveBetween(0, allInputs.length - 1, index, "Illegal input index %d for %s.", index, this); |
| return allInputs[index]; |
| } |
| |
| /** |
| * Retrieve an {@link OutputSlot} of this instance using its index. |
| * |
| * @param index of the {@link OutputSlot} |
| * @return the requested {@link OutputSlot} |
| */ |
| default OutputSlot<?> getOutput(int index) { |
| final OutputSlot[] allOutputs = this.getAllOutputs(); |
| if (index < 0 || index >= allOutputs.length) { |
| throw new IllegalArgumentException(String.format("Illegal output index: %d.", index)); |
| } |
| return allOutputs[index]; |
| } |
| |
| /** |
| * Retrieve an {@link InputSlot} of this instance by its name. |
| * |
| * @param name of the {@link InputSlot} |
| * @return the requested {@link InputSlot} |
| */ |
| default InputSlot<?> getInput(String name) { |
| for (InputSlot inputSlot : this.getAllInputs()) { |
| if (inputSlot.getName().equals(name)) return inputSlot; |
| } |
| throw new IllegalArgumentException(String.format("No slot with such name: %s", name)); |
| } |
| |
| /** |
| * Retrieve an {@link OutputSlot} of this instance by its name. |
| * |
| * @param name of the {@link OutputSlot} |
| * @return the requested {@link OutputSlot} |
| */ |
| default OutputSlot<?> getOutput(String name) { |
| for (OutputSlot outputSlot : this.getAllOutputs()) { |
| if (outputSlot.getName().equals(name)) return outputSlot; |
| } |
| throw new IllegalArgumentException(String.format("No slot with such name: %s", name)); |
| } |
| |
| /** |
| * @return whether this instance permits broadcast {@link InputSlot}s besides their regular {@link InputSlot}s |
| */ |
| boolean isSupportingBroadcastInputs(); |
| |
| /** |
| * Register an {@link InputSlot} as broadcast input of this instance. |
| * |
| * @param broadcastInput the {@link InputSlot} to be registered |
| * @return the assigned index of the {@link InputSlot} |
| */ |
| int addBroadcastInput(InputSlot<?> broadcastInput); |
| |
| /** |
| * Connect an output of this operator to the input of a second operator. |
| * |
| * @param thisOutputIndex index of the output slot to connect to |
| * @param that operator to connect to |
| * @param thatInputIndex index of the input slot to connect from |
| */ |
| @SuppressWarnings("unchecked") |
| default <T> void connectTo(int thisOutputIndex, Operator that, int thatInputIndex) { |
| final InputSlot<T> inputSlot = (InputSlot<T>) that.getInput(thatInputIndex); |
| final OutputSlot<T> outputSlot = (OutputSlot<T>) this.getOutput(thisOutputIndex); |
| if (!inputSlot.getType().isSupertypeOf(outputSlot.getType())) { |
| throw new IllegalArgumentException(String.format( |
| "Cannot connect %s of %s to %s of type %s.", |
| outputSlot, outputSlot.getType(), inputSlot, inputSlot.getType())); |
| } |
| outputSlot.connectTo(inputSlot); |
| } |
| |
| /** |
| * Connect an output of this operator to the input of a second operator. |
| * |
| * @param thisOutputName name of the output slot to connect to |
| * @param that operator to connect to |
| * @param thatInputName name of the input slot to connect from |
| */ |
| @SuppressWarnings("unchecked") |
| default <T> void connectTo(String thisOutputName, Operator that, String thatInputName) { |
| final InputSlot<T> inputSlot = (InputSlot<T>) that.getInput(thatInputName); |
| final OutputSlot<T> outputSlot = (OutputSlot<T>) this.getOutput(thisOutputName); |
| if (!inputSlot.getType().isSupertypeOf(outputSlot.getType())) { |
| throw new IllegalArgumentException("Cannot connect slots: mismatching types"); |
| } |
| outputSlot.connectTo(inputSlot); |
| } |
| |
| /** |
| * Connect an output of this operator as a broadcast input of a second operator. |
| * |
| * @param thisOutputIndex index of the output slot to connect to |
| * @param that operator to connect to |
| * @param broadcastName name of the broadcast that will be used by the operator to identify the broadcast; must |
| * be unique among all {@link InputSlot}s |
| */ |
| default void broadcastTo(int thisOutputIndex, Operator that, String broadcastName) { |
| final OutputSlot<?> output = this.getOutput(thisOutputIndex); |
| final InputSlot<?> broadcastInput = new InputSlot<>(broadcastName, that, true, output.getType()); |
| final int broadcastIndex = that.addBroadcastInput(broadcastInput); |
| this.connectTo(thisOutputIndex, that, broadcastIndex); |
| } |
| |
| /** |
| * Connect an output of this operator as a broadcast input of a second operator. |
| * |
| * @param thisOutputName name of the output slot to connect to |
| * @param that operator to connect to |
| * @param broadcastName name of the broadcast that will be used by the operator to identify the broadcast; must |
| * be unique among all {@link InputSlot}s |
| */ |
| default void broadcastTo(String thisOutputName, Operator that, String broadcastName) { |
| final OutputSlot<?> output = this.getOutput(thisOutputName); |
| final InputSlot<?> broadcastInput = new InputSlot<>(broadcastName, that, true, output.getType()); |
| final int broadcastIndex = that.addBroadcastInput(broadcastInput); |
| this.connectTo(output.getIndex(), that, broadcastIndex); |
| } |
| |
| /** |
| * Retrieves the effective occupant of the given {@link InputSlot}, i.e., the {@link OutputSlot} that is |
| * either connected to the given or an outer-more, mapped {@link InputSlot}. |
| * |
| * @param inputIndex of the {@link InputSlot} whose effective occupant is requested |
| * @return the effective occupant or {@code null} if none |
| */ |
| @SuppressWarnings("unchecked") |
| default <T> OutputSlot<T> getEffectiveOccupant(int inputIndex) { |
| return this.getOutermostInputSlot((InputSlot<T>) this.getInput(inputIndex)).getOccupant(); |
| } |
| |
| /** |
| * Retrieves the effective occupant of the given {@link InputSlot}, i.e., the {@link OutputSlot} that is |
| * either connected to the given or an outer-more, mapped {@link InputSlot}. |
| * |
| * @param input whose effective occupant is requested |
| * @return the effective occupant or {@code null} if none |
| */ |
| default <T> OutputSlot<T> getEffectiveOccupant(InputSlot<T> input) { |
| return this.getOutermostInputSlot(input).getOccupant(); |
| } |
| |
| /** |
| * Retrieve the outermost {@link InputSlot} if this operator is nested in other operators. |
| * |
| * @param input the slot to track |
| * @return the outermost {@link InputSlot} |
| * @see #getParent() |
| */ |
| default <T> InputSlot<T> getOutermostInputSlot(InputSlot<T> input) { |
| if (!this.isOwnerOf(input)) { |
| throw new IllegalArgumentException("Slot does not belong to this operator."); |
| } |
| |
| if (input.getOccupant() != null) { |
| return input; |
| } |
| |
| // Try to exit through the parent. |
| final OperatorContainer container = this.getContainer(); |
| if (container != null) { |
| final InputSlot<T> tracedInput = container.traceInput(input); |
| if (tracedInput != null) { |
| return container.toOperator().getOutermostInputSlot(tracedInput); |
| } |
| } |
| |
| return input; |
| } |
| |
| /** |
| * Retrieve the {@link InputSlot} of the enclosing {@link OperatorContainer} that represents the given one. |
| * |
| * @param input the slot to track |
| * @return the outer {@link InputSlot} or {@code null} if none |
| * @see #getParent() |
| */ |
| default <T> InputSlot<T> getOuterInputSlot(InputSlot<T> input) { |
| assert this.isOwnerOf(input); |
| |
| // Try to exit through the parent. |
| final OperatorContainer container = this.getContainer(); |
| return container != null ? container.traceInput(input) : null; |
| } |
| |
| /** |
| * Retrieve the outermost {@link OutputSlot}s if this operator is nested in other operators. |
| * |
| * @param output the slot to track |
| * @return the outermost {@link InputSlot} |
| * @see #getParent() |
| */ |
| default <T> Collection<OutputSlot<T>> getOutermostOutputSlots(OutputSlot<T> output) { |
| Validate.isTrue(this.isOwnerOf(output)); |
| |
| if (!output.getOccupiedSlots().isEmpty()) { |
| return Collections.singleton(output); |
| } |
| |
| // Try to exit through the parent. |
| final OperatorContainer container = this.getContainer(); |
| if (container != null) { |
| final Collection<OutputSlot<T>> followedOutputs = container.followOutput(output); |
| if (!followedOutputs.isEmpty()) { |
| return followedOutputs.stream() |
| .flatMap(followedOutput -> container.toOperator().getOutermostOutputSlots(followedOutput).stream()) |
| .collect(Collectors.toList()); |
| } |
| } |
| |
| return Collections.singleton(output); |
| } |
| |
| default boolean isOwnerOf(Slot<?> slot) { |
| return slot.getOwner() == this; |
| } |
| |
| /** |
| * Declare forward rules. Execution engines may take the chance to optimize the executed plans by having |
| * forwarded data by-pass this instance. However, note the specific semantics of a forward rule: If an |
| * {@link Operator} serves an {@link OutputSlot} that is involved in a foward rule, it will do so by forwarding. |
| * If the {@link OutputSlot} is not served, then the forwarding does not apply. |
| * |
| * @return {@link OutputSlot}s to which this instance forwards the given {@code input}. |
| * @see #isReading(InputSlot) |
| */ |
| default Collection<OutputSlot<?>> getForwards(InputSlot<?> input) { |
| assert this.isOwnerOf(input); |
| return Collections.emptyList(); |
| } |
| |
| /** |
| * Checks whether this instance is not connected to any other instance via its {@link Slot}s. This is a typical |
| * property of instances used for {@link org.apache.wayang.core.optimizer.channels.ChannelConversion}s. |
| * |
| * @return whether this instance is unconnected |
| */ |
| default boolean isUnconnected() { |
| for (InputSlot<?> inputSlot : this.getAllInputs()) { |
| if (inputSlot.getOccupant() != null) return false; |
| } |
| for (OutputSlot<?> outputSlot : this.getAllOutputs()) { |
| if (!outputSlot.getOccupiedSlots().isEmpty()) return false; |
| } |
| return true; |
| } |
| |
| /** |
| * Tells whether the given {@code input} is read by this operator. If not, the optimizer can make use of this |
| * insight. |
| * |
| * @see #getForwards(InputSlot) |
| */ |
| default boolean isReading(InputSlot<?> input) { |
| return true; |
| } |
| |
| /** |
| * @return whether this {@code input} is used to close a feedback loop (i.e., a flow graph cycle) |
| */ |
| default boolean isFeedbackInput(InputSlot<?> input) { |
| assert this.isOwnerOf(input); |
| return this.isLoopHead() && ((LoopHeadOperator) this).getLoopBodyInputs().contains(input); |
| } |
| |
| /** |
| * @return whether this {@code output} is used to start a feedback loop (i.e., a flow graph cycle) |
| */ |
| default boolean isFeedforwardOutput(OutputSlot<?> ouput) { |
| assert this.isOwnerOf(ouput); |
| return this.isLoopHead() && ((LoopHeadOperator) this).getLoopBodyOutputs().contains(ouput); |
| } |
| |
| |
| /** |
| * Tells whether this operator is a sink, i.e., it has no outputs. |
| * |
| * @return whether this operator is a sink |
| */ |
| default boolean isSink() { |
| return this.getNumOutputs() == 0; |
| } |
| |
| /** |
| * Tells whether this operator is a source, i.e., it has no inputs. |
| * |
| * @return whether this operator is a source |
| */ |
| default boolean isSource() { |
| return this.getNumInputs() == 0; |
| } |
| |
| default boolean isSubplan() { |
| return this instanceof Subplan; |
| } |
| |
| default boolean isLoopSubplan() { |
| return this instanceof LoopSubplan; |
| } |
| |
| default boolean isAlternative() { |
| return this instanceof OperatorAlternative; |
| } |
| |
| default boolean isExecutionOperator() { |
| return this instanceof ExecutionOperator; |
| } |
| |
| /** |
| * Identify this instance as the head of a loop. It is the only kind of {@link Operator} that can cause |
| * data flow cycles. |
| * |
| * @return whether this instance is the head of a loop |
| */ |
| default boolean isLoopHead() { |
| return this instanceof LoopHeadOperator; |
| } |
| |
| /** |
| * @return whether this is an elementary operator |
| */ |
| default boolean isElementary() { |
| return true; |
| } |
| |
| /** |
| * This method is part of the visitor pattern and calls the appropriate visit method on {@code visitor}. |
| */ |
| <Payload, Return> Return accept(TopDownPlanVisitor<Payload, Return> visitor, OutputSlot<?> outputSlot, Payload payload); |
| |
| // <Payload, Return> Map<InputSlot, Return> accept(BottomUpPlanVisitor<Payload, Return> visitor, InputSlot<?> inputSlot, Payload payload); |
| |
| /** |
| * Operators can be nested in other operators, e.g., in a {@link Subplan} or a {@link OperatorAlternative}. |
| * |
| * @return the parent of this operator or {@code null} if this is a top-level operator |
| */ |
| default CompositeOperator getParent() { |
| final OperatorContainer container = this.getContainer(); |
| return container == null ? null : container.toOperator(); |
| } |
| |
| OperatorContainer getContainer(); |
| |
| /** |
| * Operators can be nested in other operators, e.g., in a {@link Subplan} or a {@link OperatorAlternative.Alternative}. |
| * |
| * @param newContainer the new container of this operator or {@code null} to declare it top-level |
| */ |
| void setContainer(OperatorContainer newContainer); |
| |
| /** |
| * @return the innermost {@link LoopSubplan} containing this instance |
| */ |
| default LoopSubplan getInnermostLoop() { |
| final CompositeOperator parent = this.getParent(); |
| if (parent == null) { |
| return null; |
| } else if (parent.isLoopSubplan()) { |
| return (LoopSubplan) parent; |
| } else { |
| return parent.getInnermostLoop(); |
| } |
| } |
| |
| /** |
| * @return the stack of nested {@link LoopSubplan}s of this instance, from inside to outside |
| */ |
| default LinkedList<LoopSubplan> getLoopStack() { |
| LinkedList<LoopSubplan> loopStack = new LinkedList<>(); |
| LoopSubplan nextLoop = this.getInnermostLoop(); |
| while (nextLoop != null) { |
| loopStack.addLast(nextLoop); |
| nextLoop = nextLoop.getInnermostLoop(); |
| } |
| return loopStack; |
| } |
| |
| /** |
| * Each operator is associated with an epoch, which is a logical timestamp for the operator's creation. |
| * This value is the lowest timestamp and default epoch. |
| */ |
| int FIRST_EPOCH = 0; |
| |
| /** |
| * <i>Optional operation for non-composite operators.</i> |
| * |
| * @param epoch the operator's new epoch value |
| * @see #FIRST_EPOCH |
| */ |
| void setEpoch(int epoch); |
| |
| |
| /** |
| * <i>Optional operation for non-composite operators.</i> |
| * |
| * @return the operator's epoch value |
| * @see #FIRST_EPOCH |
| */ |
| int getEpoch(); |
| |
| /** |
| * Provide a {@link CardinalityPusher} for the {@link Operator}. |
| * |
| * @param configuration if the {@link CardinalityPusher} depends on further ones, use this to obtain the latter |
| * @return the {@link CardinalityPusher} |
| */ |
| default CardinalityPusher getCardinalityPusher(final Configuration configuration) { |
| return new DefaultCardinalityPusher(this, configuration.getCardinalityEstimatorProvider()); |
| } |
| |
| /** |
| * Tells if this instance should be executed/implemented only on a certain set of {@link Platform}s. |
| * |
| * @return the targeted {@link Platform}s or an empty {@link Set} if there is no such restriction |
| */ |
| Set<Platform> getTargetPlatforms(); |
| |
| /** |
| * <i>Optional operation.</i> Restrict this instance to be executed/implemented on a certain {@link Platform}s or |
| * allow a further one if there is already a restriction in place. |
| */ |
| void addTargetPlatform(Platform platform); |
| |
| /** |
| * Convenience version of {@link Operator#propagateOutputCardinality(int, OptimizationContext.OperatorContext)}, |
| * where the adjacent {@link InputSlot}s reside in the same {@link OptimizationContext} as the {@code operatorContext}. |
| */ |
| default void propagateOutputCardinality(int outputIndex, OptimizationContext.OperatorContext operatorContext) { |
| this.propagateOutputCardinality(outputIndex, operatorContext, operatorContext.getOptimizationContext()); |
| } |
| |
| /** |
| * Propagates the {@link CardinalityEstimate} of an {@link OutputSlot} within the {@code operatorContext} to |
| * <ul> |
| * <li>fed {@link InputSlot}s (which in turn are asked to propagate) and</li> |
| * <li><b>inner</b>, mapped {@link OutputSlot}s.</li> |
| * </ul> |
| * |
| * @param outputIndex of the {@link OutputSlot} |
| * @param operatorContext holds the {@link CardinalityEstimate} to be propagated |
| * @param targetContext to which the {@link CardinalityEstimate}s should be propagated |
| */ |
| void propagateOutputCardinality(int outputIndex, |
| OptimizationContext.OperatorContext operatorContext, |
| OptimizationContext targetContext); |
| |
| /** |
| * Propagates the {@link CardinalityEstimate} of an {@link InputSlot} within the {@code operatorContext} |
| * to <b>inner</b>, mapped {@link InputSlot}s. |
| * |
| * @param inputIndex of the {@link InputSlot} |
| * @param operatorContext holds the {@link CardinalityEstimate} to be propagated |
| */ |
| void propagateInputCardinality(int inputIndex, |
| OptimizationContext.OperatorContext operatorContext); |
| |
| /** |
| * Collect all inner {@link OutputSlot}s that are mapped to the given {@link OutputSlot}. |
| */ |
| <T> Set<OutputSlot<T>> collectMappedOutputSlots(OutputSlot<T> output); |
| |
| /** |
| * Collect all inner {@link InputSlot}s that are mapped to the given {@link InputSlot}. |
| */ |
| <T> Set<InputSlot<T>> collectMappedInputSlots(InputSlot<T> input); |
| |
| /** |
| * Provides an instance's name. |
| * |
| * @return the name of this instance or {@code null} if none |
| */ |
| String getName(); |
| |
| /** |
| * Provide a name for this instance. |
| * |
| * @param name the name |
| */ |
| void setName(String name); |
| |
| /** |
| * Collects all fields of this instance that have a {@link EstimationContextProperty} annotation. |
| * |
| * @return the fields |
| */ |
| default Collection<String> getEstimationContextProperties() { |
| Set<String> properties = new HashSet<>(2); |
| Queue<Class<?>> classQueue = new LinkedList<>(); |
| classQueue.add(this.getClass()); |
| while (!classQueue.isEmpty()) { |
| final Class<?> cls = classQueue.poll(); |
| if (cls.getSuperclass() != null) classQueue.add(cls.getSuperclass()); |
| for (Field declaredField : cls.getDeclaredFields()) { |
| final EstimationContextProperty annotation = declaredField.getDeclaredAnnotation(EstimationContextProperty.class); |
| if (annotation != null) { |
| properties.add(declaredField.getName()); |
| } |
| } |
| } |
| return properties; |
| } |
| |
| } |
| |