/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.mrql;

import org.apache.mrql.gen.*;
import java.util.*;
import java.io.*;


/** contains useful methods used by all stages of compilation and code generation */
public class Translator extends Printer {
    static Trees functions = #[ ];

    static {
        ClassImporter.load_classes();
        DataSource.loadParsers();
    }

    /** type environment that binds local variables to types */
    static SymbolTable type_env = new SymbolTable();

    /** type environment that binds global variables to types */
    static SymbolTable global_type_env = new SymbolTable();

    /** type environment that binds datatype names to types */
    static SymbolTable global_datatype_env = new SymbolTable();

    /** used in pattern compilation and variable renaming */
    static SymbolTable st = new SymbolTable();

    /** binds macro names to MRQL expressions (used for 'name = expr;' syntax) */
    static SymbolTable global_vars = new SymbolTable();

    /** binds a UDF name to its plan */
    static SymbolTable global_functions = new SymbolTable();

    /** binds a macro name to its body */
    static SymbolTable global_macros = new SymbolTable();

    /** used in typedefs */
    static SymbolTable type_names = new SymbolTable();

    /** binds a data constructor name to its type */
    static SymbolTable data_constructors = new SymbolTable();

    static Trees repeat_variables = #[];

    static {
        global_type_env.insert("args",#<list(string)>);
    }

    /** expressions with impure functions cannot factored out */
    static Trees impure_functions = #[random];

    private static int var_count = 0;

    static void reset () {
        var_count = 0;
        type_env = global_type_env;
        st = new SymbolTable();
        type_env = new SymbolTable();
        repeat_variables = #[];
    }

    static void global_reset () {
        reset();
        global_type_env = new SymbolTable();
        global_datatype_env = new SymbolTable();
        global_vars = new SymbolTable();
        global_functions = new SymbolTable();
        global_macros = new SymbolTable();
        type_names = new SymbolTable();
        data_constructors = new SymbolTable();
        new TopLevel();
    }

    static void error ( String msg ) {
        System.err.println("*** MRQL error at line "+Main.parser.line_pos()+": "+msg);
        throw new Error();
    }

    final static Tree identity = #<lambda(x,x)>;

    static Tree identity () {
        return Normalization.rename(#<lambda(x,bag(x))>);
    }

    /** is this type a collection type? */
    public static boolean is_collection ( String x ) {
        return x.equals("Bag") || x.equals("bag") || x.equals("List") || x.equals("list");
    }

    /** is this type a collection type? */
    public static boolean collection_type ( Tree tp ) {
        match tp {
        case `T(`t1): return is_collection(T);
        };
        return false;
    }

    /** is this type a collection type for values stored in HDFS? */
    public static boolean is_persistent_collection ( String x ) {
        return x.equals("Bag") || x.equals("List");
    }

    /** make this collection type a persistent type that is stored in HDFS */
    public static String persistent_collection ( String x ) {
        return (x.equals("list")) ? "List" : (x.equals("bag")) ? "Bag" : x;
    }

    /** make this collection type a transient type stored in memory */
    public static String transient_collection ( String x ) {
        return (x.equals("List")) ? "list" : (x.equals("Bag")) ? "bag" : x;
    }

    /** An aggeregation must be based on a commutative monoid (plus,zero) with a unit:
     *    name(type,plus,zero,unit)
     * plus: function from (b,b) to b, zero: b, unit: function from a to b
     */
    static Trees monoids =
        #[ count(any,lambda(x,call(plus,nth(x,0),nth(x,1))),typed(0,long),lambda(x,typed(1,long))),
           sum(int,lambda(x,call(plus,nth(x,0),nth(x,1))),typed(0,long),lambda(x,typed(x,long))),
           sum(long,lambda(x,call(plus,nth(x,0),nth(x,1))),typed(0,long),`identity),
           sum(float,lambda(x,call(plus,nth(x,0),nth(x,1))),typed(0.0,double),lambda(x,typed(x,double))),
           sum(double,lambda(x,call(plus,nth(x,0),nth(x,1))),typed(0.0,double),`identity),
           max(int,lambda(x,call(max,nth(x,0),nth(x,1))),typed(`(Integer.MIN_VALUE),int),`identity),
           max(long,lambda(x,call(max,nth(x,0),nth(x,1))),typed(`(Long.MIN_VALUE),long),`identity),
           max(float,lambda(x,call(max,nth(x,0),nth(x,1))),typed(`(Float.MIN_VALUE),float),`identity),
           max(double,lambda(x,call(max,nth(x,0),nth(x,1))),typed(`(Double.MIN_VALUE),double),`identity),
           min(int,lambda(x,call(min,nth(x,0),nth(x,1))),typed(`(Integer.MAX_VALUE),int),`identity),
           min(long,lambda(x,call(min,nth(x,0),nth(x,1))),typed(`(Long.MAX_VALUE),long),`identity),
           min(float,lambda(x,call(min,nth(x,0),nth(x,1))),typed(`(Float.MAX_VALUE),float),`identity),
           min(double,lambda(x,call(min,nth(x,0),nth(x,1))),typed(`(Double.MAX_VALUE),double),`identity),
           avg_aggr(int,lambda(x,tuple(call(plus,nth(nth(x,0),0),nth(nth(x,1),0)),
                                       call(plus,nth(nth(x,0),1),nth(nth(x,1),1)))),
                    tuple(typed(0.0,double),typed(0,long)),
                    lambda(x,tuple(typed(x,double),typed(1,long)))),
           avg_aggr(long,lambda(x,tuple(call(plus,nth(nth(x,0),0),nth(nth(x,1),0)),
                                        call(plus,nth(nth(x,0),1),nth(nth(x,1),1)))),
                    tuple(typed(0.0,double),typed(0,long)),
                    lambda(x,tuple(typed(x,double),typed(1,long)))),
           avg_aggr(float,lambda(x,tuple(call(plus,nth(nth(x,0),0),nth(nth(x,1),0)),
                                         call(plus,nth(nth(x,0),1),nth(nth(x,1),1)))),
                    tuple(typed(0.0,double),typed(0,long)),
                    lambda(x,tuple(typed(x,double),typed(1,long)))),
           avg_aggr(double,lambda(x,tuple(call(plus,nth(nth(x,0),0),nth(nth(x,1),0)),
                                          call(plus,nth(nth(x,0),1),nth(nth(x,1),1)))),
                    tuple(typed(0.0,double),typed(0,long)),
                    lambda(x,tuple(typed(x,double),typed(1,long)))),
           all(bool,lambda(x,call(and,nth(x,0),nth(x,1))),true,`identity),
           some(bool,lambda(x,call(or,nth(x,0),nth(x,1))),false,`identity)
           ];

    static void print_aggregates () {
        for ( Tree m: monoids )
            match m {
            case `f(`tp,...):
                System.out.print(" "+f+":"+print_type(tp));
            }
        System.out.println();
    }

    static Trees plans_with_distributed_lambdas
        = #[MapReduce,MapAggregateReduce,MapCombineReduce,FroupByJoin,Aggregate,
            MapReduce2,MapCombineReduce2,MapAggregateReduce2,MapJoin,MapAggregateJoin,
            CrossProduct,CrossAggregateProduct,cMap,AggregateMap,BSP,GroupByJoin];

    static Trees algebraic_operators
        = #[mapReduce,mapReduce2,cmap,join,groupBy,orderBy,aggregate,map,filter,repeat,closure];

    static Trees plan_names = plans_with_distributed_lambdas.append(algebraic_operators)
                                   .append(#[Repeat,Closure,Generator,Let,If]);

    /** generates new variable names */
    public static Tree new_var () {
        return new VariableLeaf("x_"+(Integer.toString(var_count++)));
    }

    /** is this expression pure? (does it contain calls to impure functions?) */
    static boolean is_pure ( Tree expr ) {
        match expr {
        case call(`f,...al):
            if (impure_functions.member(f))
                return false;
            else fail
        case `f(...al):
            for ( Tree a: al )
                if (!is_pure(a))
                    return false;
        };
        return true;
    }

    public static Trees union ( Trees xs, Trees ys ) {
        Trees res = xs;
        for ( Tree y: ys )
            if (!xs.member(y))
                res = res.append(y);
        return res;
    }

    /** return the variables of a pattern */
    static Trees pattern_variables ( Tree pattern ) {
        Trees args = #[];
        match pattern {
        case tuple(...pl):
            for ( Tree p: pl )
                args = union(args,pattern_variables(p));
        case record(...bl):
            for ( Tree b: bl )
                match b {
                case bind(`n,`p):
                    args = union(args,pattern_variables(p));
                };
        case typed(`p,_):
            args = pattern_variables(p);
        case `v:
            if (v.is_variable())
                args = #[`v];
        };
        return args;
    }

    /** replace all occurences of from_expr in expr with to_expr
     * @param from_expr target
     * @param to_expr replacement
     * @param expr input
     * @return equal to expr but with all occurences of from_expr replaced with to_expr
     */
    public static Tree subst ( Tree from_expr, Tree to_expr, Tree expr ) {
        if (expr.equals(from_expr))
            return to_expr;
        match expr {
        case lambda(`v,_):
            if (pattern_variables(v).member(from_expr))
                return expr;
            else fail
        case bind(`a,`u):
            return #<bind(`a,`(subst(from_expr,to_expr,u)))>;
        case `f(...al):
            return #<`f(...(subst_list(from_expr,to_expr,al)))>;
        };
        return expr;
    }

    /** replace all occurences of from_expr in el with to_expr
     * @param from_expr target
     * @param to_expr replacement
     * @param el list of input expressions
     * @return equal to el but with all occurences of from_expr replaced with to_expr
     */
    public static Trees subst_list ( Tree from_expr, Tree to_expr, Trees el ) {
        Trees bl = #[];
        for ( Tree e: el )
            bl = bl.append(subst(from_expr,to_expr,e));
        return bl;
    }

    /** replace all occurences of var in expr with to_expr only if to_expr is pure or it is used once only
     * @param var target
     * @param to_expr replacement
     * @param expr input
     * @return equal to expr but with all occurences of from_expr replaced with to_expr
     */
    public static Tree subst_var ( Tree var, Tree to_expr, Tree expr ) {
        if (!is_pure(to_expr) && occurences(var,expr) > 1)
            return #<let(`var,`to_expr,`expr)>;
        else return subst(var,to_expr,expr);
    }

    /** used in the MRQL parser to handle templates */
    public static Tree template ( Tree s ) {
        match s {
        case template(`parser,...as):
            try {
                Trees args = #[];
                String tx = "";
                int i = 0;
                for ( Tree a: as )
                    match a {
                    case text(`t): tx += t;
                    case _: args = args.append(a);
                            tx += "{{"+(i++)+"}}";
                    };
                Class<? extends Parser> pc = DataSource.parserDirectory.get(parser.toString());
                if (pc == null)
                    throw new Error("Unrecognized parser: "+parser);
                Parser p = pc.newInstance();
                p.initialize(#[]);
                Bag e = p.parse(tx);
                Tree res = Interpreter.reify(e.get(0),p.type());
                for ( int j = 0; j < i; j++ )
                    res = subst(new VariableLeaf("t_"+j),args.nth(j),res);
                return res;
            } catch (Exception e) {
                throw new Error("Wrong template: "+s+"\n"+e);
            }
        };
        throw new Error("Wrong template: "+s);
    }

    /** convert Tree constructions to code that construct these Trees (used in the Compiler) */
    public static String reify ( Tree e ) {
        if (e instanceof LongLeaf)
            return "new org.apache.mrql.gen.LongLeaf(" + e + ")";
        else if (e instanceof DoubleLeaf)
            return "new org.apache.mrql.gen.DoubleLeaf(" + e + ")";
        else if (e instanceof VariableLeaf)
            return "new org.apache.mrql.gen.VariableLeaf(\"" + e.variableValue() + "\")";
        else if (e instanceof StringLeaf)
            return "new org.apache.mrql.gen.StringLeaf(" + e.toString().replace("\\","\\\\") + ")";
        else {
            Node n = (Node) e;
            return "new org.apache.mrql.gen.Node(\""+n.name()+"\","+reify(n.children())+")";
        }
    }

    /** convert Tree constructions to code that construct these Trees (used in the Compiler) */
    public static String reify ( Trees ts ) {
        String s = "org.apache.mrql.gen.Trees.nil";
        for ( Tree c: ts )
            s += ".append("+reify(c)+")";
        return s;
    }

    /** return the list of free variables in e that are not in exclude list */
    public static Trees free_variables ( Tree e, Trees exclude ) {
        if (e == null)
            return #[];
        match e {
        case lambda(`x,`b):
            return free_variables(b,exclude.append(pattern_variables(x)));
        case let(`x,`u,`b):
            return free_variables(b,exclude.append(pattern_variables(x)))
                     .append(free_variables(u,exclude));
        case Let(`x,`u,`b):
            return free_variables(b,exclude.append(pattern_variables(x)))
                     .append(free_variables(u,exclude));
        case select(`u,from(...bs),`p):
            Trees ex = exclude;
            Trees fs = #[];
            for ( Tree b: bs )
                match b {
                case bind(`v,`x):
                    fs = fs.append(free_variables(x,ex));
                    ex = ex.append(pattern_variables(v));
                };
            return free_variables(p,ex).append(free_variables(u,ex)).append(fs);
        case `f(...as):
            Trees res = #[];
            for ( Tree a: as )
                res = res.append(free_variables(a,exclude));
            return res;
        case `v:
            if (v.is_variable() && v.toString().startsWith("x_") && !exclude.member(v))
                return #[`v];
        };
        return #[];
    }

    /** count the occurences of x in e */
    public static int occurences ( Tree x, Tree e ) {
        if (x.equals(e))
            return 1;
        match e {
        case `f(...as):
            int i = 0;
            for ( Tree a: as )
                i += occurences(x,a);
            return i;
        };
        return 0;
    }

    /** return true if x is equal to y modulo variable substitution */
    public static boolean alpha_equivalent ( Tree x, Tree y, SymbolTable st ) {
        match #<T(`x,`y)> {
        case T(lambda(`vx,`bx),lambda(`vy,`by)):
            if (!vx.equals(vy))
                st.insert(vx.toString(),vy);
            return alpha_equivalent(bx,by,st);
        case T(`f(...xs),`g(...ys)):
            if (!f.equals(g) || xs.length() != ys.length())
                return false;
            for ( ; !xs.is_empty(); xs = xs.tail(), ys = ys.tail() )
                if (!alpha_equivalent(xs.head(),ys.head(),st))
                    return false;
            return true;
        case T(`v,`w):
            if (v.is_variable() && w.is_variable())
                return v.equals(w) || (st.lookup(v.toString()) != null
                                       && st.lookup(v.toString()).equals(w));
        };
        return x.equals(y);
    }

    private static SymbolTable alpha_symbol_table = new SymbolTable();

    /** return true if x is equal to y modulo variable substitution */
    public static boolean alpha_equivalent ( Tree x, Tree y ) {
        alpha_symbol_table.begin_scope();
        boolean b = alpha_equivalent(x,y,alpha_symbol_table);
        alpha_symbol_table.end_scope();
        return b;
    }

    /** translate a simplified select MRQL query to an algebraic form */
    public static Tree translate_select ( Tree e ) {
       match e {
       case select(`u,from(),where(true)):
           return #<bag(`(translate_select(u)))>;
       case select(`u,from(),where(`c)):
           return #<if(`(translate_select(c)),bag(`(translate_select(u))),bag())>;
       case select(`u,from(bind(`v,`d),...bl),where(`c)):
           Tree n = translate_select(#<select(`u,from(...bl),where(`c))>);
           return #<cmap(lambda(`v,`n),`(translate_select(d)))>;
       case `f(...al):
           Trees bl = #[];
           for ( Tree a: al )
               bl = bl.append(translate_select(a));
           return #<`f(...bl)>;
       };
       return e;
    }

    /** the MRQL top-level interfacse to evaluate a single MRQL expression or command */
    public static void top_level ( Tree expr ) {
        TopLevel.evaluate_top_level(expr);
    }
}
