blob: 135de1c9c8a65bcefc7b893e4d154933a250a45e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.newplan.logical.visitor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.pig.PigException;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.newplan.DependencyOrderWalker;
import org.apache.pig.newplan.DepthFirstWalker;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.PlanWalker;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
import org.apache.pig.newplan.logical.expression.ProjectExpression;
import org.apache.pig.newplan.logical.relational.LOCogroup;
import org.apache.pig.newplan.logical.relational.LOCube;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOGenerate;
import org.apache.pig.newplan.logical.relational.LOInnerLoad;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LORank;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
import com.google.common.primitives.Booleans;
/**
* A visitor to walk operators that contain a nested plan and translate project( * )
* operators to a list of projection operators, i.e.,
* project( * ) -> project(0), project(1), ... project(n-2), project(n-1)
* If the input schema is null, project(*) is not expanded.
* It also expands project-range expressions (e.g. $1 .. $5). A project-range-to-end
* (e.g. $3 ..) is not expanded if the input schema is null.
*
*/
public class ProjectStarExpander extends LogicalRelationalNodesVisitor{
/**
 * Creates an expander for the given plan; the plan is traversed with a
 * {@link DependencyOrderWalker}.
 *
 * @param plan the plan whose operators are to be visited
 * @throws FrontendException if thrown by the superclass constructor
 */
public ProjectStarExpander(OperatorPlan plan) throws FrontendException {
    super(plan, new DependencyOrderWalker(plan));
}
/**
 * Rewrites the sort-column plans of an order-by so that every
 * project-star / project-range plan is replaced by the plans it expands
 * to. The ascending flag of an original column is repeated once for each
 * plan produced from it, and both lists are written back to the operator.
 *
 * @param sort the sort operator to rewrite in place
 * @throws FrontendException if, after expansion, a project-range-to-end
 *         remains anywhere but in the last sort position, or if the
 *         expansion itself fails
 */
@Override
public void visit(LOSort sort) throws FrontendException {
    final List<LogicalExpressionPlan> sortPlans = sort.getSortColPlans();
    final List<Boolean> sortOrders = sort.getAscendingCols();

    if (sortPlans.size() != sortOrders.size()) {
        throw new AssertionError("Size of expPlans and ascorder should be same");
    }

    // sort plans and their order flags after star expansion
    final List<LogicalExpressionPlan> expandedSortPlans =
            new ArrayList<LogicalExpressionPlan>();
    final List<Boolean> expandedSortOrders = new ArrayList<Boolean>();

    int planIdx = 0;
    for (LogicalExpressionPlan sortPlan : sortPlans) {
        List<LogicalExpressionPlan> pieces = expandPlan(sortPlan, 0);
        expandedSortPlans.addAll(pieces);

        // every plan produced from this column inherits the column's flag
        Boolean ascending = sortOrders.get(planIdx);
        for (int remaining = pieces.size(); remaining > 0; remaining--) {
            expandedSortOrders.add(ascending);
        }
        planIdx++;
    }

    // A project-range-to-end can survive expansion when there is no input
    // schema; because of an order-by sampler limitation it is only
    // accepted as the very last sort column.
    final int lastIdx = expandedSortPlans.size() - 1;
    for (int idx = 0; idx <= lastIdx; idx++) {
        ProjectExpression starProj = getProjectStar(expandedSortPlans.get(idx));
        if (starProj == null) {
            continue;
        }
        boolean rangeToEnd = starProj.isRangeProject() && starProj.getEndCol() == -1;
        if (rangeToEnd && idx != lastIdx) {
            String msg = "Project-range to end (eg. x..)" +
                    " is supported in order-by only as last sort column";
            throw new FrontendException(msg, 1128, PigException.INPUT);
        }
    }

    sort.setSortColPlans(expandedSortPlans);
    sort.setAscendingCols(expandedSortOrders);
}
/**
 * Rewrites the rank-column plans of a rank operator so that every
 * project-star / project-range plan is replaced by the plans it expands
 * to, repeating the ascending flag of the original column for each plan
 * produced from it.
 *
 * @param rank the rank operator to rewrite in place
 * @throws FrontendException if, after expansion, a project-range-to-end
 *         remains anywhere but in the last rank position, or if the
 *         expansion itself fails
 */
@Override
public void visit(LORank rank) throws FrontendException {
    final List<LogicalExpressionPlan> rankPlans = rank.getRankColPlans();
    final List<Boolean> rankOrders = rank.getAscendingCol();

    final int numRankCols = rankPlans.size();
    if (numRankCols != rankOrders.size()) {
        throw new AssertionError(
                "Size of expPlans and ascorder should be same");
    }

    final List<LogicalExpressionPlan> expandedRankPlans =
            new ArrayList<LogicalExpressionPlan>();
    final List<Boolean> expandedRankOrders = new ArrayList<Boolean>();

    for (int col = 0; col < numRankCols; col++) {
        // expand the col'th plan and carry its order flag over to each result
        List<LogicalExpressionPlan> pieces = expandPlan(rankPlans.get(col), 0);
        expandedRankPlans.addAll(pieces);

        Boolean ascending = rankOrders.get(col);
        int copies = pieces.size();
        while (copies > 0) {
            expandedRankOrders.add(ascending);
            copies--;
        }
    }

    // A project-range-to-end may remain unexpanded (no input schema); it is
    // only accepted when nothing follows it.
    int position = 0;
    for (LogicalExpressionPlan candidate : expandedRankPlans) {
        ProjectExpression starProj = getProjectStar(candidate);
        boolean isLast = (position == expandedRankPlans.size() - 1);
        if (starProj != null && starProj.isRangeProject()
                && starProj.getEndCol() == -1 && !isLast) {
            String msg = "Project-range to end (eg. x..)"
                    + " is supported in rank-by only as last rank column";
            throw new FrontendException(msg, 1128, PigException.INPUT);
        }
        position++;
    }

    rank.setRankColPlan(expandedRankPlans);
    rank.setAscendingCol(expandedRankOrders);
}
/**
 * Expands a plan whose output is a project-star / project-range by
 * delegating to the three-argument overload. A plan without such a
 * projection is returned unchanged as the only element of the list.
 *
 * @param plan the expression plan to inspect
 * @param inputNum input number passed on to the three-argument overload
 * @return the plans resulting from the expansion, or a single-element
 *         list holding {@code plan} when there is nothing to expand
 * @throws FrontendException if the plan cannot be inspected or expanded
 */
private List<LogicalExpressionPlan> expandPlan(LogicalExpressionPlan plan, int inputNum)
        throws FrontendException {
    final ProjectExpression starProj = getProjectStar(plan);
    if (starProj == null) {
        // nothing to expand: hand back the plan as-is
        List<LogicalExpressionPlan> unchanged = new ArrayList<LogicalExpressionPlan>();
        unchanged.add(plan);
        return unchanged;
    }
    return expandPlan(plan, starProj, inputNum);
}
/**
 * Expands project-star / project-range in the expression plans of a
 * (co)group and then validates the result.
 *
 * @param cg the cogroup operator whose expression plans are rewritten
 * @throws FrontendException with code 1123 if a cogroup over several
 *         inputs still contains a project-star / project-range after
 *         expansion, or with code 1122 if the inputs end up with
 *         different numbers of plans
 */
@Override
public void visit(LOCogroup cg) throws FrontendException {
    final MultiMap<Integer, LogicalExpressionPlan> groupPlans = cg.getExpressionPlans();

    // rewrite the plans that contain a project-star
    expandPlans(groupPlans);

    final List<Operator> cgInputs = cg.getInputs((LogicalPlan) cg.getPlan());
    final int numInputs = cgInputs.size();

    // A star that survived expansion means its input had no schema. With
    // more than one input that is rejected, since the inputs could then
    // have a different number/types of cogroup keys.
    if (numInputs > 1) {
        for (int inputIdx = 0; inputIdx < numInputs; inputIdx++) {
            for (LogicalExpressionPlan groupPlan : groupPlans.get(inputIdx)) {
                if (getProjectStar(groupPlan) != null) {
                    String msg = "Cogroup/Group by '*' or 'x..' " +
                            "(range of columns to the end) " +
                            "is only allowed if the input has a schema";
                    throw new VisitorException(cg, msg, 1123, PigException.INPUT);
                }
            }
        }
    }

    // every input must contribute the same number of group-by plans
    final int expectedArity = groupPlans.get(0).size();
    for (int inputIdx = 1; inputIdx < numInputs; inputIdx++) {
        if (groupPlans.get(inputIdx).size() != expectedArity) {
            String msg = "The arity of cogroup/group by columns " +
                    "do not match";
            throw new VisitorException(cg, msg, 1122, PigException.INPUT);
        }
    }
}
/**
 * Expands project-star / project-range in the expression plans of a cube
 * operator; the plans are rewritten in the map returned by the operator.
 *
 * @param cu the cube operator to process
 * @throws FrontendException if the expansion fails
 */
@Override
public void visit(LOCube cu) throws FrontendException {
    expandPlans(cu.getExpressionPlans());
}
/**
 * Expands project-star / project-range in the expression plans of a join;
 * the plans are rewritten in the map returned by the operator.
 *
 * @param join the join operator to process
 * @throws FrontendException if the expansion fails
 */
@Override
public void visit(LOJoin join) throws FrontendException {
    final MultiMap<Integer, LogicalExpressionPlan> joinPlans = join.getExpressionPlans();
    expandPlans(joinPlans);
}
/**
 * Expands project-star / project-range in the generate of a foreach.
 * <p>
 * The inner plan is visited first. Then, for every output plan of the
 * {@link LOGenerate} that consists of a single project-star whose input
 * {@link LOInnerLoad} is itself a star or range projection, the plan and
 * its inner load are replaced by one inner load plus one expression plan
 * per projected column. Flatten flags and user-defined schemas are
 * replicated or split to match. Finally the input numbers of all
 * projections are re-assigned to the new predecessor positions of the
 * generate, and the schemas of the generate and the foreach are reset.
 * <p>
 * A project-star or project-range-to-end is left unexpanded when the
 * foreach's predecessor has no schema.
 *
 * @param foreach the foreach operator to process
 * @throws FrontendException with code 2266 if the inner plan does not have
 *         exactly one {@link LOGenerate} sink, or if the expansion fails
 */
@Override
public void visit(LOForEach foreach) throws FrontendException {
    //in case of LOForeach , expand when inner plan has a single project-star
    // and its input LOInnerLoad also is a project-star
    // then Reset the input number in project expressions
    LogicalPlan innerPlan = foreach.getInnerPlan();

    //visit the inner plan first
    PlanWalker newWalker = currentWalker.spawnChildWalker(innerPlan);
    pushWalker(newWalker);
    currentWalker.walk(this);
    popWalker();

    //get the LOGenerate
    List<Operator> feOutputs = innerPlan.getSinks();
    LOGenerate gen = null;
    for (Operator op : feOutputs) {
        if (op instanceof LOGenerate) {
            if (gen != null) {
                String msg = "Expected single LOGenerate output in innerplan of foreach";
                throw new VisitorException(foreach,
                        msg,
                        2266,
                        PigException.BUG
                );
            }
            gen = (LOGenerate) op;
        }
    }
    if (gen == null) {
        // No LOGenerate sink at all: report it like the "more than one"
        // case instead of failing below with a NullPointerException.
        String msg = "Expected single LOGenerate output in innerplan of foreach";
        throw new VisitorException(foreach,
                msg,
                2266,
                PigException.BUG
        );
    }

    //work on the generate plan, flatten and user schema
    List<LogicalExpressionPlan> expPlans = gen.getOutputPlans();
    List<LogicalExpressionPlan> newExpPlans = new ArrayList<LogicalExpressionPlan>();

    List<Operator> loGenPreds = innerPlan.getPredecessors(gen);
    if (loGenPreds == null) {
        // there are no LOInnerLoads , must be working on just constants
        // no project-star expansion to be done
        return;
    }

    List<LogicalSchema> userSchema = gen.getUserDefinedSchema();
    List<LogicalSchema> newUserSchema = null;
    if (userSchema != null) {
        newUserSchema = new ArrayList<LogicalSchema>();
    }

    boolean[] flattens = gen.getFlattenFlags();
    List<Boolean> newFlattens = new ArrayList<Boolean>(flattens.length);

    //get mapping of LOGenerate predecessor current position to object
    // (captured before the inner plan is modified below)
    Map<Integer, LogicalRelationalOperator> oldPos2Rel =
            new HashMap<Integer, LogicalRelationalOperator>();
    for (int i = 0; i < loGenPreds.size(); i++) {
        oldPos2Rel.put(i, (LogicalRelationalOperator) loGenPreds.get(i));
    }

    //get schema of predecessor, project-star expansion needs a schema
    LogicalRelationalOperator pred =
            (LogicalRelationalOperator) foreach.getPlan().getPredecessors(foreach).get(0);
    LogicalSchema inpSch = pred.getSchema();

    //store mapping between the projection in inner plans of
    // of LOGenerate to the input relation object
    Map<ProjectExpression, LogicalRelationalOperator> proj2InpRel =
            new HashMap<ProjectExpression, LogicalRelationalOperator>();

    for (int i = 0; i < expPlans.size(); i++) {
        LogicalExpressionPlan expPlan = expPlans.get(i);
        ProjectExpression projStar = getProjectLonelyStar(expPlan, oldPos2Rel);

        boolean foundExpandableProject = false;
        if (projStar != null) {
            //there is a project-star to be expanded
            LogicalSchema userStarSch = null;
            if (userSchema != null && userSchema.get(i) != null) {
                userStarSch = userSchema.get(i);
            }

            //the range values are set in the project in LOInnerLoad
            LOInnerLoad oldLOInnerLoad =
                    (LOInnerLoad) oldPos2Rel.get(projStar.getInputNum());
            ProjectExpression loInnerProj = oldLOInnerLoad.getProjection();

            int firstProjCol = 0;
            int lastProjCol = 0;
            if (loInnerProj.isRangeProject()) {
                loInnerProj.setColumnNumberFromAlias();
                firstProjCol = loInnerProj.getStartCol();
                lastProjCol = loInnerProj.getEndCol();
            }

            boolean isProjectToEnd = loInnerProj.isProjectStar() ||
                    (loInnerProj.isRangeProject() && lastProjCol == -1);

            //can't expand if there is no input schema, and this is
            // as project star or project-range-to-end
            if (!(inpSch == null && isProjectToEnd)) {
                foundExpandableProject = true;

                if (isProjectToEnd) {
                    // the end column is open: use the last column of the input schema
                    lastProjCol = inpSch.size() - 1;
                }

                //replacing the existing project star with new ones
                expPlan.remove(projStar);

                //remove the LOInnerLoad with star
                innerPlan.disconnect(oldLOInnerLoad, gen);
                innerPlan.remove(oldLOInnerLoad);

                //generate new exp plan, inner load for each field in schema
                for (int j = firstProjCol; j <= lastProjCol; j++) {
                    //add new LOInnerLoad
                    LOInnerLoad newInLoad = new LOInnerLoad(innerPlan, foreach, j);
                    innerPlan.add(newInLoad);
                    innerPlan.connect(newInLoad, gen);

                    // new expression plan and proj
                    LogicalExpressionPlan newExpPlan = new LogicalExpressionPlan();
                    newExpPlans.add(newExpPlan);

                    // -2 is a placeholder input number; the real position is
                    // assigned after the loop from proj2InpRel/rel2pos.
                    // NOTE(review): column -1 presumably means "project
                    // everything from the new inner load" - confirm against
                    // ProjectExpression.
                    ProjectExpression newProj =
                            new ProjectExpression(newExpPlan, -2, -1, gen);
                    proj2InpRel.put(newProj, newInLoad);

                    newFlattens.add(flattens[i]);

                    if (newUserSchema != null) {
                        //index into user specified schema
                        int schIdx = j - firstProjCol;
                        if (userStarSch != null
                                && userStarSch.getFields().size() > schIdx
                                && userStarSch.getField(schIdx) != null) {
                            //if the project-star field has user specified schema, use the
                            // j'th field for this column
                            LogicalSchema sch = new LogicalSchema();
                            sch.addField(new LogicalFieldSchema(userStarSch.getField(schIdx)));
                            newUserSchema.add(sch);
                        } else {
                            newUserSchema.add(null);
                        }
                    }
                }
            }
        }

        if (!foundExpandableProject) { //no project-star that could be expanded
            //get all projects in here
            FindProjects findProjs = new FindProjects(expPlan);
            findProjs.visit();
            List<ProjectExpression> projs = findProjs.getProjs();

            //create a mapping of project expression to their inputs
            for (ProjectExpression proj : projs) {
                proj2InpRel.put(proj, oldPos2Rel.get(proj.getInputNum()));
            }

            newExpPlans.add(expPlan);
            newFlattens.add(flattens[i]);
            if (newUserSchema != null) {
                newUserSchema.add(userSchema.get(i));
            }
        }
    }

    //get mapping of LoGenerate input relation to current position
    Map<LogicalRelationalOperator, Integer> rel2pos =
            new HashMap<LogicalRelationalOperator, Integer>();
    List<Operator> newGenPreds = innerPlan.getPredecessors(gen);
    int numNewGenPreds = 0;
    if (newGenPreds != null) {
        numNewGenPreds = newGenPreds.size();
    }
    for (int i = 0; i < numNewGenPreds; i++) {
        rel2pos.put((LogicalRelationalOperator) newGenPreds.get(i), i);
    }

    //correct the input num for projects
    for (Entry<ProjectExpression, LogicalRelationalOperator> projAndInp : proj2InpRel.entrySet()) {
        ProjectExpression proj = projAndInp.getKey();
        LogicalRelationalOperator rel = projAndInp.getValue();
        proj.setInputNum(rel2pos.get(rel));
    }

    // set the new lists
    gen.setOutputPlans(newExpPlans);
    gen.setFlattenFlags(Booleans.toArray(newFlattens));
    gen.setUserDefinedSchema(newUserSchema);

    gen.resetSchema();
    foreach.resetSchema();
}
/**
 * Expression visitor that records every {@link ProjectExpression} it is
 * handed while walking an expression plan with a {@link DepthFirstWalker}.
 */
static class FindProjects extends LogicalExpressionVisitor {

    /** Projections seen so far, in visiting order. */
    private final List<ProjectExpression> projs;

    /**
     * @param plan the expression plan to search
     * @throws FrontendException if thrown by the superclass constructor
     */
    protected FindProjects(LogicalExpressionPlan plan) throws FrontendException {
        super(plan, new DepthFirstWalker(plan));
        this.projs = new ArrayList<ProjectExpression>();
    }

    /** Records the visited projection. */
    @Override
    public void visit(ProjectExpression proj) {
        this.projs.add(proj);
    }

    /**
     * @return the list of projections recorded so far (the internal list,
     *         not a copy)
     */
    public List<ProjectExpression> getProjs() {
        return this.projs;
    }
}
/**
 * Finds the project-star of a foreach output plan that can be expanded.
 * The plan qualifies only if it consists of exactly one operator, that
 * operator is a project-star, and the relation it reads from is an
 * {@link LOInnerLoad} whose own projection is a star or a range.
 *
 * @param expPlan - expression plan
 * @param oldPos2Rel - predecessors of the generate, keyed by input position
 * @return the project-star of {@code expPlan} if all conditions hold,
 *         otherwise null
 * @throws FrontendException
 */
private ProjectExpression getProjectLonelyStar(LogicalExpressionPlan expPlan,
        Map<Integer, LogicalRelationalOperator> oldPos2Rel) throws FrontendException {
    //the expression plan should have just a single project
    if (expPlan.size() != 1) {
        return null;
    }

    final Operator onlyOp = expPlan.getOperators().next();
    if (!(onlyOp instanceof ProjectExpression)) {
        return null;
    }

    final ProjectExpression candidate = (ProjectExpression) onlyOp;
    if (!candidate.isProjectStar()) {
        return null;
    }

    //its input has to be a LOInnerLoad ...
    final LogicalRelationalOperator source = oldPos2Rel.get(candidate.getInputNum());
    if (!(source instanceof LOInnerLoad)) {
        return null;
    }

    // ... that itself projects a star or a range
    final ProjectExpression loadProj = ((LOInnerLoad) source).getProjection();
    return loadProj.isRangeOrStarProject() ? candidate : null;
}
/**
 * Replaces, for every input position 0..size-1 of the map, the list of
 * expression plans with the list obtained by expanding each plan. The map
 * is modified in place.
 *
 * @param inpExprPlans expression plans keyed by input position
 * @throws FrontendException if expanding a plan fails
 */
private void expandPlans(MultiMap<Integer, LogicalExpressionPlan> inpExprPlans)
        throws FrontendException {
    for (int inputIdx = 0; inputIdx < inpExprPlans.size(); inputIdx++) {
        List<LogicalExpressionPlan> current = inpExprPlans.get(inputIdx);
        List<LogicalExpressionPlan> replacement = new ArrayList<LogicalExpressionPlan>();

        // expand any plan of this input that has a project-star
        for (LogicalExpressionPlan candidate : current) {
            List<LogicalExpressionPlan> pieces = expandPlan(candidate, inputIdx);
            replacement.addAll(pieces);
        }

        // swap the old list for the expanded one
        inpExprPlans.removeKey(inputIdx);
        inpExprPlans.put(inputIdx, replacement);
    }
}
/**
 * Expands a plan containing a project-star / project-range into one plan
 * per projected column. The column range comes from
 * {@code ProjectStarExpanderUtil.getProjectStartEndCols}; when that
 * returns null the plan cannot be expanded and is returned unchanged as
 * the only element of the list.
 *
 * @param expPlan the plan containing the projection
 * @param proj the project-star / project-range found in {@code expPlan}
 * @param inputNum input number given to each newly created projection
 * @return one single-column plan per column in the range, or a list
 *         holding only {@code expPlan}
 * @throws FrontendException
 */
private List<LogicalExpressionPlan> expandPlan(
        LogicalExpressionPlan expPlan, ProjectExpression proj, int inputNum)
        throws FrontendException {
    final Pair<Integer, Integer> colRange =
            ProjectStarExpanderUtil.getProjectStartEndCols(expPlan, proj);

    final List<LogicalExpressionPlan> result = new ArrayList<LogicalExpressionPlan>();
    if (colRange != null) {
        //expand from the first to the last column of the range, inclusive
        final int startCol = colRange.first;
        final int endCol = colRange.second;
        final LogicalRelationalOperator attachedOp = proj.getAttachedRelationalOp();
        int col = startCol;
        while (col <= endCol) {
            result.add(createExpPlanWithProj(attachedOp, inputNum, col));
            col++;
        }
    } else {
        // can't expand this project
        result.add(expPlan);
    }
    return result;
}
/**
 * Builds a fresh expression plan holding a single projection of column
 * {@code colNum} from input {@code inputNum}, attached to
 * {@code attachRel}.
 *
 * @param attachRel relational operator the projection is attached to
 * @param inputNum input number of the projection
 * @param colNum column number of the projection
 * @return the new expression plan
 */
private LogicalExpressionPlan createExpPlanWithProj(
        LogicalRelationalOperator attachRel,
        int inputNum, int colNum) {
    final LogicalExpressionPlan singleColPlan = new LogicalExpressionPlan();
    final ProjectExpression colProj =
            new ProjectExpression(singleColPlan, inputNum, colNum, attachRel);
    singleColPlan.add(colProj);
    return singleColPlan;
}
/**
 * Looks among the sources of the expression plan for a projection that
 * is a project-star or project-range.
 *
 * @param expPlan the expression plan to inspect
 * @return the project-star / project-range found among the plan's
 *         sources, or null if there is none
 * @throws FrontendException with code 2264 if such a projection is found
 *         while the plan has more than one source
 */
private ProjectExpression getProjectStar(LogicalExpressionPlan expPlan)
        throws FrontendException {
    final List<Operator> roots = expPlan.getSources();
    ProjectExpression found = null;

    for (Operator root : roots) {
        if (!(root instanceof ProjectExpression)) {
            continue;
        }
        ProjectExpression candidate = (ProjectExpression) root;
        if (!candidate.isRangeOrStarProject()) {
            continue;
        }
        // a star/range projection has to be the plan's only source
        if (roots.size() > 1) {
            String msg = "More than one operator in an expression plan" +
                    " containing project star(*)/project-range (..)";
            throw new VisitorException(candidate, msg, 2264, PigException.BUG);
        }
        found = candidate;
    }
    return found;
}
}