blob: c32f9b3ad747b02446196d371fa9a632c8356122 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.newplan.logical.rules;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.logical.relational.LOCogroup;
import org.apache.pig.newplan.logical.relational.LOCross;
import org.apache.pig.newplan.logical.relational.LODistinct;
import org.apache.pig.newplan.logical.relational.LOFilter;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOGenerate;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LOSplit;
import org.apache.pig.newplan.logical.relational.LOSplitOutput;
import org.apache.pig.newplan.logical.relational.LOUnion;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.optimizer.Rule;
import org.apache.pig.newplan.optimizer.Transformer;
public class LimitOptimizer extends Rule {
public LimitOptimizer(String name) {
super(name, false);
}
@Override
protected OperatorPlan buildPattern() {
LogicalPlan plan = new LogicalPlan();
LogicalRelationalOperator limit = new LOLimit(plan, 0);
plan.add(limit);
return plan;
}
@Override
public Transformer getNewTransformer() {
return new OptimizeLimitTransformer();
}
public class OptimizeLimitTransformer extends Transformer {
@Override
public boolean check(OperatorPlan matched) {
LOLimit limit = (LOLimit) matched.getSources().get(0);
// Match each foreach.
List<Operator> preds = currentPlan.getPredecessors(limit);
if (preds == null || preds.size() == 0)
return false;
Operator pred = preds.get(0);
// Limit cannot be pushed up
if (pred instanceof LOCogroup || pred instanceof LOFilter
|| pred instanceof LOSplit || pred instanceof LODistinct || pred instanceof LOJoin) {
return false;
}
// Limit cannot be pushed in front of ForEach if it has a flatten
if (pred instanceof LOForEach) {
LOForEach foreach = (LOForEach) pred;
LogicalPlan innerPlan = foreach.getInnerPlan();
Iterator<Operator> it = innerPlan.getOperators();
while (it.hasNext()) {
Operator op = it.next();
if (op instanceof LOGenerate) {
LOGenerate gen = (LOGenerate) op;
boolean[] flattenFlags = gen.getFlattenFlags();
if (flattenFlags != null) {
for (boolean flatten : flattenFlags) {
if (flatten)
return false;
}
}
}
}
}
return true;
}
@Override
public OperatorPlan reportChanges() {
return currentPlan;
}
@Override
public void transform(OperatorPlan matched) throws FrontendException {
LOLimit limit = (LOLimit) matched.getSources().get(0);
// Find the next foreach operator.
List<Operator> preds = currentPlan.getPredecessors(limit);
Operator pred = preds.get(0);
if (pred instanceof LOForEach) {
// We can safely move LOLimit up
// Get operator before LOForEach
Operator prepredecessor = currentPlan.getPredecessors(pred)
.get(0);
List<Operator> softPrepredecessors=null;
// get a clone of softPrepredecessors to avoid ConcurrentModificationException
if (currentPlan.getSoftLinkPredecessors(limit)!=null) {
softPrepredecessors=new ArrayList<Operator>(
currentPlan.getSoftLinkPredecessors(limit));
}
if (softPrepredecessors!=null) {
for (Operator op : softPrepredecessors) {
currentPlan.removeSoftLink(op, limit);
}
}
currentPlan.removeAndReconnect(limit);
currentPlan.insertBetween(prepredecessor, limit, pred);
if (softPrepredecessors!=null) {
for (Operator op : softPrepredecessors) {
currentPlan.createSoftLink(op, limit);
}
}
} else if (limit.getLimitPlan() == null) {
// TODO selectively enable optimizations for variable limit
if (pred instanceof LOCross || pred instanceof LOUnion) {
// Limit can be duplicated, and the new instance pushed in front
// of an operator for the following operators
// (that is, if you have X->limit, you can transform that to
// limit->X->limit):
LOLimit newLimit = null;
List<Operator> nodesToProcess = new ArrayList<Operator>();
for (Operator prepredecessor : currentPlan
.getPredecessors(pred))
nodesToProcess.add(prepredecessor);
for (Operator prepredecessor : nodesToProcess) {
if (prepredecessor instanceof LOLimit) {
LOLimit l = (LOLimit) prepredecessor;
l.setLimit(l.getLimit() < limit.getLimit() ? l
.getLimit() : limit.getLimit());
} else {
newLimit = new LOLimit((LogicalPlan) currentPlan, limit
.getLimit());
currentPlan.insertBetween(prepredecessor, newLimit, pred);
}
}
} else if (pred instanceof LOSort) {
LOSort sort = (LOSort) pred;
if (sort.getLimit() == -1)
sort.setLimit(limit.getLimit());
else
sort.setLimit(sort.getLimit() < limit.getLimit() ? sort
.getLimit() : limit.getLimit());
// remove the limit
currentPlan.removeAndReconnect(limit);
} else if (pred instanceof LOLoad) {
// Push limit to load
LOLoad load = (LOLoad) pred;
if (load.getLimit() == -1)
load.setLimit(limit.getLimit());
else
load.setLimit(load.getLimit() < limit.getLimit() ? load
.getLimit() : limit.getLimit());
} else if (pred instanceof LOLimit) {
// Limit is merged into another LOLimit
LOLimit beforeLimit = (LOLimit) pred;
beforeLimit
.setLimit(beforeLimit.getLimit() < limit.getLimit() ? beforeLimit
.getLimit()
: limit.getLimit());
// remove the limit
currentPlan.removeAndReconnect(limit);
} else if (pred instanceof LOSplitOutput) {
// Limit and OrderBy (LOSort) can be separated by split
List<Operator> grandparants = currentPlan.getPredecessors(pred);
// After insertion of splitters, any node in the plan can
// have at most one predecessor
if (grandparants != null && grandparants.size() != 0
&& grandparants.get(0) instanceof LOSplit) {
List<Operator> greatGrandparants = currentPlan
.getPredecessors(grandparants.get(0));
if (greatGrandparants != null
&& greatGrandparants.size() != 0
&& greatGrandparants.get(0) instanceof LOSort) {
LOSort sort = (LOSort) greatGrandparants.get(0);
LOSort newSort = LOSort.createCopy(sort);
newSort.setLimit(limit.getLimit());
currentPlan.replace(limit, newSort);
}
}
}
}
}
}
}