blob: 7faa1fd207270d6cb5165479060765ba077f7a2b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.newplan.logical.relational;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
import org.apache.pig.StoreResources;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogicalToPhysicalTranslatorException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager.PackageType;
import org.apache.pig.data.DataType;
import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
import org.apache.pig.data.SchemaTupleFrontend;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.builtin.GFCross;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.CompilerUtils;
import org.apache.pig.impl.util.LinkedMultiMap;
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.newplan.DependencyOrderWalker;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.PlanWalker;
import org.apache.pig.newplan.ReverseDependencyOrderWalkerWOSeenChk;
import org.apache.pig.newplan.SubtreeDependencyOrderWalker;
import org.apache.pig.newplan.logical.Util;
import org.apache.pig.newplan.logical.expression.ExpToPhyTranslationVisitor;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.ProjectExpression;
import org.apache.pig.parser.SourceLocation;
public class LogToPhyTranslationVisitor extends LogicalRelationalNodesVisitor {
// Commons-logging logger obtained for this visitor class.
private static final Log LOG = LogFactory.getLog(LogToPhyTranslationVisitor.class);
/**
 * Creates a translator for the given logical plan, walking it with a
 * {@link DependencyOrderWalker}. Translation starts from an empty physical
 * plan, an empty logical-to-physical operator map and an empty stack of
 * saved plans.
 *
 * @param plan the logical plan to translate
 * @throws FrontendException propagated from the superclass constructor
 */
public LogToPhyTranslationVisitor(OperatorPlan plan) throws FrontendException {
    super(plan, new DependencyOrderWalker(plan));
    this.currentPlans = new LinkedList<PhysicalPlan>();
    this.logToPhyMap = new HashMap<Operator, PhysicalOperator>();
    this.currentPlan = new PhysicalPlan();
}
// Maps each translated logical operator to the physical operator registered for it
// by the visit methods; later visits look up their predecessors here.
protected Map<Operator, PhysicalOperator> logToPhyMap;
// Stack on which the enclosing physical plan is saved while a nested plan is built.
protected Deque<PhysicalPlan> currentPlans;
// The physical plan that the visit methods currently add operators to.
protected PhysicalPlan currentPlan;
// Supplies the node ids used to build the OperatorKey of every new physical operator.
protected NodeIdGenerator nodeGen = NodeIdGenerator.getGenerator();
// Pig context supplied through setPigContext; passed to POLoad and read for POStream properties.
protected PigContext pc;
/**
 * Sets the Pig context used while translating operators (it is handed to
 * loads and its properties are read when translating streams).
 *
 * @param pc the Pig context to use
 */
public void setPigContext(PigContext pc) {
    this.pc = pc;
}
/**
 * Returns the live map from logical operators to the physical operators
 * registered for them during translation.
 *
 * @return the logical-to-physical operator map (not a copy)
 */
public Map<Operator, PhysicalOperator> getLogToPhyMap() {
    return this.logToPhyMap;
}
/**
 * Returns the physical plan that this visitor is currently building.
 *
 * @return the current physical plan
 */
public PhysicalPlan getPhysicalPlan() {
    return this.currentPlan;
}
/**
 * Translates a logical load into a {@link POLoad}, adds it to the current
 * physical plan and, when the load has a predecessor, connects the
 * predecessor's physical operator to it.
 *
 * @param loLoad the logical load operator to translate
 * @throws FrontendException if the physical plan rejects the connection
 */
@Override
public void visit(LOLoad loLoad) throws FrontendException {
    final String scope = DEFAULT_SCOPE;
    OperatorKey loadKey = new OperatorKey(scope, nodeGen.getNextNodeId(scope));
    POLoad poLoad = new POLoad(loadKey, loLoad.getLoadFunc());

    // Copy the logical operator's settings onto the physical load.
    poLoad.addOriginalLocation(loLoad.getAlias(), loLoad.getLocation());
    poLoad.setLFile(loLoad.getFileSpec());
    poLoad.setPc(pc);
    poLoad.setResultType(DataType.BAG);
    poLoad.setSignature(loLoad.getSignature());
    poLoad.setLimit(loLoad.getLimit());
    poLoad.setIsTmpLoad(loLoad.isTmpLoad());
    poLoad.setCacheFiles(loLoad.getLoadFunc().getCacheFiles());
    poLoad.setShipFiles(loLoad.getLoadFunc().getShipFiles());

    currentPlan.add(poLoad);
    logToPhyMap.put(loLoad, poLoad);

    // A load is typically a root operator, but in the multiquery case it
    // might have a store as a predecessor.
    List<Operator> predecessors = loLoad.getPlan().getPredecessors(loLoad);
    if (predecessors == null) {
        return;
    }

    PhysicalOperator upstream = logToPhyMap.get(predecessors.get(0));
    try {
        currentPlan.connect(upstream, poLoad);
    } catch (PlanException e) {
        throw new LogicalToPhysicalTranslatorException(
                "Invalid physical operators in the physical plan", 2015, PigException.BUG, e);
    }
}
/**
 * Translates a logical native operator into a {@link PONative} and connects
 * it to the physical operator of its first predecessor.
 *
 * @param loNative the logical native operator to translate
 * @throws FrontendException if the operator has no predecessor or the
 *         physical plan rejects the connection
 */
@Override
public void visit(LONative loNative) throws FrontendException {
    final String scope = DEFAULT_SCOPE;
    OperatorKey nativeKey = new OperatorKey(scope, nodeGen.getNextNodeId(scope));
    PONative physNative = new PONative(nativeKey);
    physNative.addOriginalLocation(loNative.getAlias(), loNative.getLocation());
    physNative.setNativeMRjar(loNative.getNativeMRJar());
    physNative.setParams(loNative.getParams());
    physNative.setResultType(DataType.BAG);

    logToPhyMap.put(loNative, physNative);
    currentPlan.add(physNative);

    List<Operator> predecessors = loNative.getPlan().getPredecessors(loNative);
    if (predecessors == null) {
        throw new LogicalToPhysicalTranslatorException(
                "Did not find a predecessor for Native.", 2051, PigException.BUG);
    }

    PhysicalOperator upstream = logToPhyMap.get(predecessors.get(0));
    try {
        currentPlan.connect(upstream, physNative);
    } catch (PlanException e) {
        throw new LogicalToPhysicalTranslatorException(
                "Invalid physical operators in the physical plan", 2015, PigException.BUG, e);
    }
}
/**
 * Translates a logical filter into a {@link POFilter}. The filter condition
 * is translated into its own physical plan, which is attached to the filter;
 * the filter is then connected to its first predecessor and its soft links
 * are translated.
 *
 * @param filter the logical filter operator to translate
 * @throws FrontendException if the filter has no predecessor, the physical
 *         plan rejects the connection, or translating the condition fails
 */
@Override
public void visit(LOFilter filter) throws FrontendException {
    final String scope = DEFAULT_SCOPE;
    OperatorKey filterKey = new OperatorKey(scope, nodeGen.getNextNodeId(scope));
    POFilter physFilter = new POFilter(filterKey, filter.getRequestedParallelism());
    physFilter.addOriginalLocation(filter.getAlias(), filter.getLocation());
    physFilter.setResultType(DataType.BAG);
    currentPlan.add(physFilter);
    logToPhyMap.put(filter, physFilter);

    // Build the condition in a fresh plan, keeping the enclosing plan on the stack.
    currentPlans.push(currentPlan);
    currentPlan = new PhysicalPlan();
    PlanWalker conditionWalker = new ReverseDependencyOrderWalkerWOSeenChk(filter.getFilterPlan());
    pushWalker(conditionWalker);
    ExpToPhyTranslationVisitor conditionTranslator = new ExpToPhyTranslationVisitor(
            currentWalker.getPlan(), conditionWalker, filter, currentPlan, logToPhyMap);
    currentWalker.walk(conditionTranslator);
    popWalker();
    physFilter.setPlan(currentPlan);
    currentPlan = currentPlans.pop();

    List<Operator> predecessors = filter.getPlan().getPredecessors(filter);
    if (predecessors == null) {
        throw new LogicalToPhysicalTranslatorException(
                "Did not find a predecessor for Filter.", 2051, PigException.BUG);
    }

    PhysicalOperator upstream = logToPhyMap.get(predecessors.get(0));
    try {
        currentPlan.connect(upstream, physFilter);
    } catch (PlanException e) {
        throw new LogicalToPhysicalTranslatorException(
                "Invalid physical operators in the physical plan", 2015, PigException.BUG, e);
    }

    translateSoftLinks(filter);
}
/**
 * Translates a logical sort into a {@link POSort}. Every sort-column
 * expression plan is translated into its own physical plan; when the logical
 * sort carries a user function, a {@link POUserComparisonFunc} is created
 * for it and handed to the sort.
 *
 * @param sort the logical sort operator to translate
 * @throws FrontendException if the sort has no predecessor, the physical
 *         plan rejects the connection, or translating an expression fails
 */
@Override
public void visit(LOSort sort) throws FrontendException {
    final String scope = DEFAULT_SCOPE;
    List<LogicalExpressionPlan> logicalSortPlans = sort.getSortColPlans();
    List<PhysicalPlan> physicalSortPlans = new ArrayList<PhysicalPlan>(logicalSortPlans.size());

    // Translate each logical expression plan into a separate physical plan.
    currentPlans.push(currentPlan);
    for (LogicalExpressionPlan logicalSortPlan : logicalSortPlans) {
        currentPlan = new PhysicalPlan();
        PlanWalker exprWalker = new ReverseDependencyOrderWalkerWOSeenChk(logicalSortPlan);
        pushWalker(exprWalker);
        exprWalker.walk(new ExpToPhyTranslationVisitor(
                currentWalker.getPlan(), exprWalker, sort, currentPlan, logToPhyMap));
        physicalSortPlans.add(currentPlan);
        popWalker();
    }
    currentPlan = currentPlans.pop();

    // The comparator (if any) takes its node id before the sort operator does.
    POUserComparisonFunc comparator = null;
    if (sort.getUserFunc() != null) {
        comparator = new POUserComparisonFunc(
                new OperatorKey(scope, nodeGen.getNextNodeId(scope)),
                sort.getRequestedParallelism(), null, sort.getUserFunc());
    }
    POSort physSort = new POSort(
            new OperatorKey(scope, nodeGen.getNextNodeId(scope)),
            sort.getRequestedParallelism(), null,
            physicalSortPlans, sort.getAscendingCols(), comparator);
    physSort.addOriginalLocation(sort.getAlias(), sort.getLocation());
    physSort.setLimit(sort.getLimit());
    logToPhyMap.put(sort, physSort);
    currentPlan.add(physSort);

    List<Operator> predecessors = sort.getPlan().getPredecessors(sort);
    if (predecessors == null) {
        throw new LogicalToPhysicalTranslatorException(
                "Did not find a predecessor for Sort.", 2051, PigException.BUG);
    }

    PhysicalOperator upstream = logToPhyMap.get(predecessors.get(0));
    try {
        currentPlan.connect(upstream, physSort);
    } catch (PlanException e) {
        throw new LogicalToPhysicalTranslatorException(
                "Invalid physical operators in the physical plan", 2015, PigException.BUG, e);
    }
    physSort.setResultType(DataType.BAG);
}
/**
 * Translates a logical RANK operator into its physical operators.
 * <p>
 * First, a random non-negative operation id is generated and given to both the
 * {@link POCounter} and the {@link PORank} created here. The shared id links the
 * counter to its rank operator and avoids collisions between parallel rank
 * operations.
 * <p>
 * In row number mode (plain RANK) two steps are used:
 * <pre>
 * 1.- Each tuple is counted sequentially on each mapper, producing global counters: POCounter
 * 2.- The global counters are gathered and summed; each tuple uses the respective counter
 *     value to calculate its rank value: PORank
 * </pre>
 * Otherwise (RANK BY) five steps are necessary:
 * <pre>
 * 1.- Group by the fields involved in the rank operation: POPackage
 * 2.- In case of multiple fields, the key (group field) is flattened: POForEach
 * 3.- Sort by the fields available after flattening: POSort
 * 4.- Each group is counted sequentially on each mapper through a global counter: POCounter
 * 5.- The global counters are summed and passed to the rank operation: PORank
 * </pre>
 * The statements below allocate node ids in a fixed order; keep that order
 * when editing.
 *
 * @param loRank the logical rank operator; it tells whether the operation is in
 *        row number mode or is a rank by (dense or not)
 * @throws FrontendException if no predecessor is found (row number mode) or the
 *         physical plan rejects a connection
 **/
@Override
public void visit(LORank loRank) throws FrontendException {
    String scope = DEFAULT_SCOPE;
    PORank poRank;
    POCounter poCounter;
    Random randomGenerator = new Random();
    // Math.abs(Long.MIN_VALUE) is still negative, so clear the sign bit instead
    // to guarantee a non-negative id.
    long operationID = randomGenerator.nextLong() & Long.MAX_VALUE;
    String operationIdText = String.valueOf(operationID);
    try {
        if (!loRank.isRowNumber()) {
            // RANK BY: POPackage -> POForEach -> POSort -> POCounter -> PORank -> POForEach
            boolean[] flags = {false};
            MultiMap<Integer, LogicalExpressionPlan> expressionPlans = new MultiMap<Integer, LogicalExpressionPlan>();
            // NOTE(review): every key receives the complete list of rank column plans;
            // kept as in the original because the consumer is not visible here.
            for (int i = 0; i < loRank.getRankColPlans().size(); i++) {
                expressionPlans.put(i, loRank.getRankColPlans());
            }
            POPackage poPackage = compileToLR_GR_PackTrio(loRank, null, flags, expressionPlans);
            poPackage.getPkgr().setPackageType(PackageType.GROUP);
            translateSoftLinks(loRank);

            // First foreach: flatten the group key (column 0), keep the bag (column 1).
            List<Boolean> flattenLst = Arrays.asList(true, false);
            PhysicalPlan fep1 = new PhysicalPlan();
            POProject feproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), -1);
            feproj1.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
            feproj1.setColumn(0);
            feproj1.setResultType(poPackage.getPkgr().getKeyType());
            feproj1.setStar(false);
            feproj1.setOverloaded(false);
            fep1.add(feproj1);
            PhysicalPlan fep2 = new PhysicalPlan();
            POProject feproj2 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), -1);
            feproj2.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
            feproj2.setColumn(1);
            feproj2.setResultType(DataType.BAG);
            feproj2.setStar(false);
            feproj2.setOverloaded(false);
            fep2.add(feproj2);
            List<PhysicalPlan> fePlans = Arrays.asList(fep1, fep2);
            POForEach poForEach = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), -1, fePlans, flattenLst);

            // Collect the type of each rank column from its logical expression plan.
            List<LogicalExpressionPlan> rankPlans = loRank.getRankColPlans();
            byte[] newTypes = new byte[rankPlans.size()];
            for (int i = 0; i < rankPlans.size(); i++) {
                LogicalExpressionPlan loep = rankPlans.get(i);
                Iterator<Operator> inpOpers = loep.getOperators();
                while (inpOpers.hasNext()) {
                    Operator oper = inpOpers.next();
                    newTypes[i] = ((ProjectExpression) oper).getType();
                }
            }

            // One projection plan per rank column, over the flattened key columns.
            List<PhysicalPlan> newPhysicalPlan = new ArrayList<PhysicalPlan>();
            List<Boolean> newOrderPlan = new ArrayList<Boolean>();
            for (int i = 0; i < loRank.getRankColPlans().size(); i++) {
                PhysicalPlan fep3 = new PhysicalPlan();
                POProject feproj3 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), -1);
                feproj3.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
                feproj3.setColumn(i);
                feproj3.setResultType(newTypes[i]);
                feproj3.setStar(false);
                feproj3.setOverloaded(false);
                fep3.add(feproj3);
                newPhysicalPlan.add(fep3);
                newOrderPlan.add(loRank.getAscendingCol().get(i));
            }

            POSort poSort;
            poSort = new POSort(new OperatorKey(scope, nodeGen
                    .getNextNodeId(scope)), -1, null,
                    newPhysicalPlan, newOrderPlan, null);
            poSort.addOriginalLocation(loRank.getAlias(), loRank.getLocation());

            poCounter = new POCounter(
                    new OperatorKey(scope, nodeGen
                            .getNextNodeId(scope)), -1, null,
                    newPhysicalPlan, newOrderPlan);
            poCounter.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
            poCounter.setResultType(DataType.TUPLE);
            poCounter.setIsRowNumber(loRank.isRowNumber());
            poCounter.setIsDenseRank(loRank.isDenseRank());
            poCounter.setOperationID(operationIdText);

            poRank = new PORank(
                    new OperatorKey(scope, nodeGen
                            .getNextNodeId(scope)), -1, null,
                    newPhysicalPlan, newOrderPlan);
            poRank.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
            poRank.setResultType(DataType.TUPLE);
            poRank.setOperationID(operationIdText);

            // Second foreach: keep column 0 (typed LONG) and flatten the bag that
            // follows the rank columns.
            List<Boolean> flattenLst2 = Arrays.asList(false, true);
            PhysicalPlan fep12 = new PhysicalPlan();
            POProject feproj12 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), -1);
            feproj12.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
            feproj12.setColumn(0);
            feproj12.setResultType(DataType.LONG);
            feproj12.setStar(false);
            feproj12.setOverloaded(false);
            fep12.add(feproj12);
            PhysicalPlan fep22 = new PhysicalPlan();
            POProject feproj22 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), -1);
            feproj22.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
            feproj22.setColumn(loRank.getRankColPlans().size() + 1);
            feproj22.setResultType(DataType.BAG);
            feproj22.setStar(false);
            feproj22.setOverloaded(false);
            fep22.add(feproj22);
            List<PhysicalPlan> fePlans2 = Arrays.asList(fep12, fep22);
            POForEach poForEach2 = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), -1, fePlans2, flattenLst2);

            currentPlan.add(poForEach);
            currentPlan.add(poSort);
            currentPlan.add(poCounter);
            currentPlan.add(poRank);
            currentPlan.add(poForEach2);
            try {
                currentPlan.connect(poPackage, poForEach);
                currentPlan.connect(poForEach, poSort);
                currentPlan.connect(poSort, poCounter);
                currentPlan.connect(poCounter, poRank);
                currentPlan.connect(poRank, poForEach2);
            } catch (PlanException e) {
                throw new LogicalToPhysicalTranslatorException(e.getMessage(), e.getErrorCode(), e.getErrorSource(), e);
            }
            // The final foreach is what later operators connect from.
            logToPhyMap.put(loRank, poForEach2);
        } else {
            // Row number mode: predecessor -> POCounter -> PORank
            List<LogicalExpressionPlan> logPlans = loRank.getRankColPlans();
            List<PhysicalPlan> rankPlans = new ArrayList<PhysicalPlan>(logPlans.size());
            // convert all the logical expression plans to physical expression plans
            currentPlans.push(currentPlan);
            for (LogicalExpressionPlan plan : logPlans) {
                currentPlan = new PhysicalPlan();
                PlanWalker childWalker = new ReverseDependencyOrderWalkerWOSeenChk(plan);
                pushWalker(childWalker);
                childWalker.walk(new ExpToPhyTranslationVisitor(currentWalker.getPlan(),
                        childWalker, loRank, currentPlan, logToPhyMap));
                rankPlans.add(currentPlan);
                popWalker();
            }
            currentPlan = currentPlans.pop();

            poCounter = new POCounter(
                    new OperatorKey(scope, nodeGen
                            .getNextNodeId(scope)), -1, null,
                    rankPlans, loRank.getAscendingCol());
            poCounter.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
            poCounter.setResultType(DataType.TUPLE);
            poCounter.setIsRowNumber(loRank.isRowNumber());
            poCounter.setIsDenseRank(loRank.isDenseRank());
            poCounter.setOperationID(operationIdText);

            poRank = new PORank(
                    new OperatorKey(scope, nodeGen
                            .getNextNodeId(scope)), -1, null,
                    rankPlans, loRank.getAscendingCol());
            poRank.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
            poRank.setResultType(DataType.TUPLE);
            poRank.setOperationID(operationIdText);

            currentPlan.add(poCounter);
            currentPlan.add(poRank);

            List<Operator> op = loRank.getPlan().getPredecessors(loRank);
            PhysicalOperator from;
            if (op != null) {
                from = logToPhyMap.get(op.get(0));
            } else {
                int errCode = 2051;
                String msg = "Did not find a predecessor for Rank.";
                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);
            }
            currentPlan.connect(from, poCounter);
            currentPlan.connect(poCounter, poRank);
            logToPhyMap.put(loRank, poRank);
        }
    } catch (PlanException e) {
        int errCode = 2015;
        String msg = "Invalid physical operators in the physical plan";
        throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
    }
}
/**
 * Translates a logical CROSS into physical operators.
 * <p>
 * A nested cross becomes a single {@link POCross} connected to the physical
 * operators of all its predecessors.
 * <p>
 * A non-nested cross is expanded: for each input a map-side {@link POForEach}
 * (a {@link GFCross} user function plus a star projection) feeds a
 * {@link POLocalRearrange}; all rearranges feed one {@link POGlobalRearrange},
 * followed by a {@link POPackage} and a final {@link POForEach} that flattens
 * the packaged bags. The final foreach is registered as the physical operator
 * for the cross.
 *
 * @param cross the logical cross operator to translate
 * @throws FrontendException if the physical plan rejects a connection or the
 *         index cannot be set on a newly created local rearrange
 */
@Override
public void visit(LOCross cross) throws FrontendException {
    String scope = DEFAULT_SCOPE;
    List<Operator> inputs = cross.getPlan().getPredecessors(cross);
    if (cross.isNested()) {
        POCross physOp = new POCross(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism());
        // Record the logical operator's alias and location. The original passed the
        // new operator's own (still empty) alias and locations, which recorded nothing.
        physOp.addOriginalLocation(cross.getAlias(), cross.getLocation());
        currentPlan.add(physOp);
        physOp.setResultType(DataType.BAG);
        logToPhyMap.put(cross, physOp);
        for (Operator op : inputs) {
            PhysicalOperator from = logToPhyMap.get(op);
            try {
                currentPlan.connect(from, physOp);
            } catch (PlanException e) {
                int errCode = 2015;
                String msg = "Invalid physical operators in the physical plan";
                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
            }
        }
    } else {
        POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(
                scope, nodeGen.getNextNodeId(scope)), cross
                .getRequestedParallelism());
        poGlobal.addOriginalLocation(cross.getAlias(), cross.getLocation());
        POPackage poPackage = new POPackage(new OperatorKey(scope, nodeGen
                .getNextNodeId(scope)), cross.getRequestedParallelism());
        // Tag the package. The original tagged poGlobal a second time here, leaving
        // the package without an alias/location and duplicating the rearrange's entry.
        poPackage.addOriginalLocation(cross.getAlias(), cross.getLocation());
        poGlobal.setCross(true);
        currentPlan.add(poGlobal);
        currentPlan.add(poPackage);

        int count = 0;
        try {
            currentPlan.connect(poGlobal, poPackage);
            List<Boolean> flattenLst = Arrays.asList(true, true);
            for (Operator op : inputs) {
                // Plan 1: GFCross(total number of inputs, index of this input).
                PhysicalPlan fep1 = new PhysicalPlan();
                ConstantExpression ce1 = new ConstantExpression(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism());
                ce1.setValue(inputs.size());
                ce1.setResultType(DataType.INTEGER);
                fep1.add(ce1);
                ConstantExpression ce2 = new ConstantExpression(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism());
                ce2.setValue(count);
                ce2.setResultType(DataType.INTEGER);
                fep1.add(ce2);
                POUserFunc gfc = new POUserFunc(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism(),
                        Arrays.asList((PhysicalOperator) ce1, (PhysicalOperator) ce2), new FuncSpec(GFCross.class.getName()
                        + "('" + poGlobal.getOperatorKey().toString() + "')"));
                gfc.addOriginalLocation(cross.getAlias(), cross.getLocation());
                gfc.setResultType(DataType.BAG);
                fep1.addAsLeaf(gfc);
                gfc.setInputs(Arrays.asList((PhysicalOperator) ce1, (PhysicalOperator) ce2));

                // Plan 2: project the whole input tuple.
                PhysicalPlan fep2 = new PhysicalPlan();
                POProject feproj = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism());
                feproj.addOriginalLocation(cross.getAlias(), cross.getLocation());
                feproj.setResultType(DataType.TUPLE);
                feproj.setStar(true);
                feproj.setOverloaded(false);
                fep2.add(feproj);

                List<PhysicalPlan> fePlans = Arrays.asList(fep1, fep2);
                POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism(), fePlans, flattenLst);
                fe.setMapSideOnly(true);
                fe.addOriginalLocation(cross.getAlias(), cross.getLocation());
                currentPlan.add(fe);
                currentPlan.connect(logToPhyMap.get(op), fe);

                // Local rearrange keyed on the first inputs.size() columns.
                POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
                        scope, nodeGen.getNextNodeId(scope)), cross
                        .getRequestedParallelism());
                physOp.addOriginalLocation(cross.getAlias(), cross.getLocation());
                List<PhysicalPlan> lrPlans = new ArrayList<PhysicalPlan>();
                for (int i = 0; i < inputs.size(); i++) {
                    PhysicalPlan lrp1 = new PhysicalPlan();
                    POProject lrproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism(), i);
                    lrproj1.addOriginalLocation(cross.getAlias(), cross.getLocation());
                    lrproj1.setOverloaded(false);
                    lrproj1.setResultType(DataType.INTEGER);
                    lrp1.add(lrproj1);
                    lrPlans.add(lrp1);
                }
                physOp.setCross(true);
                physOp.setIndex(count++);
                physOp.setKeyType(DataType.TUPLE);
                physOp.setPlans(lrPlans);
                physOp.setResultType(DataType.TUPLE);
                currentPlan.add(physOp);
                currentPlan.connect(fe, physOp);
                currentPlan.connect(physOp, poGlobal);
            }
        } catch (PlanException e1) {
            int errCode = 2015;
            String msg = "Invalid physical operators in the physical plan";
            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e1);
        } catch (ExecException e) {
            int errCode = 2058;
            String msg = "Unable to set index on newly create POLocalRearrange.";
            throw new VisitorException(msg, errCode, PigException.BUG, e);
        }

        poPackage.getPkgr().setKeyType(DataType.TUPLE);
        poPackage.setResultType(DataType.TUPLE);
        poPackage.setNumInps(count);
        // Every input is marked as inner.
        boolean[] inner = new boolean[count];
        Arrays.fill(inner, true);
        poPackage.getPkgr().setInner(inner);

        // Final foreach: flatten the bag of every input (columns 1..count).
        List<PhysicalPlan> fePlans = new ArrayList<PhysicalPlan>();
        List<Boolean> flattenLst = new ArrayList<Boolean>();
        for (int i = 1; i <= count; i++) {
            PhysicalPlan fep1 = new PhysicalPlan();
            POProject feproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism(), i);
            feproj1.addOriginalLocation(cross.getAlias(), cross.getLocation());
            feproj1.setResultType(DataType.BAG);
            feproj1.setOverloaded(false);
            fep1.add(feproj1);
            fePlans.add(fep1);
            flattenLst.add(true);
        }
        POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism(), fePlans, flattenLst);
        fe.addOriginalLocation(cross.getAlias(), cross.getLocation());
        currentPlan.add(fe);
        try {
            currentPlan.connect(poPackage, fe);
        } catch (PlanException e1) {
            int errCode = 2015;
            String msg = "Invalid physical operators in the physical plan";
            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e1);
        }
        logToPhyMap.put(cross, fe);
    }
}
/**
 * Translates a logical stream operator into a {@link POStream} and connects
 * it to the physical operator of its first predecessor.
 *
 * @param stream the logical stream operator to translate
 * @throws FrontendException if the operator has no predecessor or the
 *         physical plan rejects the connection
 */
@Override
public void visit(LOStream stream) throws FrontendException {
    final String scope = DEFAULT_SCOPE;
    OperatorKey streamKey = new OperatorKey(scope, nodeGen.getNextNodeId(scope));
    POStream physStream = new POStream(streamKey, stream.getExecutableManager(),
            stream.getStreamingCommand(), this.pc.getProperties());
    physStream.addOriginalLocation(stream.getAlias(), stream.getLocation());
    currentPlan.add(physStream);
    logToPhyMap.put(stream, physStream);

    List<Operator> predecessors = stream.getPlan().getPredecessors(stream);
    if (predecessors == null) {
        throw new LogicalToPhysicalTranslatorException(
                "Did not find a predecessor for Stream.", 2051, PigException.BUG);
    }

    PhysicalOperator upstream = logToPhyMap.get(predecessors.get(0));
    try {
        currentPlan.connect(upstream, physStream);
    } catch (PlanException e) {
        throw new LogicalToPhysicalTranslatorException(
                "Invalid physical operators in the physical plan", 2015, PigException.BUG, e);
    }
}
/**
 * Translates an inner load of a foreach into a {@link POProject} and adds it
 * to the current physical plan. The projection is not connected to any
 * predecessor here.
 * <p>
 * The result type is BAG (overloaded) when the source is a bag, otherwise the
 * type of the first schema field, or BYTEARRAY when there is no schema. The
 * projection itself is a star, a project-to-end range, or a single column.
 *
 * @param load the logical inner load to translate
 * @throws FrontendException declared by the visitor interface
 */
@Override
public void visit(LOInnerLoad load) throws FrontendException {
    final String scope = DEFAULT_SCOPE;
    POProject project = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)));

    LogicalSchema loadSchema = load.getSchema();
    if (load.sourceIsBag()) {
        project.setResultType(DataType.BAG);
        project.setOverloaded(true);
    } else if (loadSchema != null) {
        project.setResultType(loadSchema.getField(0).type);
    } else {
        project.setResultType(DataType.BYTEARRAY);
    }

    ProjectExpression projection = load.getProjection();
    if (projection.isProjectStar()) {
        project.setStar(projection.isProjectStar());
    } else if (!projection.isRangeProject()) {
        project.setColumn(load.getColNum());
    } else {
        // Every other project-range should already have been expanded by the
        // project-star expander; only project-to-end may reach this point.
        if (projection.getEndCol() != -1) {
            throw new AssertionError("project range that is not a " +
                    "project-to-end seen in translation to physical plan!");
        }
        project.setProjectToEnd(projection.getStartCol());
    }

    logToPhyMap.put(load, project);
    currentPlan.add(project);
}
/**
 * Translates a logical foreach into a {@link POForEach}.
 * <p>
 * Each output expression plan of the inner LOGenerate becomes one inner
 * physical plan. For every sink of an expression plan that is a
 * {@link ProjectExpression}, the inner-plan subtree ending at the referenced
 * predecessor of the generate is translated into the same physical plan by
 * re-entering this visitor. A predecessor that is an LOInnerLoad is folded
 * into the expression's own projection; any other predecessor's leaf is
 * connected to it.
 * <p>
 * The resulting POForEach is registered for the foreach and, when the foreach
 * has a predecessor, connected to the physical operator of the first one.
 * Soft links are translated only in that case.
 *
 * @param foreach the logical foreach operator to translate
 * @throws FrontendException if a column cannot be read from an inner load's
 *         projection or the physical plan rejects the final connection
 */
@Override
public void visit(LOForEach foreach) throws FrontendException {
String scope = DEFAULT_SCOPE;
List<PhysicalPlan> innerPlans = new ArrayList<PhysicalPlan>();
org.apache.pig.newplan.logical.relational.LogicalPlan inner = foreach.getInnerPlan();
// The first sink of the inner plan is taken to be the LOGenerate.
LOGenerate gen = (LOGenerate)inner.getSinks().get(0);
List<LogicalExpressionPlan> exps = gen.getOutputPlans();
List<Operator> preds = inner.getPredecessors(gen);
// Save the enclosing plan; each inner plan below is built in a fresh currentPlan.
currentPlans.push(currentPlan);
// we need to translate each predecessor of LOGenerate into a physical plan.
// The physical plan should contain the expression plan for this predecessor plus
// the subtree starting with this predecessor
for (int i=0; i<exps.size(); i++) {
currentPlan = new PhysicalPlan();
// translate the expression plan
PlanWalker childWalker = new ReverseDependencyOrderWalkerWOSeenChk(exps.get(i));
pushWalker(childWalker);
childWalker.walk(new ExpToPhyTranslationVisitor(exps.get(i),
childWalker, gen, currentPlan, logToPhyMap));
popWalker();
List<Operator> leaves = exps.get(i).getSinks();
for(Operator l: leaves) {
PhysicalOperator op = logToPhyMap.get(l);
if (l instanceof ProjectExpression ) {
int input = ((ProjectExpression)l).getInputNum();
// for each sink projection, get its input logical plan and translate it
Operator pred = preds.get(input);
childWalker = new SubtreeDependencyOrderWalker(inner, pred);
pushWalker(childWalker);
// Re-enters this visitor, so the subtree's operators land in the current inner plan.
childWalker.walk(this);
popWalker();
// get the physical operator of the leaf of input logical plan
PhysicalOperator leaf = logToPhyMap.get(pred);
if (pred instanceof LOInnerLoad) {
// if predecessor is only an LOInnerLoad, remove the project that
// comes from LOInnerLoad and change the column of project that
// comes from expression plan
currentPlan.remove(leaf);
logToPhyMap.remove(pred);
POProject leafProj = (POProject)leaf;
try {
if(leafProj.isStar()){
((POProject)op).setStar(true);
}
else if(leafProj.isProjectToEnd()){
((POProject)op).setProjectToEnd(leafProj.getStartCol());
}else {
((POProject)op).setColumn(leafProj.getColumn() );
}
} catch (ExecException e) {
throw new FrontendException(foreach, "Cannot get column from "+leaf, 2230, e);
}
}else{
currentPlan.connect(leaf, op);
}
}
}
innerPlans.add(currentPlan);
}
// Restore the enclosing plan saved before the loop.
currentPlan = currentPlans.pop();
// PhysicalOperator poGen = new POGenerate(new OperatorKey("",
// r.nextLong()), inputs, toBeFlattened);
// Copy the primitive flatten flags into a List<Boolean> for the POForEach constructor.
boolean[] flatten = gen.getFlattenFlags();
List<Boolean> flattenList = new ArrayList<Boolean>();
for(boolean fl: flatten) {
flattenList.add(fl);
}
LogicalSchema logSchema = foreach.getSchema();
Schema schema = null;
if (logSchema != null) {
try {
schema = Schema.getPigSchema(new ResourceSchema(logSchema));
} catch (FrontendException e) {
// A schema conversion failure is surfaced as an unchecked exception here.
throw new RuntimeException("LogicalSchema in foreach unable to be converted to Schema: " + logSchema, e);
}
}
if (schema != null) {
SchemaTupleFrontend.registerToGenerateIfPossible(schema, false, GenContext.FOREACH); //TODO may need to be appendable
}
POForEach poFE = new POForEach(new OperatorKey(scope, nodeGen
.getNextNodeId(scope)), foreach.getRequestedParallelism(), innerPlans, flattenList, schema);
poFE.addOriginalLocation(foreach.getAlias(), foreach.getLocation());
poFE.setResultType(DataType.BAG);
logToPhyMap.put(foreach, poFE);
currentPlan.add(poFE);
// generate cannot have multiple inputs
List<Operator> op = foreach.getPlan().getPredecessors(foreach);
// generate may not have any predecessors
// (in that case translateSoftLinks below is never reached)
if (op == null)
return;
PhysicalOperator from = logToPhyMap.get(op.get(0));
try {
currentPlan.connect(from, poFE);
// NOTE(review): sibling visit methods catch only PlanException here; this broad catch
// also wraps runtime exceptions as error 2015 — confirm whether that is intended.
} catch (Exception e) {
int errCode = 2015;
String msg = "Invalid physical operators in the physical plan" ;
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
}
translateSoftLinks(foreach);
}
/**
 * Translates a list of logical expression plans into physical plans, one
 * physical plan per logical plan, in the same order.
 * <p>
 * Every expression plan is walked with its own
 * {@code ReverseDependencyOrderWalkerWOSeenChk} by an
 * {@code ExpToPhyTranslationVisitor} that writes into a fresh
 * {@code PhysicalPlan}. While this happens the enclosing {@code currentPlan}
 * is parked on {@code currentPlans}; it is restored before returning.
 *
 * @param loj   the relational operator the expression plans belong to; handed
 *              to each {@code ExpToPhyTranslationVisitor}
 * @param plans the logical expression plans to translate; may be null or empty
 * @return the translated physical plans, or an empty list when {@code plans}
 *         is null or empty
 * @throws FrontendException propagated from walking an expression plan
 */
private List<PhysicalPlan> translateExpressionPlans(LogicalRelationalOperator loj,
        List<LogicalExpressionPlan> plans ) throws FrontendException {
    List<PhysicalPlan> translated = new ArrayList<PhysicalPlan>();
    if (plans != null && plans.size() != 0) {
        // Park the enclosing plan; each expression gets a plan of its own.
        currentPlans.push(currentPlan);
        for (LogicalExpressionPlan logicalExpr : plans) {
            currentPlan = new PhysicalPlan();
            PlanWalker exprWalker = new ReverseDependencyOrderWalkerWOSeenChk(logicalExpr);
            // Make the expression walker current for the duration of the walk.
            pushWalker(exprWalker);
            ExpToPhyTranslationVisitor exprTranslator = new ExpToPhyTranslationVisitor(
                    currentWalker.getPlan(), exprWalker, loj, currentPlan, logToPhyMap);
            currentWalker.walk(exprTranslator);
            translated.add(currentPlan);
            popWalker();
        }
        // Restore the enclosing plan.
        currentPlan = currentPlans.pop();
    }
    return translated;
}
/**
 * Translates a logical store into a {@code POStore}, adds it to the current
 * physical plan and connects it to the physical operator of its first
 * predecessor.
 * <p>
 * Output spec, input spec, signature, sort info, tmp-store flag, store
 * function and schema are copied from the logical operator. When the store
 * function implements {@code StoreResources}, its cache files and ship files
 * are copied as well.
 *
 * @param loStore the logical store operator to translate
 * @throws FrontendException a {@code LogicalToPhysicalTranslatorException}
 *         with code 2051 when the store has no predecessor in its logical
 *         plan, or with code 2015 when connecting the operators fails
 */
@Override
public void visit(LOStore loStore) throws FrontendException {
    String scope = DEFAULT_SCOPE;
    POStore store = new POStore(new OperatorKey(scope, nodeGen
            .getNextNodeId(scope)));
    store.addOriginalLocation(loStore.getAlias(), loStore.getLocation());
    store.setSFile(loStore.getOutputSpec());
    store.setInputSpec(loStore.getInputSpec());
    store.setSignature(loStore.getSignature());
    store.setSortInfo(loStore.getSortInfo());
    store.setIsTmpStore(loStore.isTmpStore());
    store.setStoreFunc(loStore.getStoreFunc());
    store.setSchema(Util.translateSchema( loStore.getSchema() ));
    if (loStore.getStoreFunc() instanceof StoreResources) {
        StoreResources resources = (StoreResources) loStore.getStoreFunc();
        store.setCacheFiles(resources.getCacheFiles());
        store.setShipFiles(resources.getShipFiles());
    }
    currentPlan.add(store);

    List<Operator> op = loStore.getPlan().getPredecessors(loStore);
    if (op == null) {
        // A store with nothing feeding it cannot be wired into the plan.
        // Fail with a descriptive error instead of handing a null operator
        // to connect(); this mirrors the handling in visit(LOSplit).
        int errCode = 2051;
        String msg = "Did not find a predecessor for Store." ;
        throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);
    }
    PhysicalOperator from = logToPhyMap.get(op.get(0));

    try {
        currentPlan.connect(from, store);
    } catch (PlanException e) {
        int errCode = 2015;
        String msg = "Invalid physical operators in the physical plan" ;
        throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
    }
    logToPhyMap.put(loStore, store);
}
/**
 * Translates a logical cogroup according to its group type.
 * <ul>
 * <li>REGULAR: builds the local-rearrange / global-rearrange / package trio,
 *     marks the packager as {@code PackageType.GROUP} and registers the
 *     package as the translation of {@code cg}.</li>
 * <li>COLLECTED: delegates to {@code translateCollectedCogroup}.</li>
 * <li>MERGE: delegates to {@code translateMergeCogroup}.</li>
 * </ul>
 * Soft links of the cogroup are translated afterwards.
 *
 * @param cg the logical cogroup to translate
 * @throws FrontendException if the group type is not one of the above, or
 *         propagated from the delegated translation
 */
@Override
public void visit( LOCogroup cg ) throws FrontendException {
    switch (cg.getGroupType()) {
    case REGULAR: {
        POPackage groupPackage = compileToLR_GR_PackTrio(
                cg, cg.getCustomPartitioner(), cg.getInner(), cg.getExpressionPlans());
        groupPackage.getPkgr().setPackageType(PackageType.GROUP);
        logToPhyMap.put(cg, groupPackage);
        break;
    }
    case COLLECTED: {
        translateCollectedCogroup(cg);
        break;
    }
    case MERGE: {
        translateMergeCogroup(cg);
        break;
    }
    default: {
        throw new LogicalToPhysicalTranslatorException("Unknown CoGroup Modifier",PigException.BUG);
    }
    }
    translateSoftLinks(cg);
}
/**
 * Translates a COLLECTED cogroup into a {@code POCollectedGroup} connected
 * to the physical operator of the cogroup's first predecessor.
 * <p>
 * The key type is {@code DataType.TUPLE} when there is more than one key
 * expression; otherwise it is the result type of the first leaf of the
 * single translated key plan.
 *
 * @param cg the logical cogroup; only its first input and the expression
 *           plans registered under index 0 are used
 * @throws FrontendException with code 2071 if the key plans cannot be set on
 *         the operator, or 2015 if the operators cannot be connected
 */
private void translateCollectedCogroup(LOCogroup cg) throws FrontendException {
    // A collected group has a single input.
    LogicalRelationalOperator onlyInput =
            (LogicalRelationalOperator) plan.getPredecessors(cg).get(0);
    List<LogicalExpressionPlan> keyExprs = cg.getExpressionPlans().get(0);

    OperatorKey groupKey = new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE));
    POCollectedGroup collectedGroup = new POCollectedGroup(groupKey);
    collectedGroup.addOriginalLocation(cg.getAlias(), cg.getLocation());

    List<PhysicalPlan> keyPlans = translateExpressionPlans(cg, keyExprs);
    try {
        collectedGroup.setPlans(keyPlans);
    } catch (PlanException pe) {
        throw new LogicalToPhysicalTranslatorException(
                "Problem with setting up map group's plans.", 2071, PigException.BUG, pe);
    }

    // Several key expressions form a tuple key; a single one keeps its own type.
    if (keyExprs.size() <= 1) {
        collectedGroup.setKeyType(keyPlans.get(0).getLeaves().get(0).getResultType());
    } else {
        collectedGroup.setKeyType(DataType.TUPLE);
    }
    collectedGroup.setResultType(DataType.TUPLE);

    currentPlan.add(collectedGroup);
    try {
        currentPlan.connect(logToPhyMap.get(onlyInput), collectedGroup);
    } catch (PlanException e) {
        throw new LogicalToPhysicalTranslatorException(
                "Invalid physical operators in the physical plan", 2015, PigException.BUG, e);
    }
    logToPhyMap.put(cg, collectedGroup);
}
/**
 * Builds a {@code POMergeCogroup} for the given relational operator.
 * <p>
 * For every input a {@code POLocalRearrange} is created that carries the
 * translated key expression plans of that input, with its index set to the
 * input's position. The local rearranges are handed to the
 * {@code POMergeCogroup} constructor together with the physical operators of
 * the inputs; this method does not add anything to {@code currentPlan}.
 *
 * @param relationalOp the cogroup or join being translated
 * @param innerPlans   key expression plans, keyed by input position
 * @return the newly created merge cogroup operator
 * @throws FrontendException with code 2071 if the key plans cannot be set,
 *         or 2058 if the index cannot be set on a local rearrange
 */
private POMergeCogroup compileToMergeCogrp(LogicalRelationalOperator relationalOp,
        MultiMap<Integer, LogicalExpressionPlan> innerPlans) throws FrontendException {
    List<Operator> preds = relationalOp.getPlan().getPredecessors(relationalOp);
    // One local rearrange per input; it extracts the keys out of the tuples.
    POLocalRearrange[] rearranges = new POLocalRearrange[preds.size()];
    List<PhysicalOperator> physInputs = new ArrayList<PhysicalOperator>(preds.size());

    for (int idx = 0; idx < preds.size(); idx++) {
        physInputs.add(logToPhyMap.get(preds.get(idx)));
        List<LogicalExpressionPlan> keyExprs = innerPlans.get(idx);

        OperatorKey lrKey = new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE));
        POLocalRearrange rearrange = new POLocalRearrange(lrKey);
        rearrange.addOriginalLocation(relationalOp.getAlias(), relationalOp.getLocation());

        // There may be several keys, and each key may be an expression,
        // hence a list of physical plans.
        List<PhysicalPlan> keyPlans = translateExpressionPlans(relationalOp, keyExprs);
        try {
            rearrange.setPlans(keyPlans);
        } catch (PlanException pe) {
            throw new LogicalToPhysicalTranslatorException(
                    "Problem with setting up local rearrange's plans.", 2071, PigException.BUG, pe);
        }

        rearranges[idx] = rearrange;
        try {
            rearrange.setIndex(idx);
        } catch (ExecException e1) {
            throw new VisitorException(
                    "Unable to set index on newly create POLocalRearrange.", 2058, PigException.BUG, e1);
        }

        if (keyExprs.size() > 1) {
            rearrange.setKeyType(DataType.TUPLE);
        } else {
            rearrange.setKeyType(keyPlans.get(0).getLeaves().get(0).getResultType());
        }
        rearrange.setResultType(DataType.TUPLE);
    }

    OperatorKey cogroupKey = new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE));
    return new POMergeCogroup(cogroupKey, physInputs, rearranges,
            relationalOp.getRequestedParallelism());
}
/**
 * Translates a MERGE cogroup into a {@code POMergeCogroup} connected to the
 * physical operators of all of the cogroup's inputs.
 * <p>
 * The cogroup is rejected when any of its inner flags is set. The inputs are
 * then checked with a {@code MapSideMergeValidator} before the operator is
 * built.
 *
 * @param cg the logical cogroup to translate
 * @throws FrontendException if an inner flag is set, if validation or
 *         operator construction fails, or with code 2015 if an input cannot
 *         be connected
 */
private void translateMergeCogroup(LOCogroup cg) throws FrontendException {
    boolean noInnerInput = validateMergeCogrp(cg.getInner());
    if (!noInnerInput) {
        throw new LogicalToPhysicalTranslatorException("Inner is not " +
                "supported for any relation on Merge Cogroup.");
    }

    List<Operator> preds = cg.getPlan().getPredecessors(cg);
    new MapSideMergeValidator().validateMapSideMerge(preds, cg.getPlan());

    POMergeCogroup mergeCogroup = compileToMergeCogrp(cg, cg.getExpressionPlans());
    mergeCogroup.setResultType(DataType.TUPLE);
    mergeCogroup.addOriginalLocation(cg.getAlias(), cg.getLocation());
    currentPlan.add(mergeCogroup);

    // Wire every input into the merge cogroup.
    for (Operator pred : preds) {
        try {
            currentPlan.connect(logToPhyMap.get(pred), mergeCogroup);
        } catch (PlanException e) {
            throw new LogicalToPhysicalTranslatorException(
                    "Invalid physical operators in the physical plan", 2015, PigException.BUG, e);
        }
    }
    logToPhyMap.put(cg, mergeCogroup);
}
/**
 * Checks that none of the given inner flags is set.
 *
 * @param innerFlags per-input inner flags of a merge cogroup
 * @return {@code true} when every flag is {@code false} (also for an empty
 *         array); {@code false} when at least one flag is {@code true}
 */
private boolean validateMergeCogrp(boolean[] innerFlags){
    int idx = 0;
    // Skip over the leading run of unset flags.
    while (idx < innerFlags.length && !innerFlags[idx]) {
        idx++;
    }
    // Reaching the end means no flag was set.
    return idx == innerFlags.length;
}
/**
 * Translates a logical join into physical operators. The strategy depends
 * on the join type:
 * <ul>
 * <li>SKEWED: a single POSkewedJoin connected to all inputs.</li>
 * <li>REPLICATED: a single POFRJoin connected to all inputs.</li>
 * <li>MERGE / MERGESPARSE (only when the MapSideMergeValidator check returns
 *     true): a POMergeJoin when there are exactly two inputs and both are
 *     inner; otherwise a POMergeCogroup followed by a flattening POForEach.</li>
 * <li>HASH / BLOOM: the local-rearrange / global-rearrange / package trio
 *     followed by a flattening POForEach.</li>
 * </ul>
 * Before branching, the key expression plans of every input are translated
 * and the result type of the first leaf of each key plan is recorded.
 *
 * @param loj the logical join to translate
 * @throws FrontendException if translation of any part of the join fails
 */
@Override
public void visit(LOJoin loj) throws FrontendException {
String scope = DEFAULT_SCOPE;
// Inputs (predecessors) of the join in the logical plan
List<Operator> inputs = loj.getPlan().getPredecessors(loj);
// mapping of inner join physical plans corresponding to inner physical operators.
MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = new LinkedMultiMap<PhysicalOperator, PhysicalPlan>();
// Outer list corresponds to join predicates. Inner list is inner physical plan of each predicate.
List<List<PhysicalPlan>> ppLists = new ArrayList<List<PhysicalPlan>>();
// List of physical operator corresponding to join predicates.
List<PhysicalOperator> inp = new ArrayList<PhysicalOperator>();
// Outer list corresponds to join predicates and inner list corresponds to type of keys for each predicate.
List<List<Byte>> keyTypes = new ArrayList<List<Byte>>();
boolean[] innerFlags = loj.getInnerFlags();
String alias = loj.getAlias();
SourceLocation location = loj.getLocation();
int parallel = loj.getRequestedParallelism();
// Translate the key expressions of every input and collect the key types.
for (int i=0;i<inputs.size();i++) {
Operator op = inputs.get(i);
PhysicalOperator physOp = logToPhyMap.get(op);
inp.add(physOp);
List<LogicalExpressionPlan> plans = (List<LogicalExpressionPlan>)loj.getJoinPlan(i);
List<PhysicalPlan> exprPlans = translateExpressionPlans(loj, plans);
ppLists.add(exprPlans);
joinPlans.put(physOp, exprPlans);
// Key could potentially be a tuple. So, we visit all exprPlans to get types of members of tuples.
List<Byte> tupleKeyMemberTypes = new ArrayList<Byte>();
for(PhysicalPlan exprPlan : exprPlans)
tupleKeyMemberTypes.add(exprPlan.getLeaves().get(0).getResultType());
keyTypes.add(tupleKeyMemberTypes);
}
if (loj.getJoinType() == LOJoin.JOINTYPE.SKEWED) {
POSkewedJoin skj;
try {
skj = new POSkewedJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),
parallel,inp, innerFlags);
skj.addOriginalLocation(alias, location);
skj.setJoinPlans(joinPlans);
}
catch (Exception e) {
int errCode = 2015;
String msg = "Skewed Join creation failed";
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
}
skj.setResultType(DataType.TUPLE);
// Add exactly one schema entry per input: the translated input schema for
// outer (non-inner) inputs, null for inner inputs.
for (int i=0; i < inputs.size(); i++) {
Operator op = inputs.get(i);
if (!innerFlags[i]) {
try {
LogicalSchema s = ((LogicalRelationalOperator)op).getSchema();
// if the schema cannot be determined
if (s == null) {
throw new FrontendException(loj, "Cannot determine skewed join schema", 2247);
}
skj.addSchema(Util.translateSchema(s));
} catch (FrontendException e) {
int errCode = 2015;
String msg = "Couldn't set the schema for outer join" ;
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
}
} else {
// This will never be retrieved. It just guarantees that the index will be valid when
// MRCompiler is trying to read the schema
skj.addSchema(null);
}
}
currentPlan.add(skj);
for (Operator op : inputs) {
try {
currentPlan.connect(logToPhyMap.get(op), skj);
} catch (PlanException e) {
int errCode = 2015;
String msg = "Invalid physical operators in the physical plan" ;
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
}
}
logToPhyMap.put(loj, skj);
}
else if(loj.getJoinType() == LOJoin.JOINTYPE.REPLICATED) {
// Per-input value and key schemas registered for code generation. An entry
// stays null when it could not be determined for that input.
Schema[] inputSchemas = new Schema[inputs.size()];
Schema[] keySchemas = new Schema[inputs.size()];
outer: for (int i = 0; i < inputs.size(); i++) {
LogicalSchema logicalSchema = ((LogicalRelationalOperator)inputs.get(i)).getSchema();
if (logicalSchema == null) {
// No schema for this input: neither inputSchemas[i] nor keySchemas[i] is set.
continue;
}
Schema toGen = Schema.getPigSchema(new ResourceSchema(logicalSchema));
// This registers the value piece
SchemaTupleFrontend.registerToGenerateIfPossible(toGen, false, GenContext.FR_JOIN);
inputSchemas[i] = toGen;
Schema keyToGen = new Schema();
for (Byte byt : keyTypes.get(i)) {
// We cannot generate any nested code because that information is thrown away
if (byt == null || DataType.isComplex(byt.byteValue())) {
// Skips the key-schema registration for this input; keySchemas[i] is left unset.
continue outer;
}
keyToGen.add(new FieldSchema(null, byt));
}
SchemaTupleFrontend.registerToGenerateIfPossible(keyToGen, false, GenContext.FR_JOIN);
keySchemas[i] = keyToGen;
}
// Value passed as the fragment argument of POFRJoin; always 0 here.
int fragment = 0;
POFRJoin pfrj;
try {
boolean isLeftOuter = false;
// We don't check for bounds issues as we assume that a join
// involves at least two inputs
isLeftOuter = !innerFlags[1];
Tuple nullTuple = null;
if( isLeftOuter ) {
try {
// We know that in a Left outer join its only a two way
// join, so we assume index of 1 for the right input
LogicalSchema inputSchema = ((LogicalRelationalOperator)inputs.get(1)).getSchema();
// We check if we have a schema before the join
if(inputSchema == null) {
int errCode = 1109;
String msg = "Input (" + ((LogicalRelationalOperator)inputs.get(1)).getAlias() + ") " +
"on which outer join is desired should have a valid schema";
// NOTE(review): this is thrown inside the try below; if
// LogicalToPhysicalTranslatorException is a FrontendException it is
// caught there and re-wrapped with code 2104 - confirm intended.
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.INPUT);
}
// Using the schema we decide the number of columns/fields
// in the nullTuple
nullTuple = TupleFactory.getInstance().newTuple(inputSchema.size());
for(int j = 0; j < inputSchema.size(); j++) {
nullTuple.set(j, null);
}
} catch( FrontendException e ) {
int errCode = 2104;
String msg = "Error while determining the schema of input";
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
}
}
pfrj = new POFRJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),
parallel,
inp,
ppLists,
keyTypes,
null,
fragment,
isLeftOuter,
nullTuple,
inputSchemas,
keySchemas);
pfrj.addOriginalLocation(alias, location);
} catch (ExecException e1) {
// NOTE(review): the message mentions POLocalRearrange although this try
// block builds a POFRJoin and a null tuple - looks copied from the
// local-rearrange code paths; confirm before relying on the text.
int errCode = 2058;
String msg = "Unable to set index on newly create POLocalRearrange.";
throw new VisitorException(msg, errCode, PigException.BUG, e1);
}
pfrj.setResultType(DataType.TUPLE);
currentPlan.add(pfrj);
for (Operator op : inputs) {
try {
currentPlan.connect(logToPhyMap.get(op), pfrj);
} catch (PlanException e) {
int errCode = 2015;
String msg = "Invalid physical operators in the physical plan" ;
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
}
}
logToPhyMap.put(loj, pfrj);
// The merge branch is only taken when validateMapSideMerge returns true.
} else if ( (loj.getJoinType() == LOJoin.JOINTYPE.MERGE || loj.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE)
&& (new MapSideMergeValidator().validateMapSideMerge(inputs,loj.getPlan()))) {
PhysicalOperator smj;
// POMergeJoin is used only for a two-input join in which both inputs are inner.
boolean usePOMergeJoin = inputs.size() == 2 && innerFlags[0] && innerFlags[1] ;
if(usePOMergeJoin){
// We register the merge join schema information for code generation
LogicalSchema logicalSchema = ((LogicalRelationalOperator)inputs.get(0)).getSchema();
Schema leftSchema = null;
if (logicalSchema != null) {
leftSchema = Schema.getPigSchema(new ResourceSchema(logicalSchema));
}
logicalSchema = ((LogicalRelationalOperator)inputs.get(1)).getSchema();
Schema rightSchema = null;
if (logicalSchema != null) {
rightSchema = Schema.getPigSchema(new ResourceSchema(logicalSchema));
}
logicalSchema = loj.getSchema();
Schema mergedSchema = null;
if (logicalSchema != null) {
mergedSchema = Schema.getPigSchema(new ResourceSchema(logicalSchema));
}
if (leftSchema != null) {
SchemaTupleFrontend.registerToGenerateIfPossible(leftSchema, false, GenContext.MERGE_JOIN);
}
if (rightSchema != null) {
SchemaTupleFrontend.registerToGenerateIfPossible(rightSchema, false, GenContext.MERGE_JOIN);
}
if (mergedSchema != null) {
SchemaTupleFrontend.registerToGenerateIfPossible(mergedSchema, false, GenContext.MERGE_JOIN);
}
// inner join on two sorted inputs. We have less restrictive
// implementation here in a form of POMergeJoin which doesn't
// require loaders to implement collectable interface.
try {
smj = new POMergeJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),
parallel,
inp,
joinPlans,
keyTypes,
loj.getJoinType(),
leftSchema,
rightSchema,
mergedSchema);
}
catch (PlanException e) {
int errCode = 2042;
String msg = "Merge Join creation failed";
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
}
logToPhyMap.put(loj, smj);
} else {
// in all other cases we fall back to POMergeCogroup + Flattening FEs
smj = compileToMergeCogrp(loj, loj.getExpressionPlans());
}
smj.setResultType(DataType.TUPLE);
currentPlan.add(smj);
smj.addOriginalLocation(alias, location);
for (Operator op : inputs) {
try {
currentPlan.connect(logToPhyMap.get(op), smj);
} catch (PlanException e) {
int errCode = 2015;
String msg = "Invalid physical operators in the physical plan" ;
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
}
}
if(!usePOMergeJoin){
// Now create and configure foreach which will flatten the output
// of cogroup.
POForEach fe = compileFE4Flattening(innerFlags, scope, parallel, alias, location, inputs);
currentPlan.add(fe);
try {
currentPlan.connect(smj, fe);
} catch (PlanException e) {
throw new LogicalToPhysicalTranslatorException(e.getMessage(),e.getErrorCode(),e.getErrorSource(),e);
}
// The flattening foreach, not the cogroup, represents the join from here on.
logToPhyMap.put(loj, fe);
}
// NOTE(review): returning here means translateSoftLinks(loj) at the end of
// this method is not invoked for merge joins - confirm this is intended.
return;
}
else if (loj.getJoinType() == LOJoin.JOINTYPE.HASH || loj.getJoinType() == LOJoin.JOINTYPE.BLOOM){
POPackage poPackage = compileToLR_GR_PackTrio(loj, loj.getCustomPartitioner(), innerFlags, loj.getExpressionPlans());
POForEach fe = compileFE4Flattening(innerFlags, scope, parallel, alias, location, inputs);
currentPlan.add(fe);
try {
currentPlan.connect(poPackage, fe);
} catch (PlanException e) {
throw new LogicalToPhysicalTranslatorException(e.getDetailedMessage(),
e.getErrorCode(),e.getErrorSource(),e);
}
logToPhyMap.put(loj, fe);
if (loj.getJoinType() == LOJoin.JOINTYPE.BLOOM) {
// A two-input join with both inner flags false is rejected as FULL OUTER.
if (innerFlags.length == 2) {
if (innerFlags[0] == false && innerFlags[1] == false) {
throw new LogicalToPhysicalTranslatorException(
"Error at " + loj.getLocation() + " with alias "+ loj.getAlias() +
". Bloom join cannot be used with a FULL OUTER join.",
1109,
PigException.INPUT);
}
}
poPackage.getPkgr().setPackageType(PackageType.BLOOMJOIN);
} else {
poPackage.getPkgr().setPackageType(PackageType.JOIN);
}
}
// Reached by every branch except the merge branch, which returns early.
translateSoftLinks(loj);
}
/**
 * Builds the local-rearrange / global-rearrange / package operators for a
 * cogroup or join and wires them into {@code currentPlan}.
 * <p>
 * One {@code POLocalRearrange} is created per input, indexed by the input's
 * position, fed by the physical operator of that input and connected to a
 * single {@code POGlobalRearrange}, which in turn feeds a single
 * {@code POPackage}. The packager's key type is the key type computed for
 * the last input.
 *
 * @param relationalOp      the cogroup or join being translated
 * @param customPartitioner partitioner set on the global rearrange
 * @param innerFlags        inner flags set on the packager
 * @param innerPlans        key expression plans, keyed by input position
 * @return the package operator at the end of the trio
 * @throws FrontendException with code 2015 if operators cannot be connected,
 *         2071 if key plans cannot be set, or 2058 if the index cannot be
 *         set on a local rearrange
 */
private POPackage compileToLR_GR_PackTrio(LogicalRelationalOperator relationalOp, String customPartitioner,
        boolean[] innerFlags, MultiMap<Integer, LogicalExpressionPlan> innerPlans) throws FrontendException {
    OperatorKey globalKey = new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE));
    POGlobalRearrange globalRearrange =
            new POGlobalRearrange(globalKey, relationalOp.getRequestedParallelism());
    globalRearrange.addOriginalLocation(relationalOp.getAlias(), relationalOp.getLocation());
    globalRearrange.setCustomPartitioner(customPartitioner);

    OperatorKey packageKey = new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE));
    POPackage pkg = new POPackage(packageKey, relationalOp.getRequestedParallelism());
    pkg.addOriginalLocation(relationalOp.getAlias(), relationalOp.getLocation());

    currentPlan.add(globalRearrange);
    currentPlan.add(pkg);
    try {
        currentPlan.connect(globalRearrange, pkg);
    } catch (PlanException e1) {
        throw new LogicalToPhysicalTranslatorException(
                "Invalid physical operators in the physical plan", 2015, PigException.BUG, e1);
    }

    // Key type of the most recently processed input; stays null without inputs.
    Byte lastKeyType = null;
    List<Operator> preds = relationalOp.getPlan().getPredecessors(relationalOp);
    int idx = 0;
    for (; idx < preds.size(); idx++) {
        Operator pred = preds.get(idx);
        List<LogicalExpressionPlan> keyExprs = innerPlans.get(idx);

        OperatorKey lrKey = new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE));
        POLocalRearrange rearrange =
                new POLocalRearrange(lrKey, relationalOp.getRequestedParallelism());
        rearrange.addOriginalLocation(relationalOp.getAlias(), relationalOp.getLocation());

        List<PhysicalPlan> keyPlans = translateExpressionPlans(relationalOp, keyExprs);
        try {
            rearrange.setPlans(keyPlans);
        } catch (PlanException pe) {
            throw new LogicalToPhysicalTranslatorException(
                    "Problem with setting up local rearrange's plans.", 2071, PigException.BUG, pe);
        }
        try {
            rearrange.setIndex(idx);
        } catch (ExecException e1) {
            throw new VisitorException(
                    "Unable to set index on newly create POLocalRearrange.", 2058, PigException.BUG, e1);
        }

        // Several key expressions form a tuple key; a single one keeps its own type.
        if (keyExprs.size() <= 1) {
            lastKeyType = keyPlans.get(0).getLeaves().get(0).getResultType();
        } else {
            lastKeyType = DataType.TUPLE;
        }
        rearrange.setKeyType(lastKeyType);
        rearrange.setResultType(DataType.TUPLE);

        currentPlan.add(rearrange);
        try {
            currentPlan.connect(logToPhyMap.get(pred), rearrange);
            currentPlan.connect(rearrange, globalRearrange);
        } catch (PlanException e) {
            throw new LogicalToPhysicalTranslatorException(
                    "Invalid physical operators in the physical plan", 2015, PigException.BUG, e);
        }
    }

    pkg.getPkgr().setKeyType(lastKeyType);
    pkg.setResultType(DataType.TUPLE);
    // idx now equals the number of inputs processed.
    pkg.setNumInps(idx);
    pkg.getPkgr().setInner(innerFlags);
    return pkg;
}
/**
 * Builds the {@code POForEach} that flattens the per-input bags produced by
 * a cogroup-style operator.
 * <p>
 * For input {@code i} a plan holding a single {@code POProject} of column
 * {@code i + 1} is created (the first column is the "group" field) and
 * marked to be flattened. For inputs whose inner flag is {@code false} the
 * plan is additionally passed to {@code updateWithEmptyBagCheck}.
 *
 * @param innerFlags per-input inner flags
 * @param scope      scope used for the new operator keys
 * @param parallel   requested parallelism for the new operators
 * @param alias      alias recorded as original location
 * @param location   source location recorded as original location
 * @param inputs     the inputs of the join being translated
 * @return the flattening foreach (not yet added to any plan)
 * @throws FrontendException with code 2015 when a {@code PlanException} is
 *         raised while building the plans; other frontend errors propagate
 */
private POForEach compileFE4Flattening(boolean[] innerFlags,String scope,
        int parallel, String alias, SourceLocation location, List<Operator> inputs)
        throws FrontendException {
    List<PhysicalPlan> bagPlans = new ArrayList<PhysicalPlan>();
    List<Boolean> flattenFlags = new ArrayList<Boolean>();
    POForEach flattener;
    try {
        // column runs over 1..inputs.size(); column 0 is the "group" field.
        for (int column = 1; column <= inputs.size(); column++) {
            int inputIdx = column - 1;
            PhysicalPlan bagPlan = new PhysicalPlan();
            OperatorKey projectKey = new OperatorKey(scope, nodeGen.getNextNodeId(scope));
            POProject bagProject = new POProject(projectKey, parallel, column);
            bagProject.addOriginalLocation(alias, location);
            bagProject.setResultType(DataType.BAG);
            bagProject.setOverloaded(false);
            bagPlan.add(bagProject);
            bagPlans.add(bagPlan);
            // The parser marks the side on which empty bags must be kept for
            // non-matches as outer, i.e. its inner flag is false.
            if (!innerFlags[inputIdx]) {
                // For an outer side add a bincond that projects nulls when
                // the bag is empty.
                updateWithEmptyBagCheck(bagPlan, inputs.get(inputIdx));
            }
            flattenFlags.add(true);
        }
        OperatorKey foreachKey = new OperatorKey(scope, nodeGen.getNextNodeId(scope));
        flattener = new POForEach(foreachKey, parallel, bagPlans, flattenFlags );
        flattener.addOriginalLocation(alias, location);
    } catch (PlanException e1) {
        throw new LogicalToPhysicalTranslatorException(
                "Invalid physical operators in the physical plan", 2015, PigException.BUG, e1);
    }
    return flattener;
}
/**
 * Translates a logical union into a {@code POUnion} that is connected to the
 * physical operators of all of the union's predecessors.
 *
 * @param loUnion the logical union to translate
 * @throws FrontendException with code 2015 if a predecessor cannot be
 *         connected to the new operator
 */
@Override
public void visit(LOUnion loUnion) throws FrontendException {
    String scope = DEFAULT_SCOPE;
    OperatorKey unionKey = new OperatorKey(scope,nodeGen.getNextNodeId(scope));
    POUnion union = new POUnion(unionKey, loUnion.getRequestedParallelism());
    union.addOriginalLocation(loUnion.getAlias(), loUnion.getLocation());
    currentPlan.add(union);
    union.setResultType(DataType.BAG);
    logToPhyMap.put(loUnion, union);

    // Every predecessor of the logical union feeds the physical union.
    List<Operator> preds = loUnion.getPlan().getPredecessors(loUnion);
    for (Operator pred : preds) {
        PhysicalOperator source = logToPhyMap.get(pred);
        try {
            currentPlan.connect(source, union);
        } catch (PlanException e) {
            throw new LogicalToPhysicalTranslatorException(
                    "Invalid physical operators in the physical plan", 2015, PigException.BUG, e);
        }
    }
}
/**
 * Translates a logical distinct into a {@code PODistinct} connected to the
 * physical operator of the distinct's first predecessor.
 *
 * @param loDistinct the logical distinct to translate
 * @throws FrontendException with code 2015 if the predecessor cannot be
 *         connected to the new operator
 */
@Override
public void visit(LODistinct loDistinct) throws FrontendException {
    String scope = DEFAULT_SCOPE;
    OperatorKey distinctKey = new OperatorKey(scope,nodeGen.getNextNodeId(scope));
    PODistinct distinct = new PODistinct(distinctKey, loDistinct.getRequestedParallelism());
    distinct.setCustomPartitioner(loDistinct.getCustomPartitioner());
    distinct.addOriginalLocation(loDistinct.getAlias(), loDistinct.getLocation());
    currentPlan.add(distinct);
    distinct.setResultType(DataType.BAG);
    logToPhyMap.put(loDistinct, distinct);

    Operator pred = loDistinct.getPlan().getPredecessors(loDistinct).get(0);
    PhysicalOperator source = logToPhyMap.get(pred);
    try {
        currentPlan.connect(source, distinct);
    } catch (PlanException e) {
        throw new LogicalToPhysicalTranslatorException(
                "Invalid physical operators in the physical plan", 2015, PigException.BUG, e);
    }
}
/**
 * Translates a logical limit into a {@code POLimit} connected to the
 * physical operator of the limit's first predecessor.
 * <p>
 * The constant limit is copied over. When the logical operator also has a
 * limit expression plan, that plan is translated into a separate
 * {@code PhysicalPlan} and attached to the operator. Soft links are
 * translated at the end.
 *
 * @param loLimit the logical limit to translate
 * @throws FrontendException with code 2015 if the predecessor cannot be
 *         connected, or propagated from translating the limit expression
 */
@Override
public void visit(LOLimit loLimit) throws FrontendException {
    String scope = DEFAULT_SCOPE;
    OperatorKey limitKey = new OperatorKey(scope, nodeGen.getNextNodeId(scope));
    POLimit limit = new POLimit(limitKey, loLimit.getRequestedParallelism());
    limit.setLimit(loLimit.getLimit());
    limit.addOriginalLocation(loLimit.getAlias(), loLimit.getLocation());
    limit.setResultType(DataType.BAG);
    currentPlan.add(limit);
    logToPhyMap.put(loLimit, limit);

    if (loLimit.getLimitPlan() != null) {
        // Translate the limit expression into a plan of its own while the
        // enclosing plan is parked on the stack.
        currentPlans.push(currentPlan);
        currentPlan = new PhysicalPlan();
        PlanWalker exprWalker = new ReverseDependencyOrderWalkerWOSeenChk(loLimit.getLimitPlan());
        pushWalker(exprWalker);
        ExpToPhyTranslationVisitor exprTranslator = new ExpToPhyTranslationVisitor(
                currentWalker.getPlan(), exprWalker, loLimit, currentPlan, logToPhyMap);
        currentWalker.walk(exprTranslator);
        limit.setLimitPlan(currentPlan);
        popWalker();
        currentPlan = currentPlans.pop();
    }

    Operator pred = loLimit.getPlan().getPredecessors(loLimit).get(0);
    PhysicalOperator source = logToPhyMap.get(pred);
    try {
        currentPlan.connect(source, limit);
    } catch (PlanException e) {
        throw new LogicalToPhysicalTranslatorException(
                "Invalid physical operators in the physical plan", 2015, PigException.BUG, e);
    }
    translateSoftLinks(loLimit);
}
/**
 * Translates a logical split into a {@code POSplit} whose split store is a
 * temporary path obtained from {@code FileLocalizer}, and connects it to the
 * physical operator of the split's first predecessor.
 *
 * @param loSplit the logical split to translate
 * @throws FrontendException if no temporary path can be obtained (the error
 *         code is chosen from the pig context's error source), with code
 *         2051 if the split has no predecessor, or with code 2015 if the
 *         predecessor cannot be connected
 */
@Override
public void visit(LOSplit loSplit) throws FrontendException {
    String scope = DEFAULT_SCOPE;
    OperatorKey splitKey = new OperatorKey(scope, nodeGen.getNextNodeId(scope));
    POSplit split = new POSplit(splitKey, loSplit.getRequestedParallelism());
    split.addOriginalLocation(loSplit.getAlias(), loSplit.getLocation());

    FileSpec splitStore;
    try {
        splitStore = new FileSpec(FileLocalizer.getTemporaryPath(pc).toString(),
                new FuncSpec(Utils.getTmpFileCompressorName(pc)));
    } catch (IOException e1) {
        // Map the context's error source to the matching error code; an
        // unrecognized source keeps code 0.
        byte errSrc = pc.getErrorSource();
        int errCode;
        if (errSrc == PigException.BUG) {
            errCode = 2016;
        } else if (errSrc == PigException.REMOTE_ENVIRONMENT) {
            errCode = 6002;
        } else if (errSrc == PigException.USER_ENVIRONMENT) {
            errCode = 4003;
        } else {
            errCode = 0;
        }
        throw new LogicalToPhysicalTranslatorException(
                "Unable to obtain a temporary path.", errCode, errSrc, e1);
    }
    split.setSplitStore(splitStore);
    logToPhyMap.put(loSplit, split);
    currentPlan.add(split);

    List<Operator> preds = loSplit.getPlan().getPredecessors(loSplit);
    if (preds == null) {
        throw new LogicalToPhysicalTranslatorException(
                "Did not find a predecessor for Split.", 2051, PigException.BUG);
    }
    PhysicalOperator source = logToPhyMap.get(preds.get(0));
    try {
        currentPlan.connect(source, split);
    } catch (PlanException e) {
        throw new LogicalToPhysicalTranslatorException(
                "Invalid physical operators in the physical plan", 2015, PigException.BUG, e);
    }
}
/**
 * Translates one output of a logical split into a {@code POFilter} whose
 * condition is the translated filter plan of that output, connected to the
 * physical operator of the output's first predecessor. Soft links are
 * translated at the end.
 *
 * @param loSplitOutput the logical split output to translate
 * @throws FrontendException with code 2051 if the split output has no
 *         predecessor, with code 2015 if the predecessor cannot be
 *         connected, or propagated from translating the filter plan
 */
@Override
public void visit(LOSplitOutput loSplitOutput) throws FrontendException {
    String scope = DEFAULT_SCOPE;
    OperatorKey filterKey = new OperatorKey(scope, nodeGen.getNextNodeId(scope));
    POFilter filter = new POFilter(filterKey, loSplitOutput.getRequestedParallelism());
    filter.addOriginalLocation(loSplitOutput.getAlias(), loSplitOutput.getLocation());
    filter.setResultType(DataType.BAG);
    currentPlan.add(filter);
    logToPhyMap.put(loSplitOutput, filter);

    // Translate the filter condition into a plan of its own while the
    // enclosing plan is parked on the stack.
    currentPlans.push(currentPlan);
    currentPlan = new PhysicalPlan();
    PlanWalker condWalker = new ReverseDependencyOrderWalkerWOSeenChk(loSplitOutput.getFilterPlan());
    pushWalker(condWalker);
    ExpToPhyTranslationVisitor condTranslator = new ExpToPhyTranslationVisitor(
            currentWalker.getPlan(), condWalker, loSplitOutput, currentPlan, logToPhyMap);
    currentWalker.walk(condTranslator);
    popWalker();
    filter.setPlan(currentPlan);
    currentPlan = currentPlans.pop();

    List<Operator> preds = loSplitOutput.getPlan().getPredecessors(loSplitOutput);
    if (preds == null) {
        throw new LogicalToPhysicalTranslatorException(
                "Did not find a predecessor for Filter.", 2051, PigException.BUG);
    }
    PhysicalOperator source = logToPhyMap.get(preds.get(0));
    try {
        currentPlan.connect(source, filter);
    } catch (PlanException e) {
        throw new LogicalToPhysicalTranslatorException(
                "Invalid physical operators in the physical plan", 2015, PigException.BUG, e);
    }
    translateSoftLinks(loSplitOutput);
}
/**
 * Updates a foreach plan with an empty-bag check for an outer join input:
 * the translated schema of the input is handed to
 * {@code CompilerUtils.addEmptyBagOuterJoin} so that an empty bag can be
 * replaced by as many nulls as the schema dictates.
 * <p>
 * NOTE(review): the "should have a valid schema" error (code 1109) is raised
 * inside the same try block that catches {@code FrontendException}. If
 * {@code LogicalToPhysicalTranslatorException} is a {@code FrontendException},
 * as its use under {@code throws FrontendException} suggests, that error
 * reaches the caller wrapped in the code 2104 exception.
 *
 * @param fePlan    the plan to update
 * @param joinInput the relation whose corresponding bag is being checked;
 *                  must be a {@code LogicalRelationalOperator}
 * @throws FrontendException if the input's schema is missing or cannot be
 *         determined
 */
public static void updateWithEmptyBagCheck(PhysicalPlan fePlan, Operator joinInput) throws FrontendException {
    LogicalRelationalOperator relation = (LogicalRelationalOperator) joinInput;
    LogicalSchema relationSchema;
    try {
        relationSchema = relation.getSchema();
        if (relationSchema == null) {
            throw new LogicalToPhysicalTranslatorException(
                    "Input (" + relation.getAlias() + ") " +
                    "on which outer join is desired should have a valid schema",
                    1109, PigException.INPUT);
        }
    } catch (FrontendException e) {
        throw new LogicalToPhysicalTranslatorException(
                "Error while determining the schema of input", 2104, PigException.BUG, e);
    }
    CompilerUtils.addEmptyBagOuterJoin(fePlan, Util.translateSchema(relationSchema), false, null);
}
/**
 * Mirrors the soft links of a logical operator in the current physical plan:
 * for each soft-link predecessor of {@code op}, a soft link is created from
 * the predecessor's physical operator to the physical operator of {@code op}.
 * Does nothing when the logical plan reports no soft-link predecessors.
 *
 * @param op the logical operator whose soft links are translated
 * @throws FrontendException declared for callers; nothing in this method
 *         raises it explicitly
 */
private void translateSoftLinks(Operator op) throws FrontendException {
    List<Operator> softPreds = op.getPlan().getSoftLinkPredecessors(op);
    if (softPreds != null) {
        for (Operator softPred : softPreds) {
            currentPlan.createSoftLink(logToPhyMap.get(softPred), logToPhyMap.get(op));
        }
    }
}
}