blob: 90849d508bec2c964e693e237c60e9b96e06b501 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.newplan.logical.visitor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.pig.PigException;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.newplan.DependencyOrderWalker;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.PlanWalker;
import org.apache.pig.newplan.ReverseDependencyOrderWalker;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
import org.apache.pig.newplan.logical.expression.ProjectExpression;
import org.apache.pig.newplan.logical.expression.UserFuncExpression;
import org.apache.pig.newplan.logical.optimizer.AllExpressionVisitor;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOGenerate;
import org.apache.pig.newplan.logical.relational.LOInnerLoad;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
/**
* Expand project-star or project-range when used as udf argument.
* This is different from {@link ProjectStarExpander} because in those
* cases, the project star gets expanded as new {@link LogicalExpressionPlan}.
* In case of project-star or project-range within udf, it should get expanded
* only as multiple inputs to this udf, no additional {@link LogicalExpressionPlan}s
* are created.
* The expansion happens only if input schema is not null
*/
public class ProjStarInUdfExpander extends AllExpressionVisitor {

    /**
     * Builds the expander for the given plan, walking it with a
     * {@link DependencyOrderWalker}.
     *
     * @param plan plan whose udf arguments are to be expanded
     * @throws FrontendException propagated from the superclass constructor
     */
    public ProjStarInUdfExpander(OperatorPlan plan) throws FrontendException {
        super(plan, new DependencyOrderWalker(plan));
    }

    /**
     * Supplies the expression visitor that expands udf arguments for every
     * operator other than foreach (foreach is handled by
     * {@link #visit(LOForEach)}).
     *
     * @param exprPlan expression plan the returned visitor will work on
     * @return a new {@link ProjExpanderForNonForeach} bound to {@code exprPlan}
     * @throws FrontendException propagated from the visitor's constructor
     */
    @Override
    protected LogicalExpressionVisitor getVisitor(final LogicalExpressionPlan exprPlan)
            throws FrontendException {
        return new ProjExpanderForNonForeach(exprPlan);
    }

    /**
     * Expands project-star/range udf arguments inside a foreach.
     * LOForEach is treated separately because the projection that actually
     * gets expanded is the one held by the LOInnerLoad feeding the LOGenerate.
     *
     * @param foreach the foreach operator being visited
     * @throws FrontendException if more than one LOGenerate sink is found, or
     *         if walking/modifying the inner plan fails
     */
    @Override
    public void visit(LOForEach foreach) throws FrontendException {
        final LogicalPlan innerPlan = foreach.getInnerPlan();

        // the nested plan is walked before the generate is rewritten
        PlanWalker innerWalker = currentWalker.spawnChildWalker(innerPlan);
        pushWalker(innerWalker);
        currentWalker.walk(this);
        popWalker();

        final LOGenerate gen = findGenerateSink(foreach, innerPlan);

        final List<Operator> genInputs = innerPlan.getPredecessors(gen);
        if (genInputs == null) {
            // no LOInnerLoads feed the generate (constants only), so there is
            // no project-star to expand
            return;
        }

        // snapshot: position of each generate input before any expansion
        final Map<Integer, LogicalRelationalOperator> oldPos2Rel =
                relationsByPosition(genInputs);

        // filled in by the expanders: every projection in the generate's
        // expression plans, mapped to the relation it reads from
        final Map<ProjectExpression, LogicalRelationalOperator> proj2InpRel =
                new HashMap<ProjectExpression, LogicalRelationalOperator>();
        final List<LOInnerLoad> expandedInLoads = new ArrayList<LOInnerLoad>();

        // expand the udf arguments in each output expression plan
        for (OperatorPlan outputPlan : gen.getOutputPlans()) {
            new ProjExpanderForForeach(
                    outputPlan,
                    gen,
                    oldPos2Rel,
                    proj2InpRel,
                    foreach,
                    expandedInLoads).visit();
        }

        // drop the inner loads that were replaced by per-column loads
        for (LOInnerLoad replaced : expandedInLoads) {
            innerPlan.disconnect(replaced, gen);
            innerPlan.remove(replaced);
        }

        // inputs of the generate have shifted; re-point every projection at
        // the new position of its relation
        final Map<LogicalRelationalOperator, Integer> rel2pos =
                positionsByRelation(innerPlan.getPredecessors(gen));
        for (Entry<ProjectExpression, LogicalRelationalOperator> mapping
                : proj2InpRel.entrySet()) {
            mapping.getKey().setInputNum(rel2pos.get(mapping.getValue()));
        }
    }

    /**
     * Intentionally empty. The generate's output plans are processed from
     * {@link #visit(LOForEach)}; presumably this override keeps the inherited
     * handling from also running on them - confirm against
     * AllExpressionVisitor.
     */
    @Override
    public void visit(LOGenerate gen) throws FrontendException {
    }

    /**
     * Scans the sinks of the inner plan for the LOGenerate.
     *
     * @return the LOGenerate sink, or null when no sink is a LOGenerate
     * @throws FrontendException (a VisitorException) if a second LOGenerate
     *         sink is encountered
     */
    private static LOGenerate findGenerateSink(LOForEach foreach, LogicalPlan innerPlan)
            throws FrontendException {
        LOGenerate found = null;
        for (Operator sink : innerPlan.getSinks()) {
            if (!(sink instanceof LOGenerate)) {
                continue;
            }
            if (found != null) {
                String msg = "Expected single LOGenerate output in innerplan of foreach";
                throw new VisitorException(foreach, msg, 2266, PigException.BUG);
            }
            found = (LOGenerate) sink;
        }
        return found;
    }

    /** Maps list index to the relational operator found at that index. */
    private static Map<Integer, LogicalRelationalOperator> relationsByPosition(
            List<Operator> preds) {
        Map<Integer, LogicalRelationalOperator> byPosition =
                new HashMap<Integer, LogicalRelationalOperator>();
        int position = 0;
        for (Operator pred : preds) {
            byPosition.put(position, (LogicalRelationalOperator) pred);
            position++;
        }
        return byPosition;
    }

    /**
     * Maps each relational operator to its index in the list; a null list
     * yields an empty map.
     */
    private static Map<LogicalRelationalOperator, Integer> positionsByRelation(
            List<Operator> preds) {
        Map<LogicalRelationalOperator, Integer> byRelation =
                new HashMap<LogicalRelationalOperator, Integer>();
        if (preds == null) {
            return byRelation;
        }
        int position = 0;
        for (Operator pred : preds) {
            byRelation.put((LogicalRelationalOperator) pred, position);
            position++;
        }
        return byRelation;
    }
}
/**
 * Expression visitor used on the output plans of a foreach's LOGenerate.
 * For a udf argument that is a project-star/range reading from a
 * star/range LOInnerLoad, it replaces the argument with one projection per
 * column, each backed by a newly created LOInnerLoad. It also records, for
 * every projection it sees or creates, the relation that projection reads
 * from, so the caller can fix up input numbers afterwards.
 */
class ProjExpanderForForeach extends LogicalExpressionVisitor {

    private final LOGenerate loGen;
    private final LogicalPlan innerRelPlan;
    private final Map<Integer, LogicalRelationalOperator> oldPos2Rel;
    private final Map<ProjectExpression, LogicalRelationalOperator> proj2InpRel;
    private final LOForEach foreach;
    private final List<LOInnerLoad> expandedInLoads;

    /**
     * @param p               expression plan to visit (walked with a
     *                        {@link ReverseDependencyOrderWalker})
     * @param loGen           the generate whose output plan {@code p} is
     * @param oldPos2Rel      generate input position to relation, as of before
     *                        expansion; read only
     * @param proj2InpRel     out-parameter: projection to the relation it reads
     * @param foreach         the enclosing foreach, used when creating new
     *                        inner loads
     * @param expandedInLoads out-parameter: inner loads that were expanded and
     *                        should be removed by the caller
     * @throws FrontendException propagated from the superclass constructor
     */
    protected ProjExpanderForForeach(
            OperatorPlan p,
            LOGenerate loGen,
            Map<Integer, LogicalRelationalOperator> oldPos2Rel,
            Map<ProjectExpression, LogicalRelationalOperator> proj2InpRel,
            LOForEach foreach,
            List<LOInnerLoad> expandedInLoads)
            throws FrontendException {
        super(p, new ReverseDependencyOrderWalker(p));
        this.loGen = loGen;
        this.innerRelPlan = (LogicalPlan) loGen.getPlan();
        this.oldPos2Rel = oldPos2Rel;
        this.proj2InpRel = proj2InpRel;
        this.foreach = foreach;
        this.expandedInLoads = expandedInLoads;
    }

    /**
     * Rewrites the argument list of {@code func}, substituting expandable
     * project-star/range arguments with their per-column projections. The
     * plan is left untouched when nothing could be expanded.
     */
    @Override
    public void visit(UserFuncExpression func) throws FrontendException {
        final List<Operator> currentArgs = plan.getSuccessors(func);
        if (currentArgs == null) {
            // udf takes no arguments
            return;
        }

        final List<Operator> inputs = new ArrayList<Operator>(currentArgs);
        // arguments that got expanded and must leave the plan
        final List<Operator> expandedProjectStars = new ArrayList<Operator>();
        // freshly created projections that must enter the plan
        final List<Operator> newExpandedProjects = new ArrayList<Operator>();
        // the complete argument list after expansion, in order
        final List<Operator> newInputs = new ArrayList<Operator>();

        for (Operator inp : inputs) {
            List<Operator> replacements = null;
            final LOInnerLoad inLoad = innerLoadFeeding(inp);
            if (inLoad != null) {
                // inside foreach the outer projection is always a star; the
                // real star/range test is on the inner load's own projection
                final ProjectExpression innerProj = inLoad.getProjection();
                if (innerProj.isRangeOrStarProject()) {
                    replacements = expandProjectStar(innerProj);
                }
            }

            if (replacements == null) {
                // not a candidate, or expansion is not possible right now
                newInputs.add(inp);
                continue;
            }

            expandedProjectStars.add(inp);
            expandedInLoads.add(inLoad);
            newInputs.addAll(replacements);
            newExpandedProjects.addAll(replacements);
        }

        if (expandedProjectStars.isEmpty()) {
            return;
        }
        replaceArguments(func, inputs, expandedProjectStars, newExpandedProjects, newInputs);
    }

    /**
     * Records which relation this projection reads from, so that its input
     * number can be corrected once the generate's inputs have been rearranged.
     */
    @Override
    public void visit(ProjectExpression proj) {
        LogicalRelationalOperator source = oldPos2Rel.get(proj.getInputNum());
        proj2InpRel.put(proj, source);
    }

    /**
     * Returns the LOInnerLoad behind {@code inp} when {@code inp} is a
     * star/range projection whose input relation is a LOInnerLoad; null
     * otherwise.
     */
    private LOInnerLoad innerLoadFeeding(Operator inp) {
        if (!(inp instanceof ProjectExpression)) {
            return null;
        }
        final ProjectExpression outer = (ProjectExpression) inp;
        if (!outer.isRangeOrStarProject()) {
            return null;
        }
        final LogicalRelationalOperator source = oldPos2Rel.get(outer.getInputNum());
        if (source instanceof LOInnerLoad) {
            return (LOInnerLoad) source;
        }
        return null;
    }

    /**
     * Swaps the old argument list of {@code func} for the new one: detaches
     * all old arguments, removes the expanded ones from the plan and from the
     * projection mapping, adds the new projections, then reattaches.
     */
    private void replaceArguments(
            UserFuncExpression func,
            List<Operator> oldArgs,
            List<Operator> removedArgs,
            List<Operator> addedArgs,
            List<Operator> newArgs) throws FrontendException {
        for (Operator old : oldArgs) {
            plan.disconnect(func, old);
        }
        for (Operator removed : removedArgs) {
            plan.remove(removed);
            proj2InpRel.remove(removed);
        }
        for (Operator added : addedArgs) {
            plan.add(added);
        }
        for (Operator arg : newArgs) {
            plan.connect(func, arg);
        }
    }

    /**
     * Creates one LOInnerLoad plus one projection per column covered by the
     * star/range projection {@code proj}. Each new inner load is added to the
     * inner relational plan and connected to the generate.
     *
     * @return the new projections in column order, or null when the column
     *         range cannot be determined yet
     */
    private List<Operator> expandProjectStar(ProjectExpression proj)
            throws FrontendException {
        final Pair<Integer, Integer> colRange =
                ProjectStarExpanderUtil.getProjectStartEndCols((LogicalExpressionPlan) plan, proj);
        if (colRange == null) {
            // no expansion happening now
            return null;
        }

        final int firstCol = colRange.first;
        final int lastCol = colRange.second;
        final List<Operator> created = new ArrayList<Operator>();
        int col = firstCol;
        while (col <= lastCol) {
            final LOInnerLoad columnLoad = new LOInnerLoad(innerRelPlan, foreach, col);
            innerRelPlan.add(columnLoad);
            innerRelPlan.connect(columnLoad, loGen);

            // input number -2 is a placeholder: the caller overwrites it via
            // the proj2InpRel mapping; column -1 presumably denotes
            // project-star on the new single-column load - TODO confirm
            final ProjectExpression columnProj = new ProjectExpression(plan, -2, -1, loGen);
            proj2InpRel.put(columnProj, columnLoad);
            created.add(columnProj);
            col++;
        }
        return created;
    }
}
class ProjExpanderForNonForeach extends LogicalExpressionVisitor{
protected ProjExpanderForNonForeach(OperatorPlan p)
throws FrontendException {
super(p, new ReverseDependencyOrderWalker(p));
}
@Override
public void visit(UserFuncExpression func) throws FrontendException {
if(plan.getSuccessors(func) == null){
//udf without args, nothing to do
return;
}
List<Operator> inputs = new ArrayList<Operator>(plan.getSuccessors(func));
// expandedProjectStars will be removed from plan
List<Operator> expandedProjectStars = new ArrayList<Operator>();
// new projects to be added to the plan
List<Operator> newExpandedProjects = new ArrayList<Operator>();
//new set of inputs
List<Operator> newInputs = new ArrayList<Operator>();
for(Operator inp : inputs){
if(inp instanceof ProjectExpression && ((ProjectExpression)inp).isRangeOrStarProject() ){
//try expanding the project-star/range
List<Operator> expandedOps = expandProjectStar((ProjectExpression)inp);
if(expandedOps != null){
expandedProjectStars.add(inp);//to remove
newInputs.addAll(expandedOps);
newExpandedProjects.addAll(expandedOps);//to add
}else {
newInputs.add(inp);
}
}else{
newInputs.add(inp);
}
}
//make changes to the plan if there is a project that was expanded
if(expandedProjectStars.size() > 0){
//disconnect old inputs
for(Operator inp : inputs){
plan.disconnect(func, inp);
}
//remove expanded projects
for(Operator op : expandedProjectStars){
plan.remove(op);
}
//add new projects
for(Operator op : newExpandedProjects){
plan.add(op);
}
//connect new inputs
for(Operator newInp : newInputs){
plan.connect(func, newInp);
}
}
}
private List<Operator> expandProjectStar(
ProjectExpression proj) throws FrontendException {
Pair<Integer, Integer> firstLastCols =
ProjectStarExpanderUtil.getProjectStartEndCols((LogicalExpressionPlan)plan, proj);
if(firstLastCols == null){
//no expansion happening now
return null;
}
//expand from firstProjCol to lastProjCol
int firstProjCol = firstLastCols.first;
int lastProjCol = firstLastCols.second;
List<Operator> newProjects = new ArrayList<Operator>();
LogicalRelationalOperator relOp = proj.getAttachedRelationalOp();
for(int i = firstProjCol; i <= lastProjCol; i++){
newProjects.add(new ProjectExpression(plan, proj.getInputNum(), i, relOp));
}
return newProjects;
}
};