blob: be6eb5b886e800c52e0d955eeb55ed3704f61efc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.Utils;
/**
 * Rewrites an {@link MROperPlan} so that every map-reduce operator carrying a
 * limit is followed by an extra map-reduce job that runs with a single reducer.
 *
 * <p>Without the extra job, a limit of M evaluated in N parallel reducers can
 * emit up to N*M records; the extra job re-applies the limit in one reducer.
 *
 * <p>Usage: walk the plan with this visitor (which collects candidate operators
 * in {@link #visitMROp}), then call {@link #adjust()} to rewrite the plan.
 */
public class LimitAdjuster extends MROpPlanVisitor {
    /** Operators whose {@code limit} or {@code limitPlan} is set; filled by {@link #visitMROp}. */
    ArrayList<MapReduceOper> opsToAdjust = new ArrayList<MapReduceOper>();
    PigContext pigContext;
    NodeIdGenerator nig;
    /**
     * Operator-key scope used for every operator created by this class, taken
     * from the plan's first root; stays null when the plan has no roots.
     */
    private String scope;

    /**
     * @param plan       the map-reduce plan to inspect and later rewrite
     * @param pigContext context used for temporary paths and new load operators
     */
    public LimitAdjuster(MROperPlan plan, PigContext pigContext) {
        super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
        this.pigContext = pigContext;
        nig = NodeIdGenerator.getGenerator();
        List<MapReduceOper> roots = plan.getRoots();
        // A plan without roots yields nothing to adjust, so no scope is required.
        if (roots != null && !roots.isEmpty()) {
            scope = roots.get(0).getOperatorKey().getScope();
        }
    }

    /**
     * Records {@code mr} for adjustment when it carries a constant limit
     * ({@code limit != -1}) or a limit expression ({@code limitPlan != null}).
     */
    @Override
    public void visitMROp(MapReduceOper mr) throws VisitorException {
        // Look for map reduce operators which contain a limit operator.
        // If so, add one additional map-reduce
        // operator with 1 reducer into the original plan.
        // TODO: This new MR job can be skipped if at runtime we discover that
        // its parent only has a single reducer (mr.requestedParallelism == 1).
        // This check MUST happen at runtime since that's when reducer estimation happens.
        if (mr.limit != -1 || mr.limitPlan != null) {
            opsToAdjust.add(mr);
        }
    }

    /**
     * For every operator collected by {@link #visitMROp} that has a non-empty
     * reduce plan, inserts a new single-reducer "limit" job after it.
     *
     * <p>The original job's POStore is redirected to a temporary file. The new
     * job loads that file, re-applies the limit, and stores to the original
     * output. All former successors of the original job are reconnected behind
     * the new job.
     *
     * @throws IOException   if a temporary path cannot be obtained
     * @throws PlanException if the reduce plan does not have exactly one leaf,
     *                       if that leaf is not a POStore (outside illustrator
     *                       mode), or if the plan cannot be modified
     */
    public void adjust() throws IOException, PlanException
    {
        for (MapReduceOper mr : opsToAdjust) {
            // Operators without a reduce phase are left unchanged.
            if (mr.reducePlan.isEmpty()) continue;

            List<PhysicalOperator> reduceLeaves = mr.reducePlan.getLeaves();
            if (reduceLeaves.size() != 1) {
                int errCode = 2024;
                String msg = "Expected reduce to have single leaf. Found " + reduceLeaves.size() + " leaves.";
                throw new MRCompilerException(msg, errCode, PigException.BUG);
            }
            PhysicalOperator reduceLeaf = reduceLeaves.get(0);
            if (!pigContext.inIllustrator && !(reduceLeaf instanceof POStore)) {
                int errCode = 2025;
                String msg = "Expected leaf of reduce plan to " +
                        "always be POStore. Found " + reduceLeaf.getClass().getSimpleName();
                throw new MRCompilerException(msg, errCode, PigException.BUG);
            }
            // NOTE(review): in illustrator mode the check above is skipped, but this
            // cast still requires the leaf to be a POStore.
            POStore storeOp = (POStore) reduceLeaf;
            FileSpec oldSpec = storeOp.getSFile();
            boolean oldIsTmpStore = storeOp.isTmpStore();
            FileSpec fSpec = new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(),
                    new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
            // The original job now writes to a temporary file read by the new job.
            storeOp.setSFile(fSpec);
            storeOp.setIsTmpStore(true);
            mr.setReduceDone(true);

            MapReduceOper limitAdjustMROp = new MapReduceOper(new OperatorKey(scope, nig.getNextNodeId(scope)));
            POLoad ld = new POLoad(new OperatorKey(scope, nig.getNextNodeId(scope)));
            ld.setPc(pigContext);
            ld.setLFile(fSpec);
            ld.setIsTmpLoad(true);
            limitAdjustMROp.mapPlan.add(ld);
            if (mr.isGlobalSort()) {
                connectMapToReduceLimitedSort(limitAdjustMROp, mr);
            } else {
                MRUtil.simpleConnectMapToReduce(limitAdjustMROp, scope, nig);
            }
            // Need to split the original reduce plan into two mapreduce jobs:
            // 1st: From the root(POPackage) to POLimit
            // 2nd: From POLimit to leaves(POStore), duplicate POLimit
            // The reason for doing that:
            // 1. We need to have two map-reduce jobs, otherwise, we will end up with
            //    N*M records, N is number of reducers, M is limit constant. We need
            //    one extra mapreduce job with 1 reducer
            // 2. We don't want to move operators after POLimit into the first mapreduce
            //    job, because:
            //    * Foreach will shift the key type for second mapreduce job, see PIG-461
            //    * Foreach flatten may generate more than M records, which get cut
            //      by POLimit, see PIG-2231
            splitReducerForLimit(limitAdjustMROp, mr);
            if (mr.isGlobalSort()) {
                limitAdjustMROp.setLimitAfterSort(true);
                limitAdjustMROp.setSortOrder(mr.getSortOrder());
            }

            // The new job writes to the location the original job used to write to.
            POStore st = new POStore(new OperatorKey(scope, nig.getNextNodeId(scope)));
            st.setSFile(oldSpec);
            st.setIsTmpStore(oldIsTmpStore);
            st.setSchema(storeOp.getSchema());
            st.setSignature(storeOp.getSignature());
            st.copyAliasFrom(reduceLeaf);
            limitAdjustMROp.reducePlan.addAsLeaf(st);
            limitAdjustMROp.requestedParallelism = 1;
            limitAdjustMROp.setLimitOnly(true);

            // Snapshot the successors: the plan is modified below, so iterating
            // the list returned by the plan directly would be unsafe.
            List<MapReduceOper> successors = new ArrayList<MapReduceOper>();
            List<MapReduceOper> successorList = mPlan.getSuccessors(mr);
            if (successorList != null) {
                successors.addAll(successorList);
            }

            // Process UDFs
            for (String udf : mr.UDFs) {
                if (!limitAdjustMROp.UDFs.contains(udf)) {
                    limitAdjustMROp.UDFs.add(udf);
                }
            }

            mPlan.add(limitAdjustMROp);
            mPlan.connect(mr, limitAdjustMROp);
            // Re-route every former successor of mr to depend on the new limit job.
            for (MapReduceOper nextMr : successors) {
                if (nextMr != null) {
                    mPlan.disconnect(mr, nextMr);
                    mPlan.connect(limitAdjustMROp, nextMr);
                }
            }
        }
    }

    /**
     * Moves the reduce-side operators that follow the POLimit in
     * {@code firstMROp} (up to, but excluding, the POStore leaf) into
     * {@code secondMROp}'s reduce plan. A new POLimit copying
     * {@code firstMROp}'s limit and limit plan is added in front of them.
     *
     * <p>The walk follows the first successor of each operator starting at the
     * root. If no POLimit is found before the chain ends, only the new POLimit
     * is added to {@code secondMROp}.
     */
    private void splitReducerForLimit(MapReduceOper secondMROp,
            MapReduceOper firstMROp) throws PlanException, VisitorException {
        PhysicalOperator op = firstMROp.reducePlan.getRoots().get(0);
        assert(op instanceof POPackage);
        while (true) {
            List<PhysicalOperator> succs = firstMROp.reducePlan.getSuccessors(op);
            // End of the chain: no POLimit was found.
            if (succs == null || succs.isEmpty()) break;
            op = succs.get(0);
            if (op instanceof POLimit) {
                // Continue from the operator right after POLimit.
                op = firstMROp.reducePlan.getSuccessors(op).get(0);
                break;
            }
        }
        POLimit pLimit2 = new POLimit(new OperatorKey(scope, nig.getNextNodeId(scope)));
        pLimit2.setLimit(firstMROp.limit);
        pLimit2.setLimitPlan(firstMROp.limitPlan);
        secondMROp.reducePlan.addAsLeaf(pLimit2);
        // Relocate each operator up to the POStore, preserving order.
        while (true) {
            if (op instanceof POStore) break;
            PhysicalOperator opToMove = op;
            List<PhysicalOperator> succs = firstMROp.reducePlan.getSuccessors(op);
            op = succs.get(0);
            firstMROp.reducePlan.removeAndReconnect(opToMove);
            secondMROp.reducePlan.addAsLeaf(opToMove);
        }
    }

    /**
     * Wires {@code mro}'s map and reduce plans for a limit applied after a
     * global sort. It adds a clone of {@code sortMROp}'s map-side
     * POLocalRearrange leaf, a clone of its reduce-side POPackage root, and the
     * operator returned by {@link MRUtil#getPlainForEachOP}.
     *
     * @throws MRCompilerException (error codes 2147 / 2148) if cloning fails
     */
    private void connectMapToReduceLimitedSort(MapReduceOper mro, MapReduceOper sortMROp) throws PlanException, VisitorException
    {
        POLocalRearrange slr = (POLocalRearrange) sortMROp.mapPlan.getLeaves().get(0);
        POLocalRearrange lr = null;
        try {
            lr = slr.clone();
        } catch (CloneNotSupportedException e) {
            int errCode = 2147;
            String msg = "Error cloning POLocalRearrange for limit after sort";
            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
        }
        mro.mapPlan.addAsLeaf(lr);
        POPackage spkg = (POPackage) sortMROp.reducePlan.getRoots().get(0);
        POPackage pkg = null;
        try {
            pkg = spkg.clone();
        } catch (Exception e) {
            int errCode = 2148;
            String msg = "Error cloning POPackageLite for limit after sort";
            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
        }
        mro.reducePlan.add(pkg);
        mro.reducePlan.addAsLeaf(MRUtil.getPlainForEachOP(scope, nig));
    }
}