blob: 738f4a7b7a05fe997869b3ae4276025e5ca7244d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.newplan.optimizer;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.OperatorSubPlan;
/**
* Rules describe a pattern of operators. They also reference a Transformer.
* If the pattern of operators is found one or more times in the provided plan,
* then the optimizer will use the associated Transformer to transform the
* plan.
* Note: the pattern matching logic implemented here has a limitation
* that it assumes that all the leaves in the pattern are siblings. See more
* detailed description here - https://issues.apache.org/jira/browse/PIG-1742
* If new rules use patterns that don't work with this limitation, the pattern
* match logic will need to be updated.
*/
public abstract class Rule {
protected String name = null;
protected OperatorPlan pattern;
transient protected OperatorPlan currentPlan;
protected static final Log log = LogFactory.getLog(Rule.class);
private transient Set<Operator> matchedNodes = new HashSet<Operator>();
private boolean mandatory;
private boolean skipListener = false;
/**
* Create this rule by using the default pattern that this rule provided
* @param n Name of this rule
* @param mandatory if it is set to false, this rule can be disabled by user
*/
public Rule(String n, boolean mandatory) {
name = n;
pattern = buildPattern();
this.mandatory = mandatory;
}
/**
* @param n Name of this rule
* @param p Pattern to look for.
*/
public Rule(String n, OperatorPlan p) {
name = n;
pattern = p;
}
/**
* Build the pattern that this rule will look for
* @return the pattern to look for by this rule
*/
abstract protected OperatorPlan buildPattern();
/**
* Get the transformer for this rule. Abstract because the rule
* may want to choose how to instantiate the transformer.
* This should never return a cached transformer, it should
* always return a fresh one with no state.
* @return Transformer to use with this rule
*/
abstract public Transformer getNewTransformer();
/**
* Return the pattern to be matched for this rule
* @return the pattern to be matched for this rule
*/
public OperatorPlan getPattern() {
return pattern;
}
protected boolean isSkipListener() {
return this.skipListener;
}
protected void setSkipListener(boolean skip) {
this.skipListener = skip;
}
/**
* Search for all the sub-plans that matches the pattern
* defined by this rule.
* See class description above for limitations on the patterns supported.
* @return A list of all matched sub-plans. The returned plans are
* partial views of the original OperatorPlan. Each is a
* sub-set of the original plan and represents the same
* topology as the pattern, but operators in the returned plan
* are the same objects as the original plan. Therefore,
* a call getPlan() from any node in the return plan would
* return the original plan.
*
* @param plan the OperatorPlan to look for matches to the pattern
*/
public List<OperatorPlan> match(OperatorPlan plan) throws FrontendException {
currentPlan = plan;
List<Operator> leaves = pattern.getSinks();
Iterator<Operator> iter = plan.getOperators();
List<OperatorPlan> matchedList = new ArrayList<OperatorPlan>();
matchedNodes.clear();
while(iter.hasNext()) {
Operator op = iter.next();
// find a node that matches the first leaf of the pattern
if (match(op, leaves.get(0))) {
List<Operator> planOps = new ArrayList<Operator>();
planOps.add(op);
// if there is more than 1 leaves in the pattern, we check
// if other leaves match the siblings of this node
if (leaves.size()>1) {
boolean matched = true;
List<Operator> preds = null;
preds = plan.getPredecessors(op);
// if this node has no predecessor, it must be a root
if (preds == null) {
preds = new ArrayList<Operator>();
preds.add(null);
}
for(Operator s: preds) {
matched = true;
List<Operator> siblings = null;
if (s != null) {
siblings = plan.getSuccessors(s);
}else{
// for a root, we get its siblings by getting all roots
siblings = plan.getSources();
}
int index = siblings.indexOf(op);
if (siblings.size()-index < leaves.size()) {
continue;
}
for(int j=1; j<leaves.size(); j++) {
if (!match(siblings.get(index+j), leaves.get(j))) {
matched = false;
break;
}
}
if (matched) {
for(int j=1; j<leaves.size(); j++) {
planOps.add(siblings.get(index+j));
}
}
}
// we have move on to next operator as this one doesn't have siblings to
// match all the leaves
if (!matched) {
continue;
}
}
PatternMatchOperatorPlan match = new PatternMatchOperatorPlan(plan);
if (match.check(planOps)) {
// we find a matched pattern,
// add the operators into matchedNodes
Iterator<Operator> iter2 = match.getOperators();
while(iter2.hasNext()) {
Operator opt = iter2.next();
matchedNodes.add(opt);
}
// add pattern
matchedList.add(match);
}
}
}
return matchedList;
}
public String getName() {
return name;
}
public boolean isMandatory() {
return mandatory;
}
/**
* Check if two operators match each other, we want to find matches
* that don't share nodes
*/
private boolean match(Operator planNode, Operator patternNode) {
return planNode.getClass().equals(patternNode.getClass()) && !matchedNodes.contains(planNode);
}
private class PatternMatchOperatorPlan extends OperatorSubPlan {
public PatternMatchOperatorPlan(OperatorPlan basePlan) {
super(basePlan);
}
protected boolean check(List<Operator> planOps) throws FrontendException {
List<Operator> patternOps = pattern.getSinks();
if (planOps.size() != patternOps.size()) {
return false;
}
for(int i=0; i<planOps.size(); i++) {
Deque<Operator> s = new LinkedList<Operator>();
if (!check(planOps.get(i), patternOps.get(i), s)) {
return false;
}
Iterator<Operator> iter = s.iterator();
while(iter.hasNext()) {
add(iter.next());
}
}
if (size() == pattern.size()) {
return true;
}
return false;
}
/**
* Check if the plan operator and its sub-tree has a match to the pattern
* operator and its sub-tree. This algorithm only search and return one match.
* It doesn't recursively search for all possible matches. For example,
* for a plan that looks like
* join
* / \
* load load
* if we are looking for join->load pattern, only one match will be returned instead
* of two, so that the matched subsets don't share nodes.
*/
private boolean check(Operator planOp, Operator patternOp, Deque<Operator> opers) throws FrontendException {
if (!match(planOp, patternOp)) {
return false;
}
// check if their predecessors match
List<Operator> preds1 = getBasePlan().getPredecessors(planOp);
List<Operator> preds2 = pattern.getPredecessors(patternOp);
if (preds1 == null && preds2 != null) {
return false;
}
if (preds1 != null && preds2 != null && preds1.size() < preds2.size()) {
return false;
}
// we've reached the root of the pattern, so a match is found
if (preds2 == null || preds2.size() == 0) {
opers.push(planOp);
return true;
}
int index = 0;
// look for predecessors
while(index < preds1.size()) {
boolean match = true;
if (match(preds1.get(index), preds2.get(0))) {
if ( (preds1.size() - index) < preds2.size()) {
return false;
}
int oldSize = opers.size();
for(int i=0; i<preds2.size(); i++) {
if (!check(preds1.get(index+i), preds2.get(i), opers)) {
for(int j=opers.size(); j>oldSize; j--) {
opers.pop();
}
match = false;
break;
}
}
if (match) {
opers.push(planOp);
return true;
}
}
index++;
}
return false;
}
}
}