// blob: bf0a8377ae04e71fdbf74a3ec1633c6a3fb96aa2 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.backend.hadoop.executionengine.tez.plan;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.pig.PigException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.plan.optimizer.OptimizerException;
import org.apache.pig.impl.util.Pair;
* A port of the POPackageAnnotator from MR to Tez.
public class TezPOPackageAnnotator extends TezOpPlanVisitor {
* @param plan Tez plan to visit
public TezPOPackageAnnotator(TezOperPlan plan) {
super(plan, new DepthFirstWalker<TezOperator, TezOperPlan>(plan));
public void visitTezOp(TezOperator tezOp) throws VisitorException {
if(!tezOp.plan.isEmpty()) {
PackageDiscoverer pkgDiscoverer = new PackageDiscoverer(tezOp.plan);
POPackage pkg = pkgDiscoverer.getPkg();
if(pkg != null) {
handlePackage(tezOp, pkg);
private void handlePackage(TezOperator pkgTezOp, POPackage pkg) throws VisitorException {
// the LocalRearrange(s) must be in the plan of a predecessor tez op
int lrFound = 0;
List<TezOperator> preds = this.mPlan.getPredecessors(pkgTezOp);
for (Iterator<TezOperator> it = preds.iterator(); it.hasNext();) {
TezOperator predTezOp =;
TezOperator predTezOpVertexGrp = null;
if (predTezOp.isVertexGroup()) {
predTezOpVertexGrp = predTezOp;
// Just get one of the inputs to vertex group
predTezOp = getPlan().getOperator(predTezOp.getVertexGroupMembers().get(0));
lrFound += patchPackage(predTezOp, predTezOpVertexGrp, pkgTezOp, pkg);
if(lrFound == pkg.getNumInps()) {
if(lrFound != pkg.getNumInps()) {
int errCode = 2086;
String msg = "Unexpected problem during optimization. "
+ "Could not find all LocalRearrange operators. Expected: "
+ pkg.getNumInps() + ", Found: " + lrFound;
throw new OptimizerException(msg, errCode, PigException.BUG);
private int patchPackage(TezOperator predTezOp,
TezOperator predTezOpVertexGrp,
TezOperator pkgTezOp,
POPackage pkg) throws VisitorException {
LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(
predTezOp.plan, predTezOpVertexGrp, pkgTezOp, pkg);
// let our caller know if we managed to patch
// the package
return lrDiscoverer.getLoRearrangeFound();
* Simple visitor of the "Reduce" physical plan
* which will get a reference to the POPacakge
* present in the plan
static class PackageDiscoverer extends PhyPlanVisitor {
private POPackage pkg;
public PackageDiscoverer(PhysicalPlan plan) {
super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
public void visitPackage(POPackage pkg) throws VisitorException {
this.pkg = pkg;
* @return the pkg
public POPackage getPkg() {
return pkg;
* Physical Plan visitor which tries to get the
* LocalRearrange(s) present in the plan (if any) and
* annotate the POPackage given to it with the information
* in the LocalRearrange (regarding columns in the "value"
* present in the "key")
public static class LoRearrangeDiscoverer extends PhyPlanVisitor {
private int loRearrangeFound = 0;
private TezOperator pkgTezOp;
private POPackage pkg;
private TezOperator predTezOpVertexGrp;
private boolean isPOSplit;
public LoRearrangeDiscoverer(PhysicalPlan predPlan, TezOperator predTezOpVertexGrp, TezOperator pkgTezOp, POPackage pkg) {
super(predPlan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(predPlan));
this.pkgTezOp = pkgTezOp;
this.pkg = pkg;
this.predTezOpVertexGrp = predTezOpVertexGrp;
public void visitSplit(POSplit spl) throws VisitorException {
isPOSplit = true;
public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException {
POLocalRearrangeTez lr = (POLocalRearrangeTez) lrearrange;
if (!(lr.isConnectedToPackage() && lr.getOutputKey().equals(pkgTezOp.getOperatorKey().toString()))) {
Map<Integer,Pair<Boolean, Map<Integer, Integer>>> keyInfo;
if (pkg.getPkgr() instanceof LitePackager) {
if(lrearrange.getIndex() != 0) {
// Throw some exception here
throw new RuntimeException("POLocalRearrange for POPackageLite cannot have index other than 0, but has index - "+lrearrange.getIndex());
// annotate the package with information from the LORearrange
// update the keyInfo information if already present in the POPackage
keyInfo = pkg.getPkgr().getKeyInfo();
if(keyInfo == null)
keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
Integer index = Integer.valueOf(lrearrange.getIndex());
if(keyInfo.get(index) != null) {
if (isPOSplit) {
// Case of POSplit having more than one input in case of self join or union
} else {
// something is wrong - we should not be getting key info
// for the same index from two different Local Rearranges
int errCode = 2087;
String msg = "Unexpected problem during optimization." +
" Found index:" + lrearrange.getIndex() +
" in multiple LocalRearrange operators.";
throw new OptimizerException(msg, errCode, PigException.BUG);
new Pair<Boolean, Map<Integer, Integer>>(
lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
* @return the loRearrangeFound
public int getLoRearrangeFound() {
return loRearrangeFound;