blob: b6eadda26a93cedf1c11c62d940e261ae89d74b3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.core.mapping;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.commons.lang3.Validate;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.plan.wayangplan.InputSlot;
import org.apache.wayang.core.plan.wayangplan.Operator;
import org.apache.wayang.core.plan.wayangplan.OperatorBase;
import org.apache.wayang.core.plan.wayangplan.OutputSlot;
import org.apache.wayang.core.plan.wayangplan.PlanTraversal;
import org.apache.wayang.core.plan.wayangplan.TopDownPlanVisitor;
import org.apache.wayang.core.plan.wayangplan.WayangPlan;
/**
 * A subplan pattern describes a class of subplans in a {@link WayangPlan}.
 * <p><i>NB: Currently, only such patterns are tested and supported that form a chain of operators, i.e., no DAGs
 * are allowed and at most one input and one output operator.</i></p>
 */
public class SubplanPattern extends OperatorBase {

    /**
     * The {@link OperatorPattern} at which this pattern starts; its inputs are the inputs of this instance.
     */
    private final OperatorPattern inputPattern;

    /**
     * The {@link OperatorPattern} at which this pattern ends; its outputs are the outputs of this instance.
     * Matching starts from this pattern and proceeds upstream.
     */
    private final OperatorPattern outputPattern;

    /**
     * Creates a new instance that matches only a single operator.
     *
     * @param operatorPattern the only operator pattern; serves as both input and output pattern
     * @return the new instance
     */
    public static final SubplanPattern createSingleton(OperatorPattern operatorPattern) {
        return fromOperatorPatterns(operatorPattern, operatorPattern);
    }

    /**
     * Creates a new instance that matches a graph of operator patterns.
     *
     * @param inputOperatorPattern  the only operator pattern that has inputs wrt. the subplan pattern
     * @param outputOperatorPattern the only operator pattern that has outputs wrt. the subplan pattern
     * @return the new instance
     */
    public static final SubplanPattern fromOperatorPatterns(OperatorPattern inputOperatorPattern,
                                                            OperatorPattern outputOperatorPattern) {
        return new SubplanPattern(inputOperatorPattern, outputOperatorPattern);
    }

    /**
     * Creates a new instance whose input slots are those of the {@code inputPattern} and whose output slots
     * are those of the {@code outputPattern}.
     *
     * @param inputPattern  the {@link OperatorPattern} providing the inputs of this instance
     * @param outputPattern the {@link OperatorPattern} providing the outputs of this instance
     */
    private SubplanPattern(OperatorPattern inputPattern, OperatorPattern outputPattern) {
        super(inputPattern.getAllInputs(), outputPattern.getAllOutputs(), false);
        this.inputPattern = inputPattern;
        this.outputPattern = outputPattern;
    }

    /**
     * Match this pattern against a plan.
     *
     * @param plan     the plan to match against
     * @param minEpoch the (inclusive) minimum epoch value for matched subplans
     * @return all matches
     */
    public List<SubplanMatch> match(WayangPlan plan, int minEpoch) {
        // A fresh Matcher per call keeps the collected matches local to this invocation.
        return new Matcher(minEpoch).match(plan);
    }

    /**
     * @return the {@link OperatorPattern} at the input side of this pattern
     */
    public OperatorPattern getInputPattern() {
        return this.inputPattern;
    }

    /**
     * @return the {@link OperatorPattern} at the output side of this pattern
     */
    public OperatorPattern getOutputPattern() {
        return this.outputPattern;
    }

    /**
     * Patterns are not part of an executable plan and therefore cannot be visited.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <Payload, Return> Return accept(TopDownPlanVisitor<Payload, Return> visitor, OutputSlot<?> outputSlot, Payload payload) {
        throw new UnsupportedOperationException("Pattern does not accept visitors.");
    }

    /**
     * This class encapsulates the functionality to match a subplan pattern against a plan. It works as follows:
     * <ol>
     * <li>for each operator in the plan, start a new match attempt</li>
     * <li>iterate through the graph pattern</li>
     * <li>co-iterate the actual graph</li>
     * <li>if we could co-iterate the two, create a match for the match attempt</li>
     * </ol>
     */
    private class Matcher {

        /**
         * Collects all the matches.
         */
        private final List<SubplanMatch> matches = new LinkedList<>();

        /**
         * The (inclusive) minimum epoch value for matched subplans.
         */
        private final int minEpoch;

        /**
         * Creates a new instance.
         *
         * @param minEpoch see {@link #minEpoch}
         */
        public Matcher(int minEpoch) {
            this.minEpoch = minEpoch;
        }

        /**
         * Run this instance over the given {@link WayangPlan}.
         *
         * @param plan the {@link WayangPlan} whose operators should be matched
         * @return a {@link List} of all {@link SubplanMatch}es established by the run
         */
        public List<SubplanMatch> match(WayangPlan plan) {
            // Start an attempt to match from each operator that is upstream-reachable from one of the WayangPlan sinks.
            PlanTraversal.upstream().traversingHierarchically()
                    .withCallback(this::attemptMatchFrom)
                    .traverse(plan.getSinks());
            return this.matches;
        }

        /**
         * Traversal callback: try to match the output pattern of the enclosing {@link SubplanPattern}
         * starting at the given operator.
         *
         * @param operator       the operator that should be matched with the output operator pattern
         * @param fromInputSlot  must be {@code null}, as only upstream traversals are supported
         * @param fromOutputSlot the {@link OutputSlot} via which the {@code operator} was reached; handed on
         *                       to the recursive matching
         */
        private void attemptMatchFrom(Operator operator, InputSlot<?> fromInputSlot, OutputSlot<?> fromOutputSlot) {
            Validate.isTrue(fromInputSlot == null, "Cannot handle downstream traversals.");

            // Every attempt gets its own SubplanMatch; it is only registered if the whole pattern matches.
            final SubplanMatch subplanMatch = new SubplanMatch(SubplanPattern.this);
            this.match(SubplanPattern.this.outputPattern, operator, fromOutputSlot, subplanMatch);
        }

        /**
         * Recursively match the given operator pattern and operator including their input operators.
         * The {@code subplanMatch} is added to the collected matches once a pattern without any input patterns
         * has been matched and the epoch constraint is satisfied. If some operator does not match, the recursion
         * simply stops and the (partially filled) {@code subplanMatch} is never registered.
         *
         * @param pattern           the next {@link OperatorPattern} to match with
         * @param operator          the {@link Operator} that should match with the {@code pattern}
         * @param trackedOutputSlot the {@link OutputSlot} of the {@link Operator} that we are coming from
         *                          (currently not evaluated by this method)
         * @param subplanMatch      collects the {@link OperatorMatch}es on success
         * @throws WayangException if the {@code pattern} has more than one occupied input
         */
        private void match(OperatorPattern pattern,
                           Operator operator,
                           OutputSlot<?> trackedOutputSlot,
                           SubplanMatch subplanMatch) {

            // Make sure that nobody expects more from this instance than it can handle.
            this.ensureAtMostOneOccupiedInput(pattern);

            // We expect a regular Operator here.
            assert operator.isElementary();

            // Try to match the co-iterated operator (pattern).
            final OperatorMatch operatorMatch = pattern.match(operator);
            if (operatorMatch == null) {
                // If match was not successful, abort. NB: This might change if we have, like, real graph patterns.
                return;
            }
            subplanMatch.addOperatorMatch(operatorMatch);

            // Now we need to go further upstream and try to match all the input OperatorPatterns.
            // NB: As of now, that should be at most one (see the guard above).
            boolean hasInputOperatorPatterns = false;
            for (int inputIndex = 0; inputIndex < pattern.getNumInputs(); inputIndex++) {
                final OutputSlot<?> patternOccupant = pattern.getEffectiveOccupant(pattern.getInput(inputIndex));
                if (patternOccupant == null) {
                    // This pattern input is not fed by another pattern: nothing to co-iterate.
                    continue;
                }
                hasInputOperatorPatterns = true;
                final OperatorPattern inputOperatorPattern = (OperatorPattern) patternOccupant.getOwner();

                // Follow the corresponding input of the actual operator upstream.
                final InputSlot<?> outerInputSlot = operator.getOutermostInputSlot(operator.getInput(inputIndex));
                final OutputSlot<?> occupant = outerInputSlot.getOccupant();
                if (occupant != null) {
                    this.match(inputOperatorPattern, occupant.getOwner(), occupant, subplanMatch);
                }
            }

            // Only the upstream-most pattern (the one without input patterns) completes and registers the match.
            if (!hasInputOperatorPatterns && subplanMatch.getMaximumEpoch() >= this.minEpoch) {
                this.matches.add(subplanMatch);
            }
        }

        /**
         * Rejects patterns that have more than one occupied input, as those cannot be matched, yet.
         *
         * @param pattern the {@link OperatorPattern} to check
         * @throws WayangException if more than one input of the {@code pattern} has an occupant
         */
        private void ensureAtMostOneOccupiedInput(OperatorPattern pattern) {
            if (pattern.getNumInputs() <= 1) {
                return;
            }
            final long numOccupiedInputs = Arrays.stream(pattern.getAllInputs())
                    .map(InputSlot::getOccupant)
                    .filter(Objects::nonNull)
                    .count();
            if (numOccupiedInputs > 1) {
                throw new WayangException("Cannot match pattern: Operator with more than one occupied input not supported, yet.");
            }
        }
    }
}