/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.newplan.logical.visitor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.StreamToPig;
import org.apache.pig.impl.builtin.IdentityColumn;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.streaming.StreamingCommand;
import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.newplan.DependencyOrderWalker;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.PlanWalker;
import org.apache.pig.newplan.ReverseDependencyOrderWalker;
import org.apache.pig.newplan.logical.expression.BinCondExpression;
import org.apache.pig.newplan.logical.expression.CastExpression;
import org.apache.pig.newplan.logical.expression.ConstantExpression;
import org.apache.pig.newplan.logical.expression.DereferenceExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
import org.apache.pig.newplan.logical.expression.MapLookupExpression;
import org.apache.pig.newplan.logical.expression.ProjectExpression;
import org.apache.pig.newplan.logical.expression.ScalarExpression;
import org.apache.pig.newplan.logical.expression.UserFuncExpression;
import org.apache.pig.newplan.logical.relational.LOCogroup;
import org.apache.pig.newplan.logical.relational.LOCross;
import org.apache.pig.newplan.logical.relational.LODistinct;
import org.apache.pig.newplan.logical.relational.LOFilter;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOGenerate;
import org.apache.pig.newplan.logical.relational.LOInnerLoad;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LORank;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LOSplit;
import org.apache.pig.newplan.logical.relational.LOSplitOutput;
import org.apache.pig.newplan.logical.relational.LOStore;
import org.apache.pig.newplan.logical.relational.LOStream;
import org.apache.pig.newplan.logical.relational.LOUnion;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;

/**
 * Create mapping between uid and Load FuncSpec when the LogicalExpression 
 * associated with it is known to hold an unmodified element of data returned
 * by the load function.
 * This information is used to find the LoadCaster to be used to cast bytearrays
 * into other other types. 
 */
public class LineageFindRelVisitor extends LogicalRelationalNodesVisitor{


    Map<Long, FuncSpec> uid2LoadFuncMap  = new HashMap<Long, FuncSpec>();
    
    //if LogicalRelationalOperator is associated with a single load func spec
    // then the mapping is stored here
    Map<LogicalRelationalOperator, FuncSpec>  rel2InputFuncMap = 
        new HashMap<LogicalRelationalOperator, FuncSpec>();

    Map<FuncSpec, Class> func2casterMap = new HashMap<FuncSpec, Class>();
    
    public LineageFindRelVisitor(OperatorPlan plan) throws FrontendException {
        super(plan, new DependencyOrderWalker(plan));
        
    }
    
    public Map<Long, FuncSpec> getUid2LoadFuncMap(){
        return uid2LoadFuncMap;
    }
    
    @Override
    public void visit(LOLoad load) throws FrontendException{
        LogicalSchema schema = load.getSchema();
        setLoadFuncForUids(schema, load.getFileSpec().getFuncSpec());
        rel2InputFuncMap.put(load, load.getFileSpec().getFuncSpec());
    }

    @Override
    public void visit(LOStream stream) throws FrontendException{
        mapToPredLoadFunc(stream);
        StreamingCommand command = ((LOStream)stream).getStreamingCommand();
        HandleSpec streamOutputSpec = command.getOutputSpec(); 
        FuncSpec streamLoaderSpec = new FuncSpec(streamOutputSpec.getSpec());
        setLoadFuncForUids(stream.getSchema(), streamLoaderSpec);
        rel2InputFuncMap.put(stream, streamLoaderSpec);
    }
    
    @Override
    public void visit(LOInnerLoad innerLoad) throws FrontendException{
        LOForEach foreach = innerLoad.getLOForEach();
        LogicalRelationalOperator pred = 
            ((LogicalRelationalOperator)foreach.getPlan().getPredecessors(foreach).get(0));

        LogicalSchema predSchema = pred.getSchema();
        
        //if this has a schema, the lineage can be tracked using the uid in input schema
        if(innerLoad.getSchema() != null){
            if(innerLoad.getSchema().size() == 1 
                    && innerLoad.getSchema().getField(0).type == DataType.BYTEARRAY
                    && uid2LoadFuncMap.get(innerLoad.getSchema().getField(0).uid) == null
                    && predSchema != null
            ){
                long inpUid = predSchema.getField(innerLoad.getProjection().getColNum()).uid;
                if(uid2LoadFuncMap.get(inpUid) != null){
                    addUidLoadFuncToMap(innerLoad.getSchema().getField(0).uid, uid2LoadFuncMap.get(inpUid));
                }
                return;
            }

        }

        // associated load func could not be found using uid, use
        // the single load func associated with input relation (if any)
        if(getAssociatedLoadFunc(pred) != null){
            mapRelToPredLoadFunc(innerLoad, pred);
        }
    }
    
    /**
     * map all uids in schema to funcSpec 
     * @param schema
     * @param funcSpec
     * @throws VisitorException
     */
    private void setLoadFuncForUids(LogicalSchema schema, FuncSpec funcSpec)
    throws VisitorException {
        if(schema == null){
            return;
        }
        for(LogicalFieldSchema fs : schema.getFields()){
            addUidLoadFuncToMap((Long) fs.uid, funcSpec);
            setLoadFuncForUids(fs.schema, funcSpec);
        }
        
    }

    @Override
    public void visit(LOFilter filter) throws FrontendException{
        mapToPredLoadFunc(filter);
        visitExpression(filter.getFilterPlan());
    }

    /**
     * If all predecessors of relOp are associated with same load 
     * func , then map reOp to it.
     * 
     * @param relOp
     * @throws FrontendException 
     */
    private void mapToPredLoadFunc(LogicalRelationalOperator relOp)
    throws FrontendException {
        OperatorPlan lp = relOp.getPlan();
        List<Operator> preds = lp.getPredecessors(relOp);
        if(lp.getPredecessors(relOp) == null || 
                lp.getPredecessors(relOp).size() == 0){
            // no predecessors , nothing to do
            return;
        }

        FuncSpec loadFuncSpec = getAssociatedLoadFunc((LogicalRelationalOperator) preds.get(0));

        if(loadFuncSpec == null){
            return;
        }
        //ensure that all predecessors are mapped to same load func spec
        for(int i=1; i<preds.size(); i++){
            if(!haveIdenticalCasters(loadFuncSpec,getAssociatedLoadFunc(
                    (LogicalRelationalOperator) preds.get(i)))){
                return;
            }
        }
        rel2InputFuncMap.put(relOp, loadFuncSpec); 
    }

    /**
     * Find single load func spec associated with this relation.
     * If the relation has schema, all uids in schema should be associated
     * with same load func spec. if it does not have schema check the existing
     * mapping
     * @param relOp
     * @return
     * @throws FrontendException
     */
    private FuncSpec getAssociatedLoadFunc(LogicalRelationalOperator relOp) throws FrontendException {
        LogicalSchema schema = relOp.getSchema();
        FuncSpec funcSpec = null;
        if(schema != null){
            if(schema.size() == 0)
                return null;
            funcSpec = uid2LoadFuncMap.get(schema.getField(0).uid);
            if(funcSpec != null) {
                for(int i=1; i<schema.size(); i++){
                    LogicalFieldSchema fs = schema.getField(i);
                    if(! haveIdenticalCasters(funcSpec,
                            uid2LoadFuncMap.get(fs.uid))){
                        //all uid are not associated with same func spec, there is no
                        // single func spec that represents all the fields
                        funcSpec = null;
                        break;
                    }
                }
            }
        }
        
        if(funcSpec == null){
            // If relOp is LOForEach and contains UDF, byte field could come from UDF.
            // We don't assume it share the LoadCaster with predecessor
            if (relOp instanceof LOForEach) {
                UDFFinder udfFinder = new UDFFinder(((LOForEach) relOp).getInnerPlan());
                udfFinder.visit();
                if (udfFinder.getUDFList().size()!=0)
                    return null;
            }
            
            funcSpec = rel2InputFuncMap.get(relOp);
        }

        return funcSpec;
    }

    private void mapRelToPredLoadFunc(LogicalRelationalOperator relOp,
            Operator pred) {
        if(rel2InputFuncMap.get(pred) != null){
            rel2InputFuncMap.put(relOp, rel2InputFuncMap.get(pred));
        }              
    }

    private void visitExpression(LogicalExpressionPlan expPlan)
    throws FrontendException{
        LineageFindExpVisitor findLineageExp =
            new LineageFindExpVisitor(expPlan, uid2LoadFuncMap);
        findLineageExp.visit();        
    }
    
    @Override
    public void visit(LOCogroup group) throws FrontendException{
        mapToPredLoadFunc(group);
        List<Operator> inputs = group.getInputs((LogicalPlan)plan);
        
        // list of field schemas of group plans
        List<LogicalFieldSchema> groupPlanSchemas = new ArrayList<LogicalFieldSchema>();
        
        MultiMap<Integer, LogicalExpressionPlan> plans = group.getExpressionPlans();
        for(LogicalExpressionPlan expPlan : plans.values()){
            visitExpression(expPlan);
            if(expPlan.getSources().size() != 1){
                throw new AssertionError("Group plans should have only one output");
            }
            groupPlanSchemas.add(((LogicalExpression)expPlan.getSources().get(0)).getFieldSchema());
        }
        
        LogicalSchema sch = group.getSchema();

        //if the group plans are associated with same load function , associate
        //same load fucntion with group column schema
        if (getAssociatedLoadFunc(group)!=null) {
            addUidLoadFuncToMap(sch.getField(0).uid, rel2InputFuncMap.get(group));
            if (sch.getField(0).schema!=null)
                setLoadFuncForUids(sch.getField(0).schema, rel2InputFuncMap.get(group));
        }
        else
            mapMatchLoadFuncToUid(sch.getField(0), groupPlanSchemas);
        
        
        
        //set the load func spec for the bags in the schema, this helps if
        // the input schemas are not set
        //group schema has a group column followed by bags corresponding to each
        // input
        if(sch.size() != inputs.size()+1 ){
            throw new AssertionError("cogroup schema size not same as number of inputs");
        }
        
        
        
        for(int i=1; i < sch.size(); i++){
            long uid = sch.getField(i).uid;
            LogicalRelationalOperator input = (LogicalRelationalOperator) inputs.get(i-1);
            if(getAssociatedLoadFunc(input) != null){
                addUidLoadFuncToMap(uid, rel2InputFuncMap.get(input));
            }
        }
        
    }
    

    @Override
    public void visit(LOJoin join) throws FrontendException{
        mapToPredLoadFunc(join);
         MultiMap<Integer, LogicalExpressionPlan> plans = join.getExpressionPlans();
        for(LogicalExpressionPlan expPlan : plans.values()){
            visitExpression(expPlan);
        }
    }
    
    @Override
    public void visit(LOForEach fe) throws FrontendException{
        mapToPredLoadFunc(fe);
        LogicalPlan innerPlan = fe.getInnerPlan();
        PlanWalker newWalker = currentWalker.spawnChildWalker(innerPlan);
        pushWalker(newWalker);
        currentWalker.walk(this);
        popWalker();
    }
    
    @Override
    public void visit(LOGenerate gen) throws FrontendException{
        mapToPredLoadFunc(gen);
        List<LogicalExpressionPlan> expPlans = gen.getOutputPlans();
        for(LogicalExpressionPlan expPlan : expPlans){
           visitExpression(expPlan);
        }
        
        //associate flatten output to the load func associated with input
        //expression of flatten
        boolean[] flattens = gen.getFlattenFlags();
        for(int i=0; i<flattens.length; i++){
            if(flattens[i] == true){
                //get the output schema corresponding to this exp plan
                gen.getSchema();
                if(gen.getOutputPlanSchemas() == null || gen.getOutputPlanSchemas().size() <= i){
                    return;
                }
                LogicalSchema sch = gen.getOutputPlanSchemas().get(i);
                if(sch == null){
                    continue;
                }
                
                //get the only output exp of the ith plan
                LogicalExpression exp =
                    (LogicalExpression) gen.getOutputPlans().get(i).getSources().get(0);

                //get its funcspec and associate it with uid of all fields in the schema
                FuncSpec funcSpec = uid2LoadFuncMap.get(exp.getFieldSchema().uid);
                for(LogicalFieldSchema fs : sch.getFields()){
                    addUidLoadFuncToMap(fs.uid, funcSpec);
                }
            }
        }
        
    }
    
    @Override
    public void visit(LOSort sort) throws FrontendException{
        mapToPredLoadFunc(sort);
        List<LogicalExpressionPlan> expPlans = sort.getSortColPlans();
        for(LogicalExpressionPlan expPlan : expPlans){
            visitExpression(expPlan);
        }
    }

    @Override
    public void visit(LORank rank) throws FrontendException{
        mapToPredLoadFunc(rank);
        List<LogicalExpressionPlan> expPlans = rank.getRankColPlans();
        for(LogicalExpressionPlan expPlan : expPlans){
            visitExpression(expPlan);
        }
    }

    @Override
    public void visit(LODistinct relOp) throws FrontendException{
        mapToPredLoadFunc(relOp);
    }

    @Override
    public void visit(LOLimit loLimit) throws FrontendException{
        mapToPredLoadFunc(loLimit);
        if (loLimit.getLimitPlan() != null)
            visitExpression(loLimit.getLimitPlan());
    }
    
    @Override
    public void visit(LOSplit relOp) throws FrontendException{
        mapToPredLoadFunc(relOp);
    }
    
    @Override
    public void visit(LOStore relOp) throws FrontendException{
        mapToPredLoadFunc(relOp);
    }


    
    @Override
    public void visit(LOCross relOp) throws FrontendException{
        mapToPredLoadFunc(relOp);
    }

    @Override
    public void visit(LOUnion relOp) throws FrontendException{
        mapToPredLoadFunc(relOp);
        
        // Since the uid changes for Union, add mappings for new uids to funcspec
        LogicalSchema schema = relOp.getSchema();
        if(schema != null){
            // For each output field, checking all the fields being
            // union-ed(bundled) together and only set the funcspec when ALL
            // of them come from the same caster
            //
            // A = (i,j)
            // B = (i,j)
            // C = UNION A, B;
            // Checking if A.i and B.i have the same caster.
            // Same for A.j and B.j
            // A.i and A.j may come from the different casters
            for (LogicalFieldSchema logicalFieldSchema : schema.getFields()) {
                Set<Long> inputs = relOp.getInputUids(logicalFieldSchema.uid);
                if( inputs.size() == 0 ) {
                    // uid was not changed.
                    // funcspec should be already set. skipping
                    continue;
                }
                FuncSpec prevLoadFuncSpec = null, curLoadFuncSpec = null;
                boolean allSameLoader = true;
                for(Long inputUid: inputs) {
                    curLoadFuncSpec = uid2LoadFuncMap.get(inputUid) ;
                    if( prevLoadFuncSpec != null
                      && !haveIdenticalCasters(prevLoadFuncSpec,
                                               curLoadFuncSpec) ) {
                        allSameLoader = false;
                        break;
                    }
                  prevLoadFuncSpec  = curLoadFuncSpec;
                }
                if( allSameLoader ) {
                    addUidLoadFuncToMap(logicalFieldSchema.uid,curLoadFuncSpec);
                }
            }
        }
    }
    
    @Override
    public void visit(LOSplitOutput split) throws FrontendException{
        mapToPredLoadFunc(split);
        visitExpression(split.getFilterPlan());
        
        //The uid changes across the input and output of a split.
        //In this visitor the uids for the fields in output are mapped to ones
        // in input based on position. uid2LoadFuncMap is updated with output 
        // schema uid and load func mapped to corresponding uid in inputschema
        
        
        LogicalSchema inputSch = 
            ((LogicalRelationalOperator)split.getPlan().getPredecessors(split).get(0)).getSchema();

        LogicalSchema outSchema = split.getSchema();
        if(inputSch == null && outSchema == null){
            return;
        }

        if(inputSch == null || outSchema == null){
            String msg = "Bug: in split only one of input/output schema is null "
                + split;
            throw new VisitorException(split, msg,2260, PigException.BUG) ; 
        }
        
        if(inputSch.size() != outSchema.size()){
            String msg = "Bug: input and output schema size of split differ "
                + split;
            throw new VisitorException(split, msg,2261, PigException.BUG) ; 
        }

        for(int i=0; i<inputSch.size(); i++){
            LogicalFieldSchema inField = inputSch.getField(i);
            LogicalFieldSchema outField = outSchema.getField(i);
            if(uid2LoadFuncMap.get(inField.uid) != null){
                addUidLoadFuncToMap(outField.uid, uid2LoadFuncMap.get(inField.uid));
            }
        }
        
    }
    
    
    /**
     * Add given uid, loadFuncSpec to mapping
     * @param uid
     * @param loadFuncSpec
     * @throws VisitorException 
     */
    private void addUidLoadFuncToMap(long uid, FuncSpec loadFuncSpec)
    throws VisitorException{
        if(loadFuncSpec == null){
            return;
        }
        //ensure that uid always matches to same load func
        FuncSpec curFuncSpec = uid2LoadFuncMap.get(uid);
        if(curFuncSpec == null){
            uid2LoadFuncMap.put(uid, loadFuncSpec);
        }else if(! haveIdenticalCasters(curFuncSpec,loadFuncSpec)){
            String msg = "Bug: uid mapped to two different load functions : " +
            curFuncSpec + " and " + loadFuncSpec;
            throw new VisitorException(msg,2262, PigException.BUG) ;
        }
    }
    
    /**
     * if uid in input field schemas or their inner schemas map to same 
     * load function, then map the new uid in bincond also to same
     *  load function in uid2LoadFuncMap
     * @param outFS
     * @param inputFieldSchemas
     * @throws VisitorException 
     */
    void mapMatchLoadFuncToUid(
            LogicalFieldSchema outFS,
            List<LogicalFieldSchema> inputFieldSchemas) throws VisitorException {

        
        if(inputFieldSchemas.size() == 0){
            return;
        }

        // If schema of any input is null, we skip output schema
        for (LogicalFieldSchema fs : inputFieldSchemas) {
            if (fs==null)
                return;
        }
        
        //if same non null load func is associated with all fieldschemas
        // asssociate that with the uid of outFS
        LogicalFieldSchema inpFS1 = inputFieldSchemas.get(0);
        FuncSpec funcSpec1 = uid2LoadFuncMap.get(inpFS1.uid);
        boolean allInnerSchemaMatch = false;
        if(funcSpec1 != null){
            boolean allMatch = true;
            allInnerSchemaMatch = true;
            
            for(LogicalFieldSchema fs : inputFieldSchemas){
                //check if all func spec match
                if(!haveIdenticalCasters(funcSpec1,uid2LoadFuncMap.get(fs.uid))){
                    allMatch = false;
                }
                //check if all inner schema match for use later
                if(outFS.schema == null ||  !outFS.schema.isEqual(fs.schema)){
                    allInnerSchemaMatch = false;
                }
            }
            if(allMatch){
                addUidLoadFuncToMap(outFS.uid, funcSpec1);
            }
        }
        
        //recursively call the function for corresponding files in inner schemas
        if(allInnerSchemaMatch){
            List<LogicalFieldSchema> outFields = outFS.schema.getFields();
            for(int i=0; i<outFields.size(); i++){
                List<LogicalFieldSchema> inFsList = new ArrayList<LogicalFieldSchema>();
                for(LogicalFieldSchema fs : inputFieldSchemas){
                    inFsList.add(fs.schema.getField(i));
                }                        
                mapMatchLoadFuncToUid(outFields.get(i), inFsList);
            }
        }

    }
    


    /**
     * If a input of dereference or map-lookup has associated load function, 
     * the same load function should be associated with the dereference or map-lookup. 
     * 
     */
    class LineageFindExpVisitor extends LogicalExpressionVisitor{

        private Map<Long, FuncSpec> uid2LoadFuncMap;

        public LineageFindExpVisitor(LogicalExpressionPlan plan,
                Map<Long, FuncSpec> uid2LoadFuncMap) throws FrontendException {
            super(plan, new ReverseDependencyOrderWalker(plan));
            this.uid2LoadFuncMap = uid2LoadFuncMap;
        }
        
        @Override
        public void visit(ProjectExpression proj) throws FrontendException {
            // proj should have the same uid as input, if input has schema.
            // if uid does not have associated func spec and input relation 
            // has null schema or it is inner-load, use the load func associated
            // with the relation 
            LogicalRelationalOperator inputRel = proj.findReferent();

            if (proj.getFieldSchema()==null)
                return;
            
            long uid = proj.getFieldSchema().uid;
            if(uid2LoadFuncMap.get(uid) == null && (inputRel.getSchema() == null || inputRel instanceof LOInnerLoad)){
                FuncSpec funcSpec = rel2InputFuncMap.get(inputRel);
                if(funcSpec != null){
                    addUidLoadFuncToMap(uid, funcSpec);
                }
            }
        }
        
        @Override
        public void visit(DereferenceExpression deref) throws FrontendException {
           updateUidMap(deref, deref.getReferredExpression());
        }

        @Override
        public void visit(MapLookupExpression mapLookup) throws FrontendException {
            updateUidMap(mapLookup, mapLookup.getMap());
        }
        
        
        private void updateUidMap(LogicalExpression exp,
                LogicalExpression inp) throws FrontendException {
            //find input uid and corresponding load FuncSpec
            long inpUid = inp.getFieldSchema().uid;
            FuncSpec inpLoadFuncSpec = uid2LoadFuncMap.get(inpUid);
            addUidLoadFuncToMap(exp.getFieldSchema().uid, inpLoadFuncSpec);

        }

        
        @Override
        public void visit(BinCondExpression binCond) throws FrontendException{
            //if either side is a null constant, we can safely associate
            //this bincond with the load func spec on other side
            LogicalExpression lhs = binCond.getLhs();
            LogicalExpression rhs = binCond.getRhs();
            
            if(getNULLConstantInCast(lhs) != null){
                FuncSpec funcSpec = uid2LoadFuncMap.get(rhs.getFieldSchema().uid);
                uid2LoadFuncMap.put(binCond.getFieldSchema().uid, funcSpec);
            }
            else if(getNULLConstantInCast(rhs) != null){
                FuncSpec funcSpec = uid2LoadFuncMap.get(lhs.getFieldSchema().uid);
                uid2LoadFuncMap.put(binCond.getFieldSchema().uid, funcSpec);
            }
            else {
                List<LogicalFieldSchema> inFieldSchemas = new ArrayList<LogicalFieldSchema>();
                inFieldSchemas.add(lhs.getFieldSchema());
                inFieldSchemas.add(rhs.getFieldSchema());
                mapMatchLoadFuncToUid(
                        binCond.getFieldSchema(), 
                        inFieldSchemas
                );
            }
        }
        
        @Override
        public void visit(ScalarExpression scalarExp) throws FrontendException{
            //track lineage to input load function
            
            
            LogicalRelationalOperator inputRel = (LogicalRelationalOperator)scalarExp.getAttachedLogicalOperator();
            LogicalPlan relPlan = (LogicalPlan) inputRel.getPlan();
            List<Operator> softPreds = relPlan.getSoftLinkPredecessors(inputRel);
            
            //1st argument is the column number in input, 2nd arg is the input file name
            Integer inputColNum = (Integer)((ConstantExpression) scalarExp.getArguments().get(0)).getValue();
            String inputFile = (String)((ConstantExpression) scalarExp.getArguments().get(1)).getValue();
            
            long outputUid = scalarExp.getFieldSchema().uid;
            boolean foundInput = false; // a variable to do sanity check on num of input relations

            //find the input relation, and use it to get lineage
            for(Operator softPred : softPreds){
                LOStore inputStore = (LOStore) softPred;
                if(inputStore.getFileSpec().getFileName().equals(inputFile)){
                    
                    if(foundInput == true){
                        throw new FrontendException(
                                "More than one input found for scalar expression",
                                2268,
                                PigException.BUG
                        );
                    }
                    foundInput = true;
                    
                    //found the store corresponding to this scalar expression
                    LogicalSchema sch = inputStore.getSchema();
                    if(sch == null){
                        //see if there is a load function associated with the store
                        FuncSpec funcSpec = rel2InputFuncMap.get(inputStore);
                        addUidLoadFuncToMap(outputUid, funcSpec);
                    }else{
                        //find input uid and corresponding load func
                        LogicalFieldSchema fs = sch.getField(inputColNum);
                        FuncSpec funcSpec = uid2LoadFuncMap.get(fs.uid);
                        addUidLoadFuncToMap(outputUid, funcSpec);
                    }
                }
            }
            
            if(foundInput == false){
                throw new FrontendException(
                        "No input found for scalar expression",
                        2269,
                        PigException.BUG
                );
            }
        }

        @Override
        public void visit(UserFuncExpression op) throws FrontendException {

            if( op.getFieldSchema() == null ) {
                return;
            }

            FuncSpec funcSpec = null;
            Class loader = instantiateCaster(op.getFuncSpec());
            List<LogicalExpression> arguments = op.getArguments();
            if ( loader != null ) {
                // if evalFunc.getLoadCaster() returns, simply use that.
                funcSpec = op.getFuncSpec();
            } else if (arguments.size() != 0 ) {
                FuncSpec baseFuncSpec = null;
                LogicalFieldSchema fs = arguments.get(0).getFieldSchema();
                if ( fs != null ) {
                    baseFuncSpec = uid2LoadFuncMap.get(fs.uid);
                    if( baseFuncSpec != null ) {
                        funcSpec = baseFuncSpec;
                        for(int i = 1; i < arguments.size(); i++) {
                            fs = arguments.get(i).getFieldSchema();
                            if( fs == null || !haveIdenticalCasters(baseFuncSpec, uid2LoadFuncMap.get(fs.uid)) ) {
                                funcSpec = null;
                                break;
                            }
                        }
                    }
                }
            }

            if( funcSpec != null ) {
                addUidLoadFuncToMap(op.getFieldSchema().uid, funcSpec);
                // in case schema is nested, set funcSpec for all
                setLoadFuncForUids(op.getFieldSchema().schema, funcSpec);
            }
        }

        /**
         * if there is a null constant under casts, return it
         * @param rel
         * @return
         * @throws FrontendException
         */
        private Object getNULLConstantInCast(LogicalExpression rel) throws FrontendException {
            if(rel instanceof CastExpression){
                if( ((CastExpression)rel).getExpression() instanceof CastExpression)
                    return getNULLConstantInCast(((CastExpression)rel).getExpression());
                else if( ((CastExpression)rel).getExpression() instanceof ConstantExpression){
                    ConstantExpression constExp = (ConstantExpression)((CastExpression)rel).getExpression();
                    if(constExp.getValue() == null)
                        return constExp;
                    else 
                        return null;
                }
            }
            return null;
        }

        
    }

    //Copied from POCast.instantiateFunc
    private Class instantiateCaster(FuncSpec funcSpec) throws VisitorException {
        if ( funcSpec == null ) {
            return null;
        }

        if( func2casterMap.containsKey(funcSpec) ) {
          return func2casterMap.get(funcSpec);
        }

        LoadCaster caster = null;
        Object obj = PigContext.instantiateFuncFromSpec(funcSpec);
        try {
            if (obj instanceof LoadFunc) {
                caster = ((LoadFunc)obj).getLoadCaster();
            } else if (obj instanceof StreamToPig) {
                caster = ((StreamToPig)obj).getLoadCaster();
            } else if (obj instanceof EvalFunc) {
                caster = ((EvalFunc)obj).getLoadCaster();
            } else {
                throw new VisitorException("Invalid class type " + funcSpec.getClassName(),
                                           2270, PigException.BUG );
            }
        } catch (IOException e) {
            throw new VisitorException("Invalid class type " + funcSpec.getClassName(),
                                         2270, e );
        }

        Class retval = (caster == null) ? null : caster.getClass();
        func2casterMap.put(funcSpec, retval);
        return retval;
    }


    private boolean haveIdenticalCasters(FuncSpec f1, FuncSpec f2)  throws VisitorException{
        if( f1 == null || f2 == null ) {
            return false;
        }
        if( f1.equals(f2) ) {
            return true;
        }
        Class caster1 = instantiateCaster(f1);
        Class caster2 = instantiateCaster(f2);

        if( caster1 == null || caster2 == null ) {
            return false;
        }

        //From PIG-3295
        //If the class name for LoadCaster are the same, and LoadCaster only has
        //default constructor, then two LoadCasters are considered equal
        if( caster1.getCanonicalName().equals(caster2.getCanonicalName())  &&
            caster1.getConstructors().length == 1 &&
            caster1.getConstructors()[0].getGenericParameterTypes().length == 0  &&
            caster2.getConstructors().length == 1 &&
            caster2.getConstructors()[0].getGenericParameterTypes().length == 0 ) {
            return true;
        }

        return false;

    }
    
    
}
