blob: a44a42ef0e5a4cc2077470438bbd3ca7a9d101bb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.FuncSpec;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.PlanWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.impl.PigContext;
/**
 * A visitor to optimize plans that have a sample job that immediately follows a
 * load/store only MR job. These kinds of plans are generated for order bys and for
 * joins that need to sample their data first. Such plans can be changed so that the
 * sample loader (RandomSampleLoader or PoissonSampleLoader) subsumes the loader used
 * in the first job, after which the first job is removed from the plan.
 *
 * <p>The subsumed jobs are only removed once the whole plan has been walked, because
 * the plan must not be modified while the walker is traversing it.
 */
public class SampleOptimizer extends MROpPlanVisitor {

    private static final Log log = LogFactory.getLog(SampleOptimizer.class);

    /** Class names of the sampling loaders this optimizer knows how to rewrite. */
    private static final String RANDOM_SAMPLE_LOADER =
        "org.apache.pig.impl.builtin.RandomSampleLoader";
    private static final String POISSON_SAMPLE_LOADER =
        "org.apache.pig.impl.builtin.PoissonSampleLoader";

    /** Class name of the UDF whose input-file constructor argument must be rewritten. */
    private static final String PARTITION_SKEWED_KEYS =
        "org.apache.pig.impl.builtin.PartitionSkewedKeys";

    /** Index of the input-file name within PartitionSkewedKeys' constructor arguments. */
    private static final int SKEWED_KEYS_FILE_ARG = 2;

    private final PigContext pigContext;

    /** Predecessor jobs subsumed by a sampler; removed from the plan after the walk. */
    private final List<MapReduceOper> opsToRemove = new ArrayList<MapReduceOper>();

    public SampleOptimizer(MROperPlan plan, PigContext pigContext) {
        super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
        this.pigContext = pigContext;
    }

    /**
     * Walks the plan and then removes every predecessor job that was subsumed by a
     * sampling job during the walk.
     *
     * @throws VisitorException if rewriting any sampling job fails
     */
    @Override
    public void visit() throws VisitorException {
        super.visit();
        // Remove operators marked for removal now that traversal has finished.
        for (MapReduceOper op : opsToRemove) {
            this.mPlan.remove(op);
        }
        // Forget them so that a repeated visit() does not try to remove them again.
        opsToRemove.clear();
    }

    /**
     * If {@code mr} is a sampling job whose only predecessor is a root, load/store-only
     * job, rewrites {@code mr} and its single successor to read the predecessor's input
     * directly, and schedules the predecessor for removal.
     *
     * @param mr the MR operator being visited
     * @throws VisitorException if replacing a load in a map plan fails
     */
    @Override
    public void visitMROp(MapReduceOper mr) throws VisitorException {
        // See if this is a sampling job.
        List<PhysicalOperator> pos = mr.mapPlan.getRoots();
        if (pos == null || pos.size() == 0) {
            log.debug("Map of operator empty");
            return;
        }
        PhysicalOperator po = pos.get(0);
        if (!(po instanceof POLoad)) {
            log.debug("Root operator of map is not load.");
            return;
        }
        POLoad load = (POLoad) po;
        String loadFunc = load.getLFile().getFuncName();
        String loadFile = load.getLFile().getFileName();
        if (!isSampleLoader(loadFunc)) {
            log.debug("Not a sampling job.");
            return;
        }
        if (loadFile == null) {
            log.debug("No load file");
            return;
        }

        // Get this job's predecessor. There should be exactly one. getPredecessors()
        // may return null (the checks further down already allow for that).
        List<MapReduceOper> preds = mPlan.getPredecessors(mr);
        if (preds == null || preds.size() != 1) {
            log.debug("Sampling job does not have exactly one predecessor.");
            return;
        }
        MapReduceOper pred = preds.get(0);

        // The predecessor should be a root.
        List<MapReduceOper> predPreds = mPlan.getPredecessors(pred);
        if (predPreds != null && predPreds.size() > 0) {
            log.debug("Predecessor should be a root of the plan");
            return;
        }

        // The predecessor should have just a load and store in the map, and nothing
        // in the combine or reduce.
        if (!(pred.reducePlan.isEmpty() && pred.combinePlan.isEmpty())) {
            log.debug("Predecessor has a combine or reduce plan");
            return;
        }

        // The MR job should have exactly one successor.
        List<MapReduceOper> succs = mPlan.getSuccessors(mr);
        if (succs == null || succs.size() != 1) {
            log.debug("Sampling job does not have exactly one successor.");
            return;
        }
        MapReduceOper succ = succs.get(0);

        if (pred.mapPlan == null || pred.mapPlan.size() != 2) {
            log.debug("Predecessor has more than just load+store in the map");
            return;
        }
        List<PhysicalOperator> predRoots = pred.mapPlan.getRoots();
        if (predRoots == null || predRoots.size() != 1) {
            log.debug("Predecessor plan has more than one root.");
            return;
        }
        PhysicalOperator predRoot = predRoots.get(0);
        if (!(predRoot instanceof POLoad)) {
            log.debug("Predecessor's map plan root is not a load.");
            return;
        }
        POLoad predLoad = (POLoad) predRoot;

        // Find the load that correlates with the file the sampler is loading, and
        // check that it is using the temp file storage format.
        if (succ.mapPlan == null) {
            log.debug("Successor has no map plan.");
            return;
        }
        // Loop-invariant: compute the temp-file loader name once.
        String tmpFileLoader = Utils.getTmpFileCompressorName(pigContext);
        POLoad succLoad = null;
        for (PhysicalOperator root : succ.mapPlan.getRoots()) {
            if (!(root instanceof POLoad)) {
                log.debug("Successor's roots are not loads");
                return;
            }
            POLoad candidate = (POLoad) root;
            if (loadFile.equals(candidate.getLFile().getFileName())
                    && tmpFileLoader.equals(candidate.getLFile().getFuncName())) {
                succLoad = candidate;
                break;
            }
        }
        if (succLoad == null) {
            log.debug("Could not find load that matched file we are sampling.");
            return;
        }

        // Okay, we're on.
        // First, replace our sample loader with a sample loader that wraps the load
        // function from our predecessor.
        FileSpec predFs = predLoad.getLFile();
        String[] rslargs = new String[2];
        // First argument is the FuncSpec of the loader function to subsume.
        rslargs[0] = predFs.getFuncSpec().toString();
        // Add the loader's funcspec to the list of udf's associated with this mr operator.
        mr.UDFs.add(rslargs[0]);
        // Second argument is the number of samples per block, read from the original.
        rslargs[1] = load.getLFile().getFuncSpec().getCtorArgs()[1];
        FileSpec sampleFs = new FileSpec(predFs.getFileName(), new FuncSpec(loadFunc, rslargs));
        POLoad newSampleLoad =
            new POLoad(load.getOperatorKey(), load.getRequestedParallelism(), sampleFs);
        newSampleLoad.setSignature(predLoad.getSignature());
        newSampleLoad.setLimit(predLoad.getLimit());
        try {
            mr.mapPlan.replace(load, newSampleLoad);
            // The sampler now reads the predecessor's input directly, so any
            // PartitionSkewedKeys UDF in the reduce plan must be pointed at that file.
            for (PhysicalOperator leaf : mr.reducePlan.getLeaves()) {
                scan(mr.reducePlan, leaf, sampleFs.getFileName());
            }
        } catch (PlanException e) {
            throw new VisitorException(e);
        }

        // Second, replace the loader in our successor with whatever the originally used loader was.
        FileSpec succFs = new FileSpec(predFs.getFileName(), predFs.getFuncSpec());
        POLoad newSuccLoad =
            new POLoad(succLoad.getOperatorKey(), succLoad.getRequestedParallelism(), succFs);
        newSuccLoad.setSignature(predLoad.getSignature());
        // The successor used to read what predLoad produced (the predecessor's map is only
        // load+store), so it gets the same limit that the sample load above receives.
        // NOTE(review): assumes POLoad's limit caps the records emitted — confirm.
        newSuccLoad.setLimit(predLoad.getLimit());
        try {
            succ.mapPlan.replace(succLoad, newSuccLoad);
            // Add the loader's funcspec to the list of udf's associated with this mr operator.
            succ.UDFs.add(newSuccLoad.getLFile().getFuncSpec().toString());
        } catch (PlanException e) {
            throw new VisitorException(e);
        }

        // Cannot delete the pred right now, because we are still traversing the graph. So,
        // mark the pred and remove it from the plan once the visit by this optimizer is complete.
        opsToRemove.add(pred);
    }

    /** Returns true if {@code funcName} names one of the supported sampling loaders. */
    private static boolean isSampleLoader(String funcName) {
        return RANDOM_SAMPLE_LOADER.equals(funcName) || POISSON_SAMPLE_LOADER.equals(funcName);
    }

    /**
     * Searches {@code plan} backwards from {@code op} for a PartitionSkewedKeys UDF and
     * sets its input-file constructor argument to {@code fileName}. The UDF is normally
     * found inside a POForEach of the reduce plan; each POForEach inner plan is searched
     * using that inner plan's own predecessor relation.
     *
     * @param plan     the plan that contains {@code op}
     * @param op       the operator to start searching from
     * @param fileName the new input file name
     * @throws IllegalStateException if the PartitionSkewedKeys spec has no file argument slot
     */
    private void scan(PhysicalPlan plan, PhysicalOperator op, String fileName) {
        if (op instanceof POUserFunc) {
            FuncSpec spec = ((POUserFunc) op).getFuncSpec();
            if (PARTITION_SKEWED_KEYS.equals(spec.getClassName())) {
                // Relies on getCtorArgs() exposing the spec's own array so the update
                // takes effect in place (as the original code did) — TODO confirm.
                String[] ctorArgs = spec.getCtorArgs();
                if (ctorArgs == null || ctorArgs.length <= SKEWED_KEYS_FILE_ARG) {
                    throw new IllegalStateException(
                        "PartitionSkewedKeys has no input file constructor argument: " + spec);
                }
                ctorArgs[SKEWED_KEYS_FILE_ARG] = fileName;
                return;
            }
            // Not the UDF we want: keep searching upstream of it.
        } else if (op instanceof POForEach) {
            for (PhysicalPlan inner : ((POForEach) op).getInputPlans()) {
                for (PhysicalOperator leaf : inner.getLeaves()) {
                    scan(inner, leaf, fileName);
                }
            }
            return;
        }
        List<PhysicalOperator> preds = plan.getPredecessors(op);
        if (preds == null) {
            return;
        }
        for (PhysicalOperator p : preds) {
            scan(plan, p, fileName);
        }
    }
}