blob: d76e21b17c26f6444c7a82ea81d86c31649ceeaa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.pig.PigException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.plan.optimizer.OptimizerException;
import org.apache.pig.impl.util.Pair;
/**
* This visitor visits the MRPlan and does the following
* for each MROper
* - visits the POPackage in the reduce plan and finds the corresponding
* POLocalRearrange(s) (either in the map plan of the same oper OR
* reduce plan of predecessor MROper). It then annotates the POPackage
* with information about which columns in the "value" are present in the
* "key" and will need to be stitched into the "value".
*/
public class POPackageAnnotator extends MROpPlanVisitor {

    /**
     * @param plan MR plan to visit
     */
    public POPackageAnnotator(MROperPlan plan) {
        super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
    }

    /**
     * Annotates the POPackage found in the combine plan and/or the reduce
     * plan of {@code mr} with key information taken from the matching
     * POLocalRearrange(s).
     *
     * @param mr the MapReduceOper being visited
     * @throws VisitorException if the matching LocalRearrange(s) cannot all
     *         be found (errors 2085/2086), or if the same index is reported by
     *         more than one LocalRearrange (error 2087)
     */
    @Override
    public void visitMROp(MapReduceOper mr) throws VisitorException {
        // A POPackage could be present in the combine plan OR in the reduce
        // plan. A POPackage with a CombinerPackager could be present only in
        // the reduce plan. Search in these two plans accordingly.
        POPackage combinePkg = findPackage(mr.combinePlan);
        if (combinePkg != null) {
            handlePackage(mr, combinePkg);
        }

        POPackage reducePkg = findPackage(mr.reducePlan);
        if (reducePkg == null) {
            return;
        }
        if (reducePkg.getPkgr() instanceof CombinerPackager) {
            // for a CombinerPackager we only look for the corresponding
            // LocalRearrange(s) in the combine plan
            if (patchPackage(mr.combinePlan, reducePkg) != reducePkg.getNumInps()) {
                int errCode = 2085;
                String msg = "Unexpected problem during optimization." +
                        " Could not find LocalRearrange in combine plan.";
                throw new OptimizerException(msg, errCode, PigException.BUG);
            }
        } else {
            handlePackage(mr, reducePkg);
        }
    }

    /**
     * Finds the POPackage in a physical plan.
     *
     * @param plan the physical plan to search
     * @return the last POPackage visited by a depth-first walk of
     *         {@code plan}, or null if the plan is empty or contains none
     * @throws VisitorException if walking the plan fails
     */
    private static POPackage findPackage(PhysicalPlan plan) throws VisitorException {
        if (plan.isEmpty()) {
            return null;
        }
        PackageDiscoverer pkgDiscoverer = new PackageDiscoverer(plan);
        pkgDiscoverer.visit();
        return pkgDiscoverer.getPkg();
    }

    /**
     * Locates the LocalRearrange(s) feeding {@code pkg} and annotates
     * {@code pkg} with their key information. The map plan of {@code mr} is
     * searched first, then the reduce plans of its predecessors, until
     * {@code pkg.getNumInps()} LocalRearranges have been found.
     *
     * @param mr  the MapReduceOper owning {@code pkg}
     * @param pkg the package to annotate
     * @throws VisitorException (error 2086) if not all LocalRearranges are
     *         found, or (error 2087) if an index is seen twice
     */
    private void handlePackage(MapReduceOper mr, POPackage pkg) throws VisitorException {
        int lrFound = patchPackage(mr.mapPlan, pkg);
        if (lrFound != pkg.getNumInps()) {
            // We did not find all the LocalRearrange(s) in the map plan, so
            // look in the predecessors. getPredecessors() returns null when mr
            // has no predecessors; in that case fall through to the 2086 error
            // below instead of failing with a NullPointerException.
            List<MapReduceOper> preds = this.mPlan.getPredecessors(mr);
            if (preds != null) {
                for (MapReduceOper pred : preds) {
                    lrFound += patchPackage(skipLimitOnly(pred).reducePlan, pkg);
                    if (lrFound == pkg.getNumInps()) {
                        break;
                    }
                }
            }
        }
        if (lrFound != pkg.getNumInps()) {
            int errCode = 2086;
            String msg = "Unexpected problem during optimization. Could not find all LocalRearrange operators.";
            throw new OptimizerException(msg, errCode, PigException.BUG);
        }
    }

    /**
     * Chooses the operator whose reduce plan is searched on behalf of
     * predecessor {@code mrOper}.
     *
     * @param mrOper a predecessor of the operator being annotated
     * @return the first predecessor of {@code mrOper} when {@code mrOper} is
     *         limit-only and that predecessor is not a global sort; otherwise
     *         {@code mrOper} itself
     */
    private MapReduceOper skipLimitOnly(MapReduceOper mrOper) {
        if (!mrOper.isLimitOnly()) {
            return mrOper;
        }
        List<MapReduceOper> limitPreds = this.mPlan.getPredecessors(mrOper);
        if (limitPreds == null || limitPreds.isEmpty()) {
            // NOTE(review): a limit-only operator is presumably never a plan
            // root; searching its own reduce plan avoids an NPE here and lets
            // the caller report error 2086 if nothing is found.
            return mrOper;
        }
        MapReduceOper first = limitPreds.get(0);
        return first.isGlobalSort() ? mrOper : first;
    }

    /**
     * Annotates {@code pkg} with the key information of every LocalRearrange
     * in {@code plan}.
     *
     * @return the number of LocalRearranges found (and applied) in
     *         {@code plan}, so the caller knows whether the package was
     *         fully patched
     */
    private int patchPackage(PhysicalPlan plan, POPackage pkg) throws VisitorException {
        LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(plan, pkg);
        lrDiscoverer.visit();
        return lrDiscoverer.getLoRearrangeFound();
    }

    /**
     * Simple visitor of a physical plan which keeps a reference to the
     * POPackage present in the plan (the last one visited, if there are
     * several).
     */
    static class PackageDiscoverer extends PhyPlanVisitor {

        private POPackage pkg;

        public PackageDiscoverer(PhysicalPlan plan) {
            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
        }

        @Override
        public void visitPackage(POPackage pkg) throws VisitorException {
            this.pkg = pkg;
        }

        /**
         * @return the POPackage seen in the plan, or null if none was visited
         */
        public POPackage getPkg() {
            return pkg;
        }
    }

    /**
     * Physical plan visitor which finds the LocalRearrange(s) present in the
     * plan (if any) and annotates the POPackage given to it with the
     * information in each LocalRearrange (regarding columns in the "value"
     * present in the "key").
     */
    static class LoRearrangeDiscoverer extends PhyPlanVisitor {

        private int loRearrangeFound = 0;
        private POPackage pkg;

        public LoRearrangeDiscoverer(PhysicalPlan plan, POPackage pkg) {
            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
            this.pkg = pkg;
        }

        /**
         * Records the key information of {@code lrearrange} in the package's
         * packager, keyed by the LocalRearrange's index.
         *
         * @throws VisitorException (error 2087) if key information for the
         *         same index was already recorded
         * @throws RuntimeException if the packager is a LitePackager and the
         *         index is not 0
         */
        @Override
        public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException {
            loRearrangeFound++;
            if (pkg.getPkgr() instanceof LitePackager && lrearrange.getIndex() != 0) {
                // only index 0 is accepted for a LitePackager
                throw new RuntimeException("POLocalRearrange for POPackageLite cannot have index other than 0, but has index - "+lrearrange.getIndex());
            }

            // annotate the package with information from the LocalRearrange,
            // extending the keyInfo already present in the packager (if any)
            Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo = pkg.getPkgr().getKeyInfo();
            if (keyInfo == null) {
                keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
            }
            Integer index = Integer.valueOf(lrearrange.getIndex());
            if (keyInfo.get(index) != null) {
                // something is wrong - we should not be getting key info
                // for the same index from two different Local Rearranges
                int errCode = 2087;
                String msg = "Unexpected problem during optimization." +
                        " Found index:" + lrearrange.getIndex() +
                        " in multiple LocalRearrange operators.";
                throw new OptimizerException(msg, errCode, PigException.BUG);
            }
            keyInfo.put(index, new Pair<Boolean, Map<Integer, Integer>>(
                    lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
            pkg.getPkgr().setKeyInfo(keyInfo);
            pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
            pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
        }

        /**
         * @return the number of LocalRearranges visited so far
         */
        public int getLoRearrangeFound() {
            return loRearrangeFound;
        }
    }
}