/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
| package org.apache.wayang.core.plan.wayangplan; |
| |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| import org.apache.wayang.core.api.Configuration; |
| import org.apache.wayang.core.optimizer.OptimizationContext; |
| import org.apache.wayang.core.optimizer.cardinality.CardinalityPusher; |
| import org.apache.wayang.core.optimizer.cardinality.OperatorAlternativeCardinalityPusher; |
| import org.apache.wayang.core.util.WayangCollections; |
| |
| /** |
| * This operator encapsulates operators that are alternative to each other. |
| * <p>Alternatives and their interfaces (i.e., {@link OutputSlot}s and {@link InputSlot}s) are matched via their |
| * input/output indices.</p> |
| */ |
| public class OperatorAlternative extends OperatorBase implements CompositeOperator { |
| |
| /** |
| * All alternatives for this operator. Note that we deliberately do not use a {@link SlotMapping} at this point |
| * because this can be achieved with a {@link Subplan}. |
| */ |
| private List<Alternative> alternatives = new LinkedList<>(); |
| |
| public static OperatorAlternative wrap(Operator startOperator, Operator endOperator) { |
| if (startOperator != endOperator) { |
| // TODO |
| throw new UnsupportedOperationException("Different operators in a match currently not supported."); |
| } |
| |
| return wrap(startOperator); |
| } |
| |
| /** |
| * Wraps an {@link Operator}: |
| * <ol> |
| * <li>Creates a new instance that mocks the interface (slots) of the given operator,</li> |
| * <li>steals the connections from the given operator,</li> |
| * <li>adapts its parent and becomes its new parent.</li> |
| * <li>Moreover, the given operator is set up as the first alternative.</li> |
| * </ol> |
| * |
| * @param operator operator to wrap |
| */ |
| public static OperatorAlternative wrap(Operator operator) { |
| // Test whether the operator is not already an alternative. |
| final OperatorContainer container = operator.getContainer(); |
| if (container != null && container.toOperator().isAlternative() && container.getContainedOperators().size() == 1) { |
| return (OperatorAlternative) container.toOperator(); |
| } |
| |
| OperatorAlternative operatorAlternative = operator.isLoopHead() ? |
| new LoopHeadAlternative((LoopHeadOperator) operator) : |
| new OperatorAlternative(operator); |
| InputSlot.mock(operator, operatorAlternative); |
| OutputSlot.mock(operator, operatorAlternative); |
| |
| Alternative alternative = operatorAlternative.addAlternative(operator); |
| if (container != null) { |
| operatorAlternative.setContainer(container); |
| container.noteReplaced(operator, alternative); |
| } |
| return operatorAlternative; |
| } |
| |
| /** |
| * Creates a new instance with the same number of inputs and outputs and the same parent as the given operator. |
| */ |
| protected OperatorAlternative(Operator operator) { |
| super(operator.getNumInputs(), operator.getNumOutputs(), false); |
| } |
| |
| public List<Alternative> getAlternatives() { |
| return Collections.unmodifiableList(this.alternatives); |
| } |
| |
| /** |
| * Adds an {@link Alternative} to this instance. |
| * |
| * @param alternativeOperator either an {@link ElementaryOperator} or a {@link Subplan}; in the latter case, the |
| * {@link Subplan} will be "unpacked" into the new {@link Alternative} |
| */ |
| public Alternative addAlternative(Operator alternativeOperator) { |
| Alternative alternative = this.createAlternative(); |
| if (alternativeOperator.isSubplan()) { |
| OperatorContainers.move((Subplan) alternativeOperator, alternative); |
| } else { |
| assert alternativeOperator.isElementary(); |
| OperatorContainers.wrap(alternativeOperator, alternative); |
| } |
| this.alternatives.add(alternative); |
| return alternative; |
| } |
| |
| protected Alternative createAlternative() { |
| return new Alternative(); |
| } |
| |
| @Override |
| public <Payload, Return> Return accept(TopDownPlanVisitor<Payload, Return> visitor, OutputSlot<?> outputSlot, Payload payload) { |
| return visitor.visit(this, outputSlot, payload); |
| } |
| |
| @Override |
| public void noteReplaced(Operator oldOperator, Operator newOperator) { |
| } |
| |
| @Override |
| public void propagateOutputCardinality(int outputIndex, |
| OptimizationContext.OperatorContext operatorContext, |
| OptimizationContext targetContext) { |
| super.propagateOutputCardinality(outputIndex, operatorContext, targetContext); |
| this.getAlternatives().forEach(alternative -> alternative.propagateOutputCardinality(outputIndex, operatorContext)); |
| } |
| |
| @Override |
| public void propagateInputCardinality(int inputIndex, OptimizationContext.OperatorContext operatorContext) { |
| super.propagateInputCardinality(inputIndex, operatorContext); |
| this.getAlternatives().forEach(alternative -> alternative.propagateInputCardinality(inputIndex, operatorContext)); |
| } |
| |
| @Override |
| public <T> Set<OutputSlot<T>> collectMappedOutputSlots(OutputSlot<T> output) { |
| return Stream.concat( |
| Stream.of(output), |
| this.alternatives.stream().flatMap(alternative -> this.streamMappedOutputSlots(alternative, output)) |
| ).collect(Collectors.toSet()); |
| } |
| |
| private <T> Stream<OutputSlot<T>> streamMappedOutputSlots( |
| OperatorAlternative.Alternative alternative, |
| OutputSlot<T> output) { |
| final OutputSlot<T> innerOutput = alternative.traceOutput(output); |
| return innerOutput == null ? |
| Stream.empty() : |
| innerOutput.getOwner().collectMappedOutputSlots(innerOutput).stream(); |
| } |
| |
| @Override |
| public <T> Set<InputSlot<T>> collectMappedInputSlots(InputSlot<T> input) { |
| return Stream.concat( |
| Stream.of(input), |
| this.alternatives.stream().flatMap(alternative -> this.streamMappedInputSlots(alternative, input)) |
| ).collect(Collectors.toSet()); |
| } |
| |
| private <T> Stream<InputSlot<T>> streamMappedInputSlots( |
| OperatorAlternative.Alternative alternative, |
| InputSlot<T> input) { |
| final Collection<InputSlot<T>> innerInputs = alternative.followInput(input); |
| return Stream.concat( |
| Stream.of(input), |
| innerInputs.stream().flatMap(innerInput -> innerInput.getOwner().collectMappedInputSlots(innerInput).stream()) |
| ); |
| } |
| |
| @Override |
| public CardinalityPusher getCardinalityPusher(final Configuration configuration) { |
| return new OperatorAlternativeCardinalityPusher(this, configuration); |
| } |
| |
| @Override |
| @SuppressWarnings("unchecked") |
| public Collection<OperatorContainer> getContainers() { |
| return (Collection<OperatorContainer>) (Collection) this.alternatives; |
| } |
| |
| @Override |
| public String toString() { |
| return String.format("%s[%dx ~%s, %x]", |
| this.getSimpleClassName(), |
| this.alternatives.size(), |
| WayangCollections.getAnyOptional(this.alternatives).orElse(null), |
| this.hashCode()); |
| } |
| |
| /** |
| * Represents an alternative subplan for the enclosing {@link OperatorAlternative}. |
| */ |
| public class Alternative implements OperatorContainer { |
| |
| /** |
| * Maps the slots of the enclosing {@link OperatorAlternative} with this instance. |
| */ |
| private final SlotMapping slotMapping = new SlotMapping(); |
| |
| /** |
| * Source/sink {@link Operator} in this instance. Should only be set if the surrounding {@link OperatorAlternative} |
| * is a source/sink. |
| */ |
| private Operator source, sink; |
| |
| |
| private Alternative() { |
| } |
| |
| @Override |
| public SlotMapping getSlotMapping() { |
| return this.slotMapping; |
| } |
| |
| @Override |
| public Operator getSink() { |
| return this.sink; |
| } |
| |
| @Override |
| public void setSink(Operator innerSink) { |
| this.sink = innerSink; |
| } |
| |
| @Override |
| public <T> OutputSlot<T> traceOutput(OutputSlot<T> alternativeOutputSlot) { |
| // If this alternative is not a sink, we trace the given output slot via the slot mapping. |
| if (!OperatorAlternative.this.isOwnerOf(alternativeOutputSlot)) { |
| throw new IllegalArgumentException("Cannot enter alternative: Output slot does not belong to this alternative."); |
| } |
| |
| final OutputSlot<T> resolvedSlot = this.slotMapping.resolveUpstream(alternativeOutputSlot); |
| if (resolvedSlot != null && resolvedSlot.getOwner().getParent() != OperatorAlternative.this) { |
| final String msg = String.format("Cannot enter through: Owner of inner OutputSlot (%s) is not a child of this alternative (%s).", |
| Operators.collectParents(resolvedSlot.getOwner(), true), |
| Operators.collectParents(OperatorAlternative.this, true)); |
| throw new IllegalStateException(msg); |
| } |
| return resolvedSlot; |
| } |
| |
| @Override |
| public OperatorAlternative toOperator() { |
| return OperatorAlternative.this; |
| } |
| |
| @Override |
| public Operator getSource() { |
| return this.source; |
| } |
| |
| @Override |
| public void setSource(Operator innerSource) { |
| this.source = innerSource; |
| } |
| |
| @Override |
| public <T> Collection<InputSlot<T>> followInput(InputSlot<T> inputSlot) { |
| if (!OperatorAlternative.this.isOwnerOf(inputSlot)) { |
| throw new IllegalArgumentException("Cannot enter alternative: invalid input slot."); |
| } |
| |
| final Collection<InputSlot<T>> resolvedSlots = this.slotMapping.resolveDownstream(inputSlot); |
| for (InputSlot<T> resolvedSlot : resolvedSlots) { |
| if (resolvedSlot != null && resolvedSlot.getOwner().getParent() != OperatorAlternative.this) { |
| final String msg = String.format("Cannot enter through: Owner of inner OutputSlot (%s) is not a child of this alternative (%s).", |
| Operators.collectParents(resolvedSlot.getOwner(), true), |
| Operators.collectParents(OperatorAlternative.this, true)); |
| throw new IllegalStateException(msg); |
| } |
| } |
| return resolvedSlots; |
| } |
| |
| @Override |
| public <T> InputSlot<T> traceInput(InputSlot<T> inputSlot) { |
| if (inputSlot.getOwner().getContainer() != this) { |
| throw new IllegalArgumentException("Cannot trace input slot: does not belong to this alternative."); |
| } |
| |
| final InputSlot<T> tracedInput = this.slotMapping.resolveUpstream(inputSlot); |
| assert tracedInput == null || inputSlot.getOccupant() == null : String.format( |
| "%s has both the occupant %s and the outer mapped input %s.", |
| inputSlot, inputSlot.getOccupant(), tracedInput |
| ); |
| return tracedInput; |
| } |
| |
| @Override |
| public <T> Collection<OutputSlot<T>> followOutput(OutputSlot<T> outputSlot) { |
| if (outputSlot.getOwner().getContainer() != this) { |
| throw new IllegalArgumentException("OutputSlot does not belong to this Alternative."); |
| } |
| return this.slotMapping.resolveDownstream(outputSlot); |
| } |
| |
| public OperatorAlternative getOperatorAlternative() { |
| return OperatorAlternative.this; |
| } |
| |
| @Override |
| public String toString() { |
| return String.format("%s[%s]", this.getClass().getSimpleName(), this.getContainedOperators()); |
| } |
| } |
| } |