blob: 5eb1d2fea5e73aa5cc5c2037b0656a418359e1d6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.newplan.logical.rules;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.OperatorSubPlan;
import org.apache.pig.newplan.logical.expression.AndExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.ProjectExpression;
import org.apache.pig.newplan.logical.relational.LOFilter;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.optimizer.Rule;
import org.apache.pig.newplan.optimizer.Transformer;
public class MergeFilter extends Rule {
public MergeFilter(String n) {
super(n, false);
}
@Override
public Transformer getNewTransformer() {
return new MergeFilterTransformer();
}
public class MergeFilterTransformer extends Transformer {
private OperatorSubPlan subPlan;
@Override
public boolean check(OperatorPlan matched) throws FrontendException {
LOFilter filter = (LOFilter)matched.getSources().get(0);
List<Operator> succeds = currentPlan.getSuccessors(filter);
// if this filter is followed by another filter, we should combine them
if (succeds != null && succeds.size() == 1) {
if (succeds.get(0) instanceof LOFilter) {
return true;
}
}
return false;
}
@Override
public void transform(OperatorPlan matched) throws FrontendException {
subPlan = new OperatorSubPlan(currentPlan);
LOFilter filter = (LOFilter)matched.getSources().get(0);
subPlan.add(filter);
List<Operator> succeds = currentPlan.getSuccessors(filter);
if (succeds != null && succeds.size()== 1 && (succeds.get(0) instanceof LOFilter)) {
LOFilter next = (LOFilter)succeds.get(0);
combineFilterCond(filter, next);
List<Operator> succs = currentPlan.getSuccessors(next);
if (succs!=null && succs.size()>0) {
subPlan.add(succs.get(0));
}
// Since we remove next, we need to merge soft link into filter
List<Operator> nextSoftPreds = null;
if (currentPlan.getSoftLinkPredecessors(next)!=null) {
nextSoftPreds = new ArrayList<Operator>();
nextSoftPreds.addAll(currentPlan.getSoftLinkPredecessors(next));
}
if (nextSoftPreds!=null) {
for (Operator softPred : nextSoftPreds) {
currentPlan.removeSoftLink(softPred, next);
currentPlan.createSoftLink(softPred, filter);
}
}
currentPlan.removeAndReconnect(next);
}
Iterator<Operator> iter = filter.getFilterPlan().getOperators();
while (iter.hasNext()) {
Operator oper = iter.next();
if (oper instanceof ProjectExpression) {
((ProjectExpression)oper).setAttachedRelationalOp(filter);
}
}
}
@Override
public OperatorPlan reportChanges() {
return subPlan;
}
// combine the condition of two filters. The condition of second filter
// is added into the condition of first filter with an AND operator.
private void combineFilterCond(LOFilter f1, LOFilter f2) throws FrontendException {
LogicalExpressionPlan p1 = f1.getFilterPlan();
LogicalExpressionPlan p2 = f2.getFilterPlan();
LogicalExpressionPlan andPlan = new LogicalExpressionPlan();
// add existing operators
Iterator<Operator> iter = p1.getOperators();
while(iter.hasNext()) {
andPlan.add(iter.next());
}
iter = p2.getOperators();
while(iter.hasNext()) {
andPlan.add(iter.next());
}
// add all connections
iter = p1.getOperators();
while(iter.hasNext()) {
Operator n = iter.next();
List<Operator> l = p1.getPredecessors(n);
if (l != null) {
for(Operator op: l) {
andPlan.connect(op, n);
}
}
}
iter = p2.getOperators();
while(iter.hasNext()) {
Operator n = iter.next();
List<Operator> l = p2.getPredecessors(n);
if (l != null) {
for(Operator op: l) {
andPlan.connect(op, n);
}
}
}
// create an AND
new AndExpression(andPlan, (LogicalExpression)p1.getSources().get(0), (LogicalExpression)p2.getSources().get(0));
f1.setFilterPlan(andPlan);
}
}
@Override
protected OperatorPlan buildPattern() {
// the pattern that this rule looks for
// is filter operator
LogicalPlan plan = new LogicalPlan();
LogicalRelationalOperator op = new LOFilter(plan);
plan.add(op);
return plan;
}
}