/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.newplan.logical.visitor;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.pig.PigException;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.newplan.DependencyOrderWalker;
import org.apache.pig.newplan.DepthFirstWalker;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.PlanWalker;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
import org.apache.pig.newplan.logical.expression.ProjectExpression;
import org.apache.pig.newplan.logical.relational.LOCogroup;
import org.apache.pig.newplan.logical.relational.LOCube;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOGenerate;
import org.apache.pig.newplan.logical.relational.LOInnerLoad;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LORank;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;

import com.google.common.primitives.Booleans;

/**
 * A visitor to walk operators that contain a nested plan and translate project( * )
 * operators to a list of projection operators, i.e., 
 * project( * ) -> project(0), project(1), ... project(n-2), project(n-1)
 * If input schema is null, project(*) is not expanded.
 * It also expands project range ( eg $1 .. $5). It won't expand project-range-to-end
 * (eg $3 ..) if the input schema is null.
 * 
 */
public class ProjectStarExpander extends LogicalRelationalNodesVisitor{

    public ProjectStarExpander(OperatorPlan plan)
    throws FrontendException {
        super(plan, new DependencyOrderWalker(plan));
    }

    @Override
    public void visit(LOSort sort) throws FrontendException{
        List<LogicalExpressionPlan> expPlans = sort.getSortColPlans();
        List<Boolean> ascOrder = sort.getAscendingCols();

        // new expressionplans and sort order list after star expansion
        List<LogicalExpressionPlan> newExpPlans =
            new ArrayList<LogicalExpressionPlan>();
        List<Boolean> newAscOrder =  new ArrayList<Boolean>();

        if(expPlans.size() != ascOrder.size()){
            throw new AssertionError("Size of expPlans and ascorder should be same");
        }            
        
        for(int i=0; i < expPlans.size(); i++){
            //expand the plan
            LogicalExpressionPlan ithExpPlan = expPlans.get(i);
            List<LogicalExpressionPlan> expandedPlans = expandPlan(ithExpPlan, 0);
            newExpPlans.addAll(expandedPlans);

            //add corresponding isAsc flags
            Boolean isAsc = ascOrder.get(i);
            for(int j=0; j < expandedPlans.size(); j++){
                newAscOrder.add(isAsc);
            }
        }

        //check if there is a project-star-to-end followed by another sort plan
        // in the expanded plans (can happen if there is no input schema)
        for(int i=0; i < newExpPlans.size(); i++){
            ProjectExpression proj = getProjectStar(newExpPlans.get(i));
            if(proj != null && 
                    proj.isRangeProject() && proj.getEndCol() == -1 &&
                    i != newExpPlans.size() -1
            ){
                //because of order by sampler logic limitation, this is not
                //supported right now
                String msg = "Project-range to end (eg. x..)" +
                " is supported in order-by only as last sort column";
                throw new FrontendException(
                        msg,
                        1128,
                        PigException.INPUT
                );
            }
        }

        sort.setSortColPlans(newExpPlans);
        sort.setAscendingCols(newAscOrder);
    }

    @Override
    public void visit(LORank rank) throws FrontendException {

        List<LogicalExpressionPlan> expPlans = rank.getRankColPlans();
        List<Boolean> ascOrder = rank.getAscendingCol();

        List<LogicalExpressionPlan> newExpPlans = new ArrayList<LogicalExpressionPlan>();
        List<Boolean> newAscOrder = new ArrayList<Boolean>();

        if (expPlans.size() != ascOrder.size()) {
            throw new AssertionError(
                    "Size of expPlans and ascorder should be same");
        }

        for (int i = 0; i < expPlans.size(); i++) {
            // expand the plan
            LogicalExpressionPlan ithExpPlan = expPlans.get(i);
            List<LogicalExpressionPlan> expandedPlans = expandPlan(ithExpPlan,
                    0);
            newExpPlans.addAll(expandedPlans);

            // add corresponding isAsc flags
            Boolean isAsc = ascOrder.get(i);
            for (int j = 0; j < expandedPlans.size(); j++) {
                newAscOrder.add(isAsc);
            }
        }

        // check if there is a project-star-to-end followed by another sort plan
        // in the expanded plans (can happen if there is no input schema)
        for (int i = 0; i < newExpPlans.size(); i++) {
            ProjectExpression proj = getProjectStar(newExpPlans.get(i));
            if (proj != null && proj.isRangeProject() && proj.getEndCol() == -1
                    && i != newExpPlans.size() - 1) {
                String msg = "Project-range to end (eg. x..)"
                        + " is supported in rank-by only as last rank column";
                throw new FrontendException(msg, 1128, PigException.INPUT);
            }
        }

        rank.setRankColPlan(newExpPlans);
        rank.setAscendingCol(newAscOrder);
    }

    /**
     * Expand plan into multiple plans if the plan contains a project star,
     * if there is no project star the returned list contains the plan argument.
     * @param plan
     * @return
     * @throws FrontendException
     */
    private List<LogicalExpressionPlan> expandPlan(LogicalExpressionPlan plan, int inputNum)
    throws FrontendException {
        List<LogicalExpressionPlan> expandedPlans;
        ProjectExpression projStar = getProjectStar(plan);
        if(projStar != null){
            // expand the plan into multiple plans
            return expandPlan(plan, projStar, inputNum);
        }else{
            //no project star to expand
            expandedPlans = new ArrayList<LogicalExpressionPlan>();
            expandedPlans.add(plan);
        }
        return expandedPlans;
    }

    @Override
    public void visit(LOCogroup cg) throws FrontendException{
        
        MultiMap<Integer, LogicalExpressionPlan> inpExprPlans = 
            cg.getExpressionPlans();
 
        //modify the plans if they have project-star
        expandPlans(inpExprPlans);
        
        
        //do some validations -
        List<Operator> inputs = cg.getInputs((LogicalPlan) cg.getPlan());

        // check if after translation none of group by plans in a cogroup
        // have a project(*) - if they still do it's because the input
        // for the project(*) did not have a schema - in this case, we should
        // error out since we could have different number/types of 
        // cogroup keys
        if(inputs.size() > 1) { // only for cogroups
            for(int i=0; i<inputs.size(); i++)
                for(LogicalExpressionPlan lp: inpExprPlans.get(i)) {
                    if(getProjectStar(lp) != null) {
                        String msg = "Cogroup/Group by '*' or 'x..' " +
                        "(range of columns to the end) " +
                        "is only allowed if the input has a schema";
                        throw new VisitorException( cg,
                                msg,
                                1123,
                                PigException.INPUT
                        );
                    }
                }
        }
        // check if after translation all group by plans have same arity
        int arity = inpExprPlans.get(0).size();
        for(int i=1; i<inputs.size(); i++){
            if(arity != inpExprPlans.get(i).size()) {
                String msg = "The arity of cogroup/group by columns " +
                "do not match";
                throw new VisitorException(cg,
                        msg,
                        1122,
                        PigException.INPUT
                );
            }
        }

    }

    @Override
    public void visit(LOCube cu) throws FrontendException {

	MultiMap<Integer, LogicalExpressionPlan> inpExprPlans = cu.getExpressionPlans();

	// modify the plans if they have project-star
	expandPlans(inpExprPlans);

    }

    @Override
    public void visit(LOJoin join) throws FrontendException{
        expandPlans(join.getExpressionPlans());
    }

    @Override
    public void visit(LOForEach foreach) throws FrontendException{
        //in case of LOForeach , expand when inner plan has a single project-star
        // and its input LOInnerLoad also is a project-star
        // then Reset the input number in project expressions
        
        LogicalPlan innerPlan = foreach.getInnerPlan();
        
        //visit the inner plan first
        PlanWalker newWalker = currentWalker.spawnChildWalker(innerPlan);
        pushWalker(newWalker);
        currentWalker.walk(this);
        popWalker();
        
        //get the LOGenerate
        List<Operator> feOutputs = innerPlan.getSinks();
        LOGenerate gen = null;
        for( Operator op  : feOutputs){
            if(op instanceof LOGenerate){
                if(gen != null){
                    String msg = "Expected single LOGenerate output in innerplan of foreach";
                    throw new VisitorException(foreach,
                            msg,
                            2266,
                            PigException.BUG
                    );
                }
                gen = (LOGenerate) op;
            }
        }
        
        //work on the generate plan, flatten and user schema
        List<LogicalExpressionPlan> expPlans = gen.getOutputPlans();
        List<LogicalExpressionPlan> newExpPlans = new ArrayList<LogicalExpressionPlan>();
        
        List<Operator> loGenPreds = innerPlan.getPredecessors(gen);
        
        if(loGenPreds == null){
            // there are no LOInnerLoads , must be working on just constants
            // no project-star expansion to be done
            return;
        }
        
        List<LogicalSchema> userSchema = gen.getUserDefinedSchema();
        List<LogicalSchema> newUserSchema = null;
        if(userSchema != null){
            newUserSchema = new ArrayList<LogicalSchema>();
        }
        
        boolean[] flattens = gen.getFlattenFlags();
        List<Boolean> newFlattens = new ArrayList<Boolean>(flattens.length);

        //get mapping of LOGenerate predecessor current position to object
        Map<Integer, LogicalRelationalOperator> oldPos2Rel =
            new HashMap<Integer, LogicalRelationalOperator>();
        
        for(int i=0; i<loGenPreds.size(); i++){
            oldPos2Rel.put(i, (LogicalRelationalOperator) loGenPreds.get(i));
        }
        
        //get schema of predecessor, project-star expansion needs a schema
        LogicalRelationalOperator pred =
            (LogicalRelationalOperator) foreach.getPlan().getPredecessors(foreach).get(0);
        LogicalSchema inpSch = pred.getSchema();
 
        //store mapping between the projection in inner plans of
        // of LOGenerate to the input relation object
        Map<ProjectExpression, LogicalRelationalOperator> proj2InpRel =
            new HashMap<ProjectExpression, LogicalRelationalOperator>();
        
        
        for(int i=0; i<expPlans.size(); i++){
            LogicalExpressionPlan expPlan = expPlans.get(i);
            ProjectExpression projStar = getProjectLonelyStar(expPlan, oldPos2Rel);

            boolean foundExpandableProject = false;
            if(projStar != null){              
                //there is a project-star to be expanded

                LogicalSchema userStarSch = null;
                if(userSchema != null && userSchema.get(i) != null){
                    userStarSch = userSchema.get(i);
                }


                //the range values are set in the project in LOInnerLoad
                ProjectExpression loInnerProj = ((LOInnerLoad)oldPos2Rel.get(projStar.getInputNum())).getProjection();

                int firstProjCol = 0;
                int lastProjCol = 0;
                
                if(loInnerProj.isRangeProject()){
                    loInnerProj.setColumnNumberFromAlias();
                    firstProjCol = loInnerProj.getStartCol();
                    lastProjCol = loInnerProj.getEndCol();
                }

                
                boolean isProjectToEnd = loInnerProj.isProjectStar() || 
                    (loInnerProj.isRangeProject() && lastProjCol == -1); 
                
                //can't expand if there is no input schema, and this is
                // as project star or project-range-to-end
                if( !(inpSch == null && isProjectToEnd) ){
                    
                    foundExpandableProject = true;

                    if(isProjectToEnd)
                        lastProjCol = inpSch.size() - 1;

                    //replacing the existing project star with new ones
                    expPlan.remove(projStar);

                    //remove the LOInnerLoad with star
                    LOInnerLoad oldLOInnerLoad = (LOInnerLoad)oldPos2Rel.get(projStar.getInputNum());
                    innerPlan.disconnect(oldLOInnerLoad, gen);
                    innerPlan.remove(oldLOInnerLoad);


                    //generate new exp plan, inner load for each field in schema
                    for(int j = firstProjCol; j <= lastProjCol; j++){

                        //add new LOInnerLoad
                        LOInnerLoad newInLoad = new LOInnerLoad(innerPlan, foreach, j);
                        innerPlan.add(newInLoad);
                        innerPlan.connect(newInLoad, gen);


                        // new expression plan and proj
                        LogicalExpressionPlan newExpPlan = new LogicalExpressionPlan();
                        newExpPlans.add(newExpPlan);

                        ProjectExpression newProj =
                            new ProjectExpression(newExpPlan, -2, -1, gen);

                        proj2InpRel.put(newProj, newInLoad);

                        newFlattens.add(flattens[i]);
                        if(newUserSchema != null ){
                            //index into user specified schema
                            int schIdx = j - firstProjCol;
                            if(userStarSch != null 
                                    && userStarSch.getFields().size() > schIdx
                                    && userStarSch.getField(schIdx) != null){

                                //if the project-star field has user specified schema, use the
                                // j'th field for this column
                                LogicalSchema sch = new LogicalSchema();
                                sch.addField(new LogicalFieldSchema(userStarSch.getField(schIdx)));
                                newUserSchema.add(sch);
                            }
                            else{
                                newUserSchema.add(null);
                            }
                        }
                    }
                }
            }

            if(!foundExpandableProject){ //no project-star that could be expanded

                //get all projects in here 
                FindProjects findProjs = new FindProjects(expPlan);
                findProjs.visit();
                List<ProjectExpression> projs = findProjs.getProjs();

                //create a mapping of project expression to their inputs
                for(ProjectExpression proj : projs){
                    proj2InpRel.put(proj, oldPos2Rel.get(proj.getInputNum()));
                }

                newExpPlans.add(expPlan);

                newFlattens.add(flattens[i]);
                if(newUserSchema != null)
                    newUserSchema.add(userSchema.get(i));

            }
        }

        //get mapping of LoGenerate input relation to current position
        Map<LogicalRelationalOperator, Integer> rel2pos = new HashMap<LogicalRelationalOperator, Integer>();
        List<Operator> newGenPreds = innerPlan.getPredecessors(gen);
        int numNewGenPreds = 0;
        if(newGenPreds != null)
            numNewGenPreds = newGenPreds.size();
            
        for(int i=0; i<numNewGenPreds; i++){
            rel2pos.put((LogicalRelationalOperator) newGenPreds.get(i),i);
        }
        
        //correct the input num for projects
        for(Entry<ProjectExpression, LogicalRelationalOperator> projAndInp : proj2InpRel.entrySet()){
           ProjectExpression proj = projAndInp.getKey();
           LogicalRelationalOperator rel = projAndInp.getValue();
           proj.setInputNum(rel2pos.get(rel));
        }
        
        // set the new lists
        gen.setOutputPlans(newExpPlans);
        gen.setFlattenFlags(Booleans.toArray(newFlattens));
        gen.setUserDefinedSchema(newUserSchema);
        
        gen.resetSchema();
        foreach.resetSchema();
        
    }
    
    
    static class FindProjects extends LogicalExpressionVisitor{

        private List<ProjectExpression> projs = new ArrayList<ProjectExpression>();

        protected FindProjects(LogicalExpressionPlan plan)
                throws FrontendException {
            super(plan, new DepthFirstWalker(plan));
        }
        
        @Override
        public void visit(ProjectExpression proj){
           projs.add(proj);
        }

        public List<ProjectExpression> getProjs(){
            return projs;
        }
    }

    /**
     * Find project-star in foreach statement. The LOInnerLoad corresponding
     * to the project-star also needs to have a project-star
     * @param expPlan - expression plan
     * @param oldPos2Rel - inner relational plan of foreach
     * @return ProjectExpression that satisfies the conditions
     * @throws FrontendException 
     */
    private ProjectExpression getProjectLonelyStar(LogicalExpressionPlan expPlan,
            Map<Integer, LogicalRelationalOperator> oldPos2Rel) throws FrontendException {

        //the expression plan should have just a single project
        if(expPlan.size() == 0 || expPlan.size() > 1){
            return null;
        }

        Operator outputOp = expPlan.getOperators().next();
        if(outputOp instanceof ProjectExpression){
            ProjectExpression proj = (ProjectExpression)outputOp;
            //check if ProjectExpression is projectStar
            if(proj.isProjectStar()){
                //now check if its input is a LOInnerLoad and it is projectStar 
                // or range project
                LogicalRelationalOperator inputRel = oldPos2Rel.get(proj.getInputNum());
                if(! (inputRel  instanceof LOInnerLoad)){
                    return null;
                }

                ProjectExpression innerProj = ((LOInnerLoad) inputRel).getProjection(); 
                if( innerProj.isRangeOrStarProject()){
                    return proj;
                }
            }
        }
        return null;
    }


    private void expandPlans(
            MultiMap<Integer, LogicalExpressionPlan> inpExprPlans)
    throws FrontendException {
 
        //for each input relation, expand any logical plan that has project-star
        for(int i=0; i< inpExprPlans.size() ; i++){
            List<LogicalExpressionPlan> plans = inpExprPlans.get(i);
            List<LogicalExpressionPlan> newPlans =
                new ArrayList<LogicalExpressionPlan>();
            for(LogicalExpressionPlan plan : plans){
                newPlans.addAll(expandPlan(plan, i));
            }
            inpExprPlans.removeKey(i);
            inpExprPlans.put(i, newPlans);
        }
   
    }

    /**
     * expand this plan containing project star to multiple plans 
     * each projecting a single column
     * @param expPlan
     * @param proj
     * @return
     * @throws FrontendException 
     */
    private List<LogicalExpressionPlan> expandPlan(
            LogicalExpressionPlan expPlan, ProjectExpression proj, int inputNum)
            throws FrontendException {
        
        Pair<Integer, Integer> startAndEndProjs =
            ProjectStarExpanderUtil.getProjectStartEndCols(expPlan, proj);  
        List<LogicalExpressionPlan> newPlans = new ArrayList<LogicalExpressionPlan>();

        if(startAndEndProjs == null){
            // can't expand this project
            newPlans.add(expPlan);
            return newPlans;
        }

        //expand from firstProjCol to lastProjCol 
        int firstProjCol = startAndEndProjs.first;
        int lastProjCol = startAndEndProjs.second;

        LogicalRelationalOperator relOp = proj.getAttachedRelationalOp();
        for(int i = firstProjCol; i <= lastProjCol; i++){
            newPlans.add(createExpPlanWithProj(relOp, inputNum, i));
        }

        return newPlans;

    }

    /**
     * Create new logical plan with a project that is attached to LogicalRelation
     * attachRel and projects i'th column from input
     * @param attachRel 
     * @param inputNum
     * @param colNum 
     * @return
     */
    private LogicalExpressionPlan createExpPlanWithProj(
            LogicalRelationalOperator attachRel, 
            int inputNum, int colNum) {
        LogicalExpressionPlan newExpPlan = new LogicalExpressionPlan();
        ProjectExpression newProj = 
            new ProjectExpression(newExpPlan, inputNum, colNum, attachRel);
        newExpPlan.add(newProj);
        return newExpPlan;
    }

    /**
     * if LogicalExpressionPlan argument has a project star output then
     * return it, otherwise return null
     * @param expPlan
     * @return
     * @throws FrontendException 
     */
    private ProjectExpression getProjectStar(LogicalExpressionPlan expPlan)
    throws FrontendException {
        List<Operator> outputs = expPlan.getSources();
        ProjectExpression projStar = null;
        for(Operator outputOp : outputs){
            if(outputOp instanceof ProjectExpression){
                ProjectExpression proj = (ProjectExpression)outputOp;
                if(proj.isRangeOrStarProject()){
                    if(outputs.size() > 1){
                        String msg = "More than one operator in an expression plan" +
                        " containing project star(*)/project-range (..)";
                        throw new VisitorException(proj,
                                msg,
                                2264,
                                PigException.BUG
                        );
                    }
                    projStar = proj;
                }
            }
        }
        return projStar;
    }



}
