blob: 31f060e513a6b1c4960816b2dc758657747f8db4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.newplan.logical.rules;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.OperatorSubPlan;
import org.apache.pig.newplan.logical.expression.LogicalExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.ProjectExpression;
import org.apache.pig.newplan.logical.expression.UserFuncExpression;
import org.apache.pig.newplan.logical.relational.LOCogroup;
import org.apache.pig.newplan.logical.relational.LOCross;
import org.apache.pig.newplan.logical.relational.LODistinct;
import org.apache.pig.newplan.logical.relational.LOFilter;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LONative;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LOSplit;
import org.apache.pig.newplan.logical.relational.LOSplitOutput;
import org.apache.pig.newplan.logical.relational.LOStore;
import org.apache.pig.newplan.logical.relational.LOStream;
import org.apache.pig.newplan.logical.relational.LOUnion;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.newplan.optimizer.Rule;
import org.apache.pig.newplan.optimizer.Transformer;
public class PushUpFilter extends Rule {
public PushUpFilter(String n) {
super(n, false);
}
@Override
public Transformer getNewTransformer() {
return new PushUpFilterTransformer();
}
public class PushUpFilterTransformer extends Transformer {
private OperatorSubPlan subPlan;
@Override
public boolean check(OperatorPlan matched) throws FrontendException {
// check if it is inner join
Operator current = matched.getSources().get(0);
Operator pred = findNonFilterPredecessor( current );
if( pred == null )
return false;
// sort and union are always okay.
if( pred instanceof LOSort || pred instanceof LOUnion ) {
return true;
}
// if the predecessor is one of LOLoad/LOStore/LOStream/LOLimit/LONative
// if predecessor is LOForEach, it is optimized by rule FilterAboveForeach
// return false
if( pred instanceof LOLoad || pred instanceof LOStore || pred instanceof LOStream ||
pred instanceof LOFilter || pred instanceof LOSplit || pred instanceof LOSplitOutput ||
pred instanceof LOLimit || pred instanceof LONative || pred instanceof LOForEach) {
return false;
}
LOFilter filter = (LOFilter)current;
List<Operator> preds = currentPlan.getPredecessors( pred );
LogicalExpressionPlan filterPlan = filter.getFilterPlan();
if (OptimizerUtils.planHasNonDeterministicUdf(filterPlan)) {
return false;
}
//if there is no nondeterministic udf, filter can be pushed above
// Distinct
if(pred instanceof LODistinct){
return true;
}
// collect all uids used in the filter plan
Set<Long> uids = collectUidFromExpPlan(filterPlan);
if( pred instanceof LOCogroup ) {
LOCogroup cogrp = (LOCogroup)pred;
if( preds.size() == 1 ) {
if( hasAll( (LogicalRelationalOperator)preds.get( 0 ), uids ) ) {
// Order by is ok if all UIDs can be found from previous operator.
return true;
}
} else if ( 1 == cogrp.getExpressionPlans().get( 0 ).size() && !containUDF( filterPlan ) ) {
// Optimization is possible if there is only a single key.
// For regular cogroup, we cannot use UIDs to determine if filter can be pushed up.
// But if there is no UDF, it's okay, as only UDF can take bag field as input.
return true;
}
}
// if the predecessor is a multi-input operator then detailed
// checks are required
if( pred instanceof LOCross || pred instanceof LOJoin ) {
boolean[] innerFlags = null;
boolean isFullOuter = true;
boolean isInner = true;
if( pred instanceof LOJoin ) {
innerFlags = ((LOJoin)pred).getInnerFlags();
// If all innerFlag is false, means a full outer join,
for (boolean inner : innerFlags) {
if (inner) {
isFullOuter = false;
} else {
isInner = false;
}
}
if (isFullOuter)
return false;
}
for(int j=0; j<preds.size(); j++) {
if (hasAll((LogicalRelationalOperator)preds.get(j), uids)) {
// For LOJoin, innerFlag==true indicate that branch is the outer join side
// which has the exact opposite semantics
if (pred instanceof LOCross || pred instanceof LOJoin && (isInner || innerFlags[j]))
return true;
}
}
}
return false;
}
private boolean containUDF(LogicalExpressionPlan filterPlan) {
Iterator<Operator> it = filterPlan.getOperators();
while( it.hasNext() ) {
if( it.next() instanceof UserFuncExpression )
return true;
}
return false;
}
Set<Long> collectUidFromExpPlan(LogicalExpressionPlan filterPlan) throws FrontendException {
Set<Long> uids = new HashSet<Long>();
Iterator<Operator> iter = filterPlan.getOperators();
while(iter.hasNext()) {
Operator op = iter.next();
if (op instanceof ProjectExpression) {
long uid = ((ProjectExpression)op).getFieldSchema().uid;
uids.add(uid);
}
}
return uids;
}
/**
* Starting from current operator (which is a filter), search its successors until
* locating a non-filter operator. Null is returned if none is found.
*/
private Operator findNonFilterPredecessor(Operator current) {
Operator op = current;
do {
List<Operator> predecessors = currentPlan.getPredecessors( op );
// if there are no predecessors return false
if( predecessors == null || predecessors.size() == 0 ) {
return null;
}
Operator pred = predecessors.get( 0 );
if( pred instanceof LOFilter ) {
op = pred;
continue;
} else {
return pred;
}
} while( true );
}
@Override
public void transform(OperatorPlan matched) throws FrontendException {
subPlan = new OperatorSubPlan(currentPlan);
LOFilter filter = (LOFilter)matched.getSources().get(0);
// This is the one that we will insert filter btwn it and it's input.
Operator predecessor = this.findNonFilterPredecessor( filter );
subPlan.add( predecessor) ;
// Disconnect the filter in the plan without removing it from the plan.
Operator predec = currentPlan.getPredecessors( filter ).get( 0 );
Operator succed;
if (currentPlan.getSuccessors(filter)!=null)
succed = currentPlan.getSuccessors(filter).get(0);
else
succed = null;
Pair<Integer, Integer> p1 = currentPlan.disconnect(predec, filter);
if (succed!=null) {
subPlan.add(succed);
Pair<Integer, Integer> p2 = currentPlan.disconnect(filter, succed);
currentPlan.connect(predec, p1.first, succed, p2.second);
}
if( predecessor instanceof LOSort || predecessor instanceof LODistinct ||
( predecessor instanceof LOCogroup && currentPlan.getPredecessors( predecessor ).size() == 1 ) ) {
// For sort, put the filter in front of it.
Operator prev = currentPlan.getPredecessors( predecessor ).get( 0 );
insertFilter( prev, predecessor, filter );
return;
}
// Find the predecessor of join that contains all required uids.
LogicalExpressionPlan filterPlan = filter.getFilterPlan();
List<Operator> preds = currentPlan.getPredecessors( predecessor );
Map<Integer, Operator> inputs = findInputsToAddFilter( filterPlan, predecessor, preds );
LOFilter newFilter = null;
for( Entry<Integer, Operator> entry : inputs.entrySet() ) {
int inputIndex = entry.getKey();
Operator pred = entry.getValue();
// Find projection field offset
int columnOffset = 0;
if( predecessor instanceof LOJoin || predecessor instanceof LOCross ) {
for( int i = 0; i < inputIndex; i++ ) {
columnOffset += ( (LogicalRelationalOperator)preds.get( i ) ).getSchema().size();
}
}
// Reuse the filter for the first match. For others, need to make a copy of the filter
// and add it between input and predecessor.
newFilter = newFilter == null ? filter : new LOFilter( (LogicalPlan)currentPlan );
currentPlan.add( newFilter );
subPlan.add( newFilter );
subPlan.add( pred );
LogicalExpressionPlan fPlan = filterPlan.deepCopy();
List<Operator> sinks = fPlan.getSinks();
List<ProjectExpression> projExprs = new ArrayList<ProjectExpression>();
for( Operator sink : sinks ) {
if( sink instanceof ProjectExpression )
projExprs.add( (ProjectExpression)sink );
}
if( predecessor instanceof LOCogroup ) {
for( ProjectExpression projExpr : projExprs ) {
// Need to merge filter condition and cogroup by expression;
LogicalExpressionPlan plan = ((LOCogroup) predecessor).getExpressionPlans().get( inputIndex ).iterator().next();
LogicalExpressionPlan copy = plan.deepCopy();
LogicalExpression root = (LogicalExpression)copy.getSinks().get( 0 );
List<Operator> predecessors = fPlan.getPredecessors( projExpr );
if( predecessors == null || predecessors.size() == 0 ) {
fPlan.remove( projExpr );
fPlan.add( root );
} else {
fPlan.add( root );
Operator pred1 = predecessors.get( 0 );
Pair<Integer, Integer> pair = fPlan.disconnect( pred1, projExpr );
fPlan.connect( pred1, pair.first, root, pair.second );
fPlan.remove( projExpr );
}
}
}
// Now, reset the projection expressions in the new filter plan.
sinks = fPlan.getSinks();
for( Operator sink : sinks ) {
if( sink instanceof ProjectExpression ) {
ProjectExpression projE = (ProjectExpression)sink;
projE.setAttachedRelationalOp( newFilter );
projE.setInputNum( 0 );
projE.setColNum( projE.getColNum() - columnOffset );
}
}
newFilter.setFilterPlan( fPlan );
insertFilter( pred, predecessor, newFilter );
}
}
// check if a relational operator contains all of the specified uids
private boolean hasAll(LogicalRelationalOperator op, Set<Long> uids) throws FrontendException {
LogicalSchema schema = op.getSchema();
if (schema==null)
return false;
for(long uid: uids) {
if (schema.findField(uid) == -1) {
return false;
}
}
return true;
}
@Override
public OperatorPlan reportChanges() {
return currentPlan;
}
// Insert the filter in between the given two operators.
private void insertFilter(Operator prev, Operator predecessor, LOFilter filter)
throws FrontendException {
Pair<Integer, Integer> p3 = currentPlan.disconnect( prev, predecessor );
currentPlan.connect( prev, p3.first, filter, 0 );
currentPlan.connect( filter, 0, predecessor, p3.second );
}
// Identify those among preds that will need to have a filter between it and the predecessor.
private Map<Integer, Operator> findInputsToAddFilter(LogicalExpressionPlan filterPlan, Operator predecessor,
List<Operator> preds) throws FrontendException {
Map<Integer, Operator> inputs = new HashMap<Integer, Operator>();
if( predecessor instanceof LOUnion || predecessor instanceof LOCogroup ) {
for( int i = 0; i < preds.size(); i++ ) {
inputs.put( i, preds.get( i ) );
}
return inputs;
}
// collect all uids used in the filter plan
Set<Long> uids = collectUidFromExpPlan(filterPlan);
boolean[] innerFlags = null;
boolean isInner = true;
if (predecessor instanceof LOJoin) {
innerFlags = ((LOJoin)predecessor).getInnerFlags();
for (boolean inner : innerFlags) {
if (!inner) {
isInner = false;
break;
}
}
}
// Find the predecessor of join that contains all required uids.
for(int j=0; j<preds.size(); j++) {
// Filter can push to LOJoin outer branch, but no inner branch
if( hasAll((LogicalRelationalOperator)preds.get(j), uids) &&
(predecessor instanceof LOCross || predecessor instanceof LOJoin && (isInner || innerFlags[j]))) {
Operator input = preds.get(j);
subPlan.add(input);
inputs.put( j, input );
}
}
return inputs;
}
}
@Override
protected OperatorPlan buildPattern() {
LogicalPlan plan = new LogicalPlan();
LogicalRelationalOperator op1 = new LOFilter(plan);
plan.add( op1 );
return plan;
}
}