/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.mrql;

import org.apache.mrql.gen.*;
import java.util.List;
import java.util.ArrayList;
import java.util.Iterator;
import java.io.PrintStream;


/** The MRQL interpreter */
public class Interpreter extends TypeInference {

    /** the expression tree of a lambda that maps x to bag(x) */
    public final static Tree identity_mapper = #<lambda(x,bag(x))>;
    /** debugging flag; initially off */
    public static boolean debug_mode = false;

    /** the head of the linked list of global variable bindings (null when the list is empty) */
    protected static Environment global_env = null;

    /**
     * Look up a variable in a chain of environment frames.
     * @param v the name of the variable
     * @param environment the first frame of the chain to search (may be null)
     * @return the value of the first frame whose name equals v, or null if there is no such frame
     */
    public final static MRData variable_lookup ( final String v, final Environment environment ) {
        Environment frame = environment;
        while (frame != null) {
            if (v.equals(frame.name))
                return frame.value;
            frame = frame.next;
        }
        return null;
    }

    /**
     * Push a new binding at the front of the global environment.
     * A Bag value is materialized before it is stored.
     * @param var the name of the variable
     * @param value the value to bind
     */
    public final static void new_global_binding ( final String var, final MRData value ) {
        final boolean is_bag = value instanceof Bag;
        if (is_bag) {
            final Bag bag = (Bag)value;
            bag.materialize();
        }
        global_env = new Environment(var,value,global_env);
    }

    /**
     * Remove every binding of a variable from the global environment.
     * The frames are unlinked in place from the list that starts at global_env.
     * @param v the name of the variable whose bindings are removed
     */
    public static void remove_global_binding ( String v ) {
        // drop matching frames at the head of the list (names are compared by content, not by reference)
        while (global_env != null && v.equals(global_env.name))
            global_env = global_env.next;
        if (global_env == null)
            return;
        // unlink matching frames from the rest of the list; after unlinking, stay on the
        // same frame so that its new successor is checked too and env never becomes null
        Environment env = global_env;
        while (env.next != null)
            if (v.equals(env.next.name))
                env.next = env.next.next;
            else env = env.next;
    }

    /**
     * Find the value of a global variable.
     * The global environment is searched first; frames that carry a null value are ignored.
     * If nothing is found there, the result of lookup_distributed_binding is returned.
     * @param v the name of the variable
     * @return the value bound to v, or whatever lookup_distributed_binding returns for v
     */
    public static MRData lookup_global_binding ( String v ) {
        Environment frame = global_env;
        while (frame != null) {
            if (v.equals(frame.name) && frame.value != null)
                return frame.value;
            frame = frame.next;
        }
        return lookup_distributed_binding(v);
    }

    /**
     * Replace the whole global environment.
     * @param env the new head of the list of global bindings (null clears the list)
     */
    public static void set_global_bindings ( Environment env ) {
        Interpreter.global_env = env;
    }

    /**
     * Bind a global variable and, in hadoop mode, also record the binding in the
     * configuration property "mrql.global.vars" as the text var:value@ placed in
     * front of the previous content. A null value is ignored.
     * @param var the name of the variable
     * @param value the value to bind
     */
    public final static void new_distributed_binding ( final String var, final MRData value ) {
        if (value != null) {
            new_global_binding(var,value);
            if (Config.hadoop_mode) {
                // the newest binding goes first in the property
                final String key = "mrql.global.vars";
                String previous = Plan.conf.get(key);
                if (previous == null)
                    previous = "";
                final String updated = var+":"+reify(value)+"@"+previous;
                Plan.conf.set(key,updated);
            }
        }
    }

    /**
     * Retrieve a global variable binding from the configuration property
     * "mrql.global.vars" (hadoop mode only). The property is a sequence of
     * name:value entries separated by '@'; the value of the first entry whose
     * name equals v is parsed and evaluated.
     * @param v the name of the variable
     * @return the evaluated value, or null when not in hadoop mode, when the variable
     *         is not found, or when the entry cannot be parsed or evaluated
     */
    public static MRData lookup_distributed_binding ( String v ) {
        try {
            if (Config.hadoop_mode) {   // find the binding in the hadoop configuration
                String gvs = Plan.conf.get("mrql.global.vars");
                if (gvs == null)
                    return null;
                for ( String s: gvs.split("@") ) {
                    // split at the first ':' only: the text of the value may contain ':' itself
                    String[] b = s.split(":",2);
                    if (b.length == 2 && !b[1].isEmpty() && b[0].equals(v))
                        return evalE(Tree.parse(b[1]));
                }
                // NOTE(review): a value whose text contains '@' is still cut by the outer split
            }
        } catch (Exception ignored) {
            // best effort: a binding that cannot be read is reported as missing
        }
        return null;
    }

    /** the number that ClassImporter.find_method_number returns for "coerce" with signature [any,int]; used by reify */
    final static int coerce_method = ClassImporter.find_method_number("coerce",#[any,int]);

    /** untyped reify: not type-correct but will not crash the run-time system */
    private final static Tree reify ( final MRData x ) {
        if (x instanceof Bag) {
            Bag b = (Bag)x;
            Trees as = #[];
            for ( MRData e: b)
                as = as.append(reify(e));
            return #<list(...as)>;
        } else if (x instanceof Tuple) {
            Tuple t = (Tuple)x;
            Trees as = #[];
            for ( short i = 0; i < t.size(); i++ )
                as = as.append(reify(t.get(i)));
            return #<tuple(...as)>;
        } else if (x instanceof Union) {
            Union u = (Union)x;
            return #<tagged_union(`(u.tag()),`(reify(u.value())))>;
        } else if (x instanceof MR_string)
            return new StringLeaf(((MR_string)x).get());
        else if (x instanceof MR_short)
            return #<callM(coerce,`coerce_method,`(((MR_short)x).get()),`(MRContainer.SHORT))>;
        else if (x instanceof MR_int)
            return #<`(((MR_int)x).get())>;
        else if (x instanceof MR_long)
            return #<callM(coerce,`coerce_method,`((int)((MR_long)x).get()),`(MRContainer.LONG))>;
        else if (x instanceof MR_float)
            return #<`(((MR_float)x).get())>;
        else if (x instanceof MR_double)
            return #<callM(coerce,`coerce_method,`((float)(((MR_double)x).get())),`(MRContainer.DOUBLE))>;
        else if (x instanceof MR_bool)
            return (((MR_bool)x).get()) ? #<true> : #<false>;
        else if (x instanceof MR_dataset) {
            Trees vs = #[ ];
            for ( MRData v: ((MR_dataset)x).dataset().take(Integer.MAX_VALUE))
                vs = vs.append(reify(v));
            return #<list(...vs)>;
        };
        throw new Error("wrong MRData: "+x);
    }

    /**
     * Check whether an expression tree contains a data source.
     * @param e the expression tree
     * @return true if e or any of its descendants is a Generator, BinarySource or ParsedSource node
     */
    private static boolean has_source ( Tree e ) {
        match e {
        // the three kinds of source nodes
        case Generator(...): return true;
        case BinarySource(...): return true;
        case ParsedSource(...): return true;
        // any other node with children: search the children recursively
        case `f(...as):
            for ( Tree a: as )
                if (has_source(a))
                    return true;
        };
        return false;
    }

    /**
     * Build an in-memory function from a one-parameter lambda.
     * @param v the name of the lambda parameter
     * @param body the body of the lambda
     * @param env the environment in which the lambda was created
     * @return a Function that evaluates body with v bound to its argument on top of env
     */
    private final static Function evalf ( final String v,
                                          final Tree body,
                                          final Environment env ) {
        final Function fn = new Function() {
            final public MRData eval ( final MRData x ) {
                final Environment extended = new Environment(v,x,env);
                return evalE(body,extended);
            }
        };
        return fn;
    }

    /**
     * Evaluate an MRQL function expression in memory.
     * @param fnc the function expression: compiled(...), lambda(v,b) or function(tuple(params),tp,body)
     * @param env the environment that supplies the free variables of the function
     * @return the Function that implements fnc
     * @throws Error if fnc has none of the accepted forms
     */
    public final static Function evalF ( Tree fnc, Environment env ) {
        match fnc {
        case compiled(`ln,`lm,...vars):
            try {
                // ask the Compiler for the compiled function named ln, using the context class loader
                return Compiler.compiled(Thread.currentThread().getContextClassLoader(),ln.toString());
            } catch (Exception ex) {
                // fall back to interpreting the expression lm (evaluated without an environment)
                System.err.println("*** Unable to retrieve the compiled lambda: "+fnc);
                return ((Lambda) evalE(lm)).lambda();
            }
        case lambda(`v,`b):
            return evalf(v.toString(),b,env);
        case function(tuple(...params),`tp,`body):
            // collect the parameter names; the function takes its arguments as one tuple
            String[] as = new String[params.length()];
            int i = 0;
            for ( Tree param: params )
                match param {
                case `bind(`v,_):
                    as[i++] = v.toString();
                };
            return evalT(as,body,env);
        };
        throw new Error("Ill-formed lambda: "+fnc);
    }

    /**
     * Build an in-memory function from a multi-parameter function.
     * The argument of the resulting Function is cast to a Tuple; its i-th component
     * is bound to the i-th parameter name.
     * @param params the parameter names, in order
     * @param body the body of the function
     * @param env the environment in which the function was created
     * @return a Function that evaluates body with the parameters bound on top of env
     */
    private final static Function evalT ( final String[] params,
                                          final Tree body,
                                          final Environment env ) {
        final Function fn = new Function() {
            final public MRData eval ( final MRData x ) {
                Environment frame = env;
                int pos = 0;
                for ( String param: params ) {
                    frame = new Environment(param,((Tuple)x).get(pos),frame);
                    pos++;
                }
                return evalE(body,frame);
            }
        };
        return fn;
    }

    // the printed names of the trees true, false and null; evalEE compares variable names against them
    final static String true_name = #<true>.toString();
    final static String false_name = #<false>.toString();
    final static String null_name = #<null>.toString();
    // the values that evalEE returns for the variables null, true and false (null is a Tuple created with size 0)
    final static MRData null_value = new Tuple(0);
    final static MRData true_value = new MR_bool(true);
    final static MRData false_value = new MR_bool(false);

    // current indentation of the trace output; it is increased by 3 before a line is printed, so -3 gives 0 at the first level
    static int tab_count = -3;

    // the number of trace messages issued by pre_trace so far
    static long trace_count = 0;

    /**
     * Build an indentation string.
     * @param n the number of spaces wanted
     * @return a string of n spaces; the empty string when n is zero or negative
     */
    public static String tabs ( int n ) {
        // StringBuilder: no synchronization is needed for a local buffer; presized to avoid regrowth
        StringBuilder b = new StringBuilder(Math.max(n,0));
        for ( int i = 0; i < n; i++ )
            b.append(' ');
        return b.toString();
    }

    /**
     * Implements the DataSetCollect operation: fetch a global variable as a Bag.
     * A Bag is returned as is; for an MR_dataset a new Bag is built from the
     * elements returned by take(Integer.MAX_VALUE).
     * @param var the name of the global variable
     * @return the value of var as a Bag
     * @throws Error if the value is neither a Bag nor an MR_dataset (this includes an unbound variable)
     */
    public static Bag dataSetCollect ( String var ) {
        final MRData bound = lookup_global_binding(var);
        if (bound instanceof Bag)
            return (Bag)bound;
        if (bound instanceof MR_dataset) {
            final MR_dataset stored = (MR_dataset)bound;
            return new Bag(stored.dataset().take(Integer.MAX_VALUE));
        }
        throw new Error("Cannot collect the MRQL value into a bag: "+bound);
    }

    /**
     * Announce the start of a traced evaluation: indent by 3 more columns,
     * take the next trace number and print the message with it.
     * @param msg the message to print
     * @return the trace number assigned to this message
     */
    public static long pre_trace ( String msg ) {
        tab_count += 3;
        trace_count++;
        final String line = tabs(tab_count)+"*** "+trace_count+": "+msg;
        System.out.println(line);
        return trace_count;
    }

    /**
     * Print the result of a traced evaluation and undo the indentation added by pre_trace.
     * The value is fully materialized before it is printed.
     * @param count the trace number returned by pre_trace
     * @param type the type used by the Printer to print the value
     * @param value the result to print
     * @return value itself
     */
    public static MRData trace ( long count, Tree type, MRData value ) {
        value.materializeAll();
        // the prefix is written before the value is formatted
        final String prefix = tabs(tab_count)+"--> "+count+": ";
        System.out.print(prefix);
        final String printed = Printer.print(value,type);
        System.out.println(printed);
        tab_count -= 3;
        return value;
    }

    /**
     * Evaluate an MRQL expression in memory and print tracing info.
     * When Config.trace_exp_execution is set, the expression is printed before the
     * evaluation and its result after it, indented by the current nesting depth.
     * @param e the expression to evaluate
     * @param env the environment of local variable bindings (may be null)
     * @return the value of e
     */
    final static MRData evalE ( final Tree e, final Environment env ) {
        // read the flag once so that the indentation is always undone if it was added
        final boolean tracing = Config.trace_exp_execution;
        if (!tracing)
            return evalEE(e,env);
        tab_count += 3;
        try {
            System.out.println(tabs(tab_count)+print_query(e));
            MRData res = evalEE(e,env);
            System.out.println(tabs(tab_count)+"-> "+res);
            return res;
        } finally {
            // restore the indentation even when the evaluation fails
            tab_count -= 3;
        }
    }

    /**
     * Evaluate an MRQL expression in memory.
     * Leaves (variables, numbers and strings) are handled first; any other expression
     * is dispatched on the form of its tree. An expression that matches none of the
     * explicit cases is passed to Evaluator.evaluator in hadoop mode and to evalS otherwise.
     * @param e the expression to evaluate
     * @param env the environment of local variable bindings (may be null)
     * @return the value of e
     * @throws Error if the evaluation fails; when Config.trace is off, an Error raised
     *         below is rethrown with the same message, while any other exception is
     *         reported as "Evaluation error in: " followed by the printed expression
     */
    private final static MRData evalEE ( final Tree e, final Environment env ) {
        try {
            if (e.is_variable()) {
                String v = e.toString();
                // compare the names by content rather than by reference
                if (v.equals(true_name))
                    return true_value;
                else if (v.equals(false_name))
                    return false_value;
                else if (v.equals(null_name))
                    return null_value;
                // local bindings shadow global ones
                MRData x = variable_lookup(v,env);
                if (x != null)
                    return x;
                x = lookup_global_binding(v);
                if (x == null)
                    throw new Error("Variable "+v+" is not bound");
                return x;
            } else if (e.is_long())
                // an integer literal evaluates to an MR_int (the long value is narrowed to int)
                return new MR_int((int)e.longValue());
            else if (e.is_double())
                return new MR_double(e.doubleValue());
            else if (e.is_string())
                return new MR_string(e.stringValue());
        match e {
        case callM(and,_,`x,`y):  // lazy
            return (((MR_bool)evalE(x,env)).get()) ? evalE(y,env) : false_value;
        case callM(or,_,`x,`y):
            return (((MR_bool)evalE(x,env)).get()) ? true_value : evalE(y,env);
        // coercion of an integer literal to long, done without narrowing it to int first
        case callM(coerce,_,`x,`n):
            if (!x.is_long() || n.longValue() != MRContainer.LONG)
                fail;
            return new MR_long(x.longValue());
        // coercion of a floating-point literal to float
        case callM(coerce,_,`x,`n):
            if (!x.is_double() || n.longValue() != MRContainer.FLOAT)
                fail;
            return new MR_float((float)x.doubleValue());
        case callM(`f,`n,...args):   // internal function call
            MRData[] as = new MRData[args.length()];
            for ( int i = 0; i < args.length(); i++ )
                as[i] = evalE(args.nth(i),env);
            return ClassImporter.call((int)n.longValue(),as);
        case compiled(`ln,_,...vars):
            return new Lambda(Compiler.compiled(Thread.currentThread().getContextClassLoader(),ln.toString()));
        case lambda(`v,`body):
            return new Lambda(evalf(v.toString(),body,env));
        case nth(`x,`n):
            return ((Tuple)evalE(x,env)).get((int)n.longValue());
        case setNth(`x,`n,`v,`ret):
            return ((Tuple)evalE(x,env)).set((int)n.longValue(),evalE(v,env),evalE(ret,env));
        case materialize(`u):
            return MapReduceAlgebra.materialize(evalE(u,env));
        // a let over a collected dataset: the value is also made a distributed binding
        case let(`v,DataSetCollect(`s),`body):
            MRData x = evalE(s,env);
            if (x instanceof MR_dataset)
                x = new Bag(((MR_dataset)x).dataset().take(Integer.MAX_VALUE));
            else if (x instanceof Bag)
                ((Bag)x).materialize();
            new_distributed_binding(v.toString(),x);
            return evalE(body,new Environment(v.toString(),x,env));
        case let(`v,`u,`body):
            MRData x = evalE(u,env);
            if (x instanceof Bag)
                ((Bag)x).materialize();
            return evalE(body,new Environment(v.toString(),x,env));
        case cmap(`f,`s):
            return MapReduceAlgebra.cmap(evalF(f,env),(Bag)evalE(s,env));
        case filter(`p,`m,`s):
            return MapReduceAlgebra.filter(evalF(p,env),evalF(m,env),(Bag)evalE(s,env));
        case map(`m,`s):
            return MapReduceAlgebra.map(evalF(m,env),(Bag)evalE(s,env));
        case repeat(...):
            if (!Config.flink_mode || !Config.hadoop_mode || !has_source(e))
                fail;
            // send Flink repetitions to the Flink evaluator (they need Flink iterations)
            return Evaluator.evaluator.aggregate(#<0>,#<0>,e,env);
        case repeat(lambda(`v,`b),`s,`n):
            final String nm = v.toString();
            final Tree body = b;
            // each step rebinds the loop variable, both as a distributed binding and locally
            Function loop_fnc = new Function () {
                    public MRData eval ( MRData s ) {
                        new_distributed_binding(nm,s);
                        return evalE(body,new Environment(nm,s,env));
                    }; };
            MRData init = evalE(s,env);
            if (init instanceof MR_dataset) {
                // copy the elements of the dataset into a Bag
                Bag vs = new Bag();
                for ( MRData dv: ((MR_dataset)init).dataset().take(Integer.MAX_VALUE))
                    vs.add(dv);
                init = vs;
            };
            return MapReduceAlgebra.repeat(loop_fnc,(Bag)init,((MR_int)evalE(n,env)).get());
        case closure(lambda(`v,`b),`s,`n):
            final String nm = v.toString();
            final Tree body = b;
            Function loop_fnc = new Function () {
                    public MRData eval ( MRData s ) {
                        new_distributed_binding(nm,s);
                        return evalE(body,new Environment(nm,s,env));
                    }; };
            return MapReduceAlgebra.closure(loop_fnc,(Bag)evalE(s,env),((MR_int)evalE(n,env)).get());
        case loop(lambda(tuple(...vs),`b),`s,`n):
            final Trees nvs = vs;
            final Tree body = b;
            // the loop state is a tuple; its i-th component is bound to the i-th loop variable
            Function loop_fnc = new Function () {
                    public MRData eval ( MRData s ) {
                        Tuple t = (Tuple)s;
                        Environment nenv = env;
                        for ( int i = 0; i < nvs.length(); i++ ) {
                            new_distributed_binding(nvs.nth(i).toString(),t.get(i));
                            nenv = new Environment(nvs.nth(i).toString(),t.get(i),nenv);
                        };
                        return evalE(body,nenv);
                    }; };
            return MapReduceAlgebra.loop(loop_fnc,(Tuple)evalE(s,env),((MR_int)evalE(n,env)).get());
        case range(`min,`max):
            return MapReduceAlgebra.generator(((MR_long)evalE(min,env)).get(),
                                              ((MR_long)evalE(max,env)).get());
        case trace(`msg,`tp,`x):
            String m = ((MR_string)evalE(msg,env)).get();
            return trace(pre_trace(m),tp,evalE(x,env));
        case call(`f,...args):
            // the arguments are passed to the function as one tuple
            Tuple t = new Tuple(args.length());
            int i = 0;
            for ( Tree a: args )
                t.set(i++,evalE(a,env));
            return evalF(f,env).eval(t);
        case tuple(`x,`y):
            return new Tuple(evalE(x,env),evalE(y,env));
        case tuple(`x,`y,`z):
            return new Tuple(evalE(x,env),evalE(y,env),evalE(z,env));
        case tuple(...el):
            Tuple t = new Tuple(el.length());
            int i = 0;
            for ( Tree a: el )
                t.set(i++,evalE(a,env));
            return t;
        case tagged_union(`n,`u):
            return new Union((byte)n.longValue(),evalE(u,env));
        case union_value(`x):
            return ((Union)evalE(x,env)).value();
        case union_tag(`x):
            return new MR_int(((Union)evalE(x,env)).tag());
        // used for shortcutting sync in bsp supersteps
        case BAG():
            return SystemFunctions.bsp_empty_bag;
        case TRUE():
            return SystemFunctions.bsp_true_value;
        case FALSE():
            return SystemFunctions.bsp_false_value;
        // a collection constructor: evaluate the elements into a new Bag
        case `T(...el):
            if (!is_collection(T))
                fail;
            if (el.is_empty())
                return new Bag();
            Bag b = new Bag(el.length());
            for ( Tree a: el )
                b.add(evalE(a,env));
            return b;
        case if(`c,`x,`y):
            if (((MR_bool)evalE(c,env)).get())
                return evalE(x,env);
            else return evalE(y,env);
        case Collect(`s):
            try {
                if (Config.hadoop_mode)
                    return Plan.collect(Evaluator.evaluator.eval(s,env,"-"));
                Bag b = evalS(s,env);
                b.materialize();
                return b;
            } catch (Exception ex) { throw new Error(ex); }
        case DataSetCollect(`s):
            return dataSetCollect(s.toString());
        case dataset_size(`x):
            // Plan.size divided by 1024*1024 (presumably bytes to megabytes; verify against Plan.size)
            return new MR_long(Plan.size(Evaluator.evaluator.eval(x,env,"-")) / (1024*1024));
        case synchronize(`peer,`b):
            return Evaluator.evaluator.synchronize(((MR_string)evalE(peer,env)),(MR_bool)evalE(b,env));
        case distribute(`peer,`s):
            return Evaluator.evaluator.distribute(((MR_string)evalE(peer,env)),(Bag)evalE(s,env));
        case mapReduce(`m,`r,`s,_):
            return MapReduceAlgebra.mapReduce(evalF(m,env),
                                              evalF(r,env),
                                              (Bag)evalE(s,env));
        case mapReduce2(`mx,`my,`r,`x,`y,_):
            return MapReduceAlgebra.mapReduce2(
                                evalF(mx,env),
                                evalF(my,env),
                                evalF(r,env),
                                (Bag)evalE(x,env),
                                (Bag)evalE(y,env));
        case mapJoin(`kx,`ky,`r,`x,`y):
            return MapReduceAlgebra.mapJoin(
                                evalF(kx,env),
                                evalF(ky,env),
                                evalF(r,env),
                                (Bag)evalE(x,env),
                                (Bag)evalE(y,env));
        case crossProduct(`mx,`my,`r,`x,`y):
            return MapReduceAlgebra.crossProduct(
                              evalF(mx,env),
                              evalF(my,env),
                              evalF(r,env),
                              (Bag)evalE(x,env),
                              (Bag)evalE(y,env));
        case groupBy(`s):
            return MapReduceAlgebra.groupBy((Bag)evalE(s,env));
        // NOTE(review): orderBy is evaluated with MapReduceAlgebra.groupBy, the same call as groupBy above; confirm this is intended
        case orderBy(`s):
            return MapReduceAlgebra.groupBy((Bag)evalE(s,env));
        case index(`x,`n):
            MRData xv = evalE(x,env);
            MRData nv = evalE(n,env);
            final int k = (int)((MR_int)nv).get();
            if (k < 0)
                throw new Error("Negative list index: "+k+" (in "+print_query(x)+")");
            if (xv instanceof MR_dataset) {
                // fetch only the first k+1 elements of the dataset
                DataSet ds = ((MR_dataset)xv).dataset();
                List<MRData> res = ds.take(k+1);
                if (k >= res.size())
                    throw new Error("List index out of bounds: "+k+" (in "+print_query(x)+" of size "+res.size()+")");
                return res.get(k);
            };
            Bag b = (Bag)xv;
            // the bound is checked here only when the bag is already materialized
            if (b.materialized() && k >= b.size())
                throw new Error("List index out of bounds: "+k+" (in "+print_query(x)+" of size "+b.size()+")");
            return b.get(k);
        case range(`x,`i,`j):
            MRData xv = evalE(x,env);
            MRData ni = evalE(i,env);
            MRData nj = evalE(j,env);
            int ki = (int)((MR_int)ni).get();
            int kj = (int)((MR_int)nj).get();
            if (ki < 0 || kj < ki)
                throw new Error("Wrong list range: ["+ki+","+kj+"] in "+print_query(x));
            Iterator<MRData> it = (xv instanceof MR_dataset)
                                   ? ((MR_dataset)xv).dataset().take(kj+1).iterator()
                                   : ((Bag)xv).iterator();
            Bag s = new Bag();
            // skip the first ki elements, then copy the elements at positions ki..kj (as many as exist)
            for ( int n = 0; it.hasNext() && n < ki; n++ )
                it.next();
            for ( int n = ki; it.hasNext() && n <= kj; n++ )
                s.add(it.next());
            return s;
        case map_index(`x,`key):
            MRData xv = evalE(x,env);
            final MRData nk = evalE(key,env);
            if (xv instanceof MR_dataset) {
                // reduce the dataset starting from an empty tuple, which stands for "key not found"
                xv = ((MR_dataset)xv).dataset().reduce(new Tuple(),new Function() {
                        public MRData eval ( MRData value ) {
                            Tuple p = (Tuple)value;
                            Tuple y = (Tuple)p.second();
                            return (y.first().equals(nk)) ? y.second() : p.first();
                        }
                    });
                if (xv instanceof Tuple && ((Tuple)xv).size() == 0)
                    throw new Error("Map key not found: "+nk);
                return xv;
            };
            return ((Bag)xv).map_find(nk);
        case aggregate(`acc,`zero,`s):
            return MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
                                              (Bag)evalE(s,env));
        case Aggregate(`acc,`zero,`s):
            if (Config.hadoop_mode)
                return Evaluator.evaluator.aggregate(closure(acc,env),zero,s,env);
            else return MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),evalM(s,env));
        case mergeGroupByJoin(`kx,`ky,`gx,`gy,`acc,`zero,`r,`x,`y,`o):
            return MapReduceAlgebra.mergeGroupByJoin(evalF(kx,env),evalF(ky,env),evalF(gx,env),evalF(gy,env),
                                                     evalF(acc,env),evalE(zero,env),evalF(r,env),
                                                     (Bag)evalE(x,env),(Bag)evalE(y,env));
        case BSP(tuple(...ns),`superstep,`state,`o,...as):
            if (Config.hadoop_mode)
                return Evaluator.evaluator.bsp(e,env);
            Bag[] ds = new Bag[as.length()];
            for ( int i = 0; i < ds.length; i++ )
                ds[i] = evalM(as.nth(i),env);
            int[] nn = new int[ns.length()];
            for ( int i = 0; i < ns.length(); i++ )
                nn[i] = (int)((LongLeaf)ns.nth(i)).value();
            return MapReduceAlgebra.BSP(nn,
                                        evalF(superstep,env),
                                        evalE(state,env),
                                        o.equals(#<true>),
                                        ds);
        case BSP(`n,`superstep,`state,`o,...as):
            if (Config.hadoop_mode)
                return Evaluator.evaluator.bsp(e,env);
            Bag[] ds = new Bag[as.length()];
            for ( int i = 0; i < ds.length; i++ )
                ds[i] = evalM(as.nth(i),env);
            return MapReduceAlgebra.BSP(new int[]{(int)((LongLeaf)n).value()},
                                        evalF(superstep,env),
                                        evalE(state,env),
                                        o.equals(#<true>),
                                        ds);
        case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num):
            if (Config.hadoop_mode)
                return Evaluator.evaluator.loop(e,env);
            int limit = ((MR_int)evalE(num,env)).get();
            Bag[] s = new Bag[vs.length()];
            for ( int i = 0; i < vs.length(); i++ )
                s[i] = evalM(ss.nth(i),env);
            for ( int n = 0; n < limit; n++ ) {
                // bind all the loop variables to the current bags, then compute all the new bags
                Environment nenv = env;
                for ( int i = 0; i < vs.length(); i ++ ) {
                    s[i].materialize();
                    nenv = new Environment(vs.nth(i).toString(),s[i],nenv);
                };
                for ( int i = 0; i < vs.length(); i ++ )
                    s[i] = (Bag)evalM(bs.nth(i),nenv);
            };
            return new Tuple(s);
        case function(tuple(...params),`tp,`body):
            String[] as = new String[params.length()];
            int i = 0;
            for ( Tree param: params )
                match param {
                case `bind(`v,_):
                    as[i++] = v.toString();
                };
            return new Lambda(evalT(as,body,env));
        case typed(`x,_):
            return evalE(x,env);
        case apply(`f,`arg):
            if (!f.is_variable())
                return evalF(f,env).eval(evalE(arg,env));
            MRData fnc = lookup_global_binding(f.toString());
            if (fnc == null) {
                // look for the definition of the function in the configuration and cache it as a global binding
                String s = Plan.conf.get("mrql.global."+f);
                if (s != null)
                    try {
                        Tree ft = Tree.parse(s);
                        //TopLevel.store(f.toString(),ft);
                        fnc = evalE(ft,env);
                        new_global_binding(f.toString(),fnc);
                    } catch (Exception ex) {
                        throw new Error(ex);
                    }
            };
            MRData t = evalE(arg,env);
            if (!(t instanceof Tuple))
                throw new Error("Expected a tuple in function application: "+t);
            // report an unbound function explicitly instead of failing on a null reference
            if (fnc == null)
                throw new Error("Function "+f+" is not bound");
            return ((Lambda)fnc).lambda().eval(t);
        case Stream(...):  // streaming
            final Tree qt = query_type;
            // every value passed to the callback is printed with the query type
            Evaluator.evaluator.streaming(e,env,null,new Function(){
                    public MRData eval ( final MRData data ) {
                        System.out.println(print(data,qt));
                        return data;
                    }
                });
            return new Bag();
        case provenance(`x,`tp,...s):
            MRData value = evalE(x,env);
            if (Config.debug)
                Debugger.debug(value,tp,#[...s]);
            else Provenance.display(value,tp,#[...s]);
            return new Tuple(0);
        case Provenance(`x,`tp,...s):
            MRData value = (Config.hadoop_mode)
                ? new MR_dataset(Evaluator.evaluator.eval(x,env,"-"))
                : evalS(x,env);
            if (Config.debug)
                Debugger.debug(value,tp,#[...s]);
            else Provenance.display(value,tp,#[...s]);
            return new Tuple(0);
        case Lineage(...as):
            return evalEE(#<tuple(...as)>,env);
        case Let(`v,`x,`y):
            if (Config.hadoop_mode)
                return evalEE(y,new Environment(v.toString(),
                                                new MR_dataset(Evaluator.evaluator.eval(x,env,"-")),
                                                env));
            Bag b = evalS(x,env);
            b.materialize();
            return evalEE(y,new Environment(v.toString(),b,env));
        // anything else is treated as a physical plan
        case _:
            try {
                if (Config.hadoop_mode)
                    return new MR_dataset(Evaluator.evaluator.eval(e,env,"-"));
                else return evalS(e,env);
            } catch (Exception ex) { throw new Error(ex); }
        };
        throw new Error("Cannot evaluate the expression: "+e);
        } catch (Error msg) {
            // without tracing, pass the message on; with tracing, print the details and report the failing expression
            if (!Config.trace)
                throw new Error(msg.getMessage());
            System.err.println(msg.getMessage());
            msg.printStackTrace();
            throw new Error("Evaluation error in: "+print_query(e));
        } catch (Exception ex) {
            if (Config.trace) {
                System.err.println(ex.getMessage());
                ex.printStackTrace();
            }
            throw new Error("Evaluation error in: "+print_query(e));
        }
    }

    /**
     * Evaluate an MRQL expression in memory without any local variable bindings.
     * @param e the expression to evaluate
     * @return the value of e
     */
    final static MRData evalE ( final Tree e ) {
        final Environment no_bindings = null;
        return evalE(e,no_bindings);
    }

    /**
     * Evaluate MRQL physical operators in memory; this simply delegates to evalM.
     * @param e the physical plan to evaluate
     * @param env the environment of local variable bindings
     * @return the resulting Bag
     */
    final static Bag evalS ( final Tree e, final Environment env ) {
        final Bag result = evalM(e,env);
        return result;
    }

    /**
     * Evaluate MRQL physical operators in memory (returns a Bag).
     * When Config.trace_execution is set, the plan is printed before the evaluation
     * and its result after it, indented by the current nesting depth.
     * @param e the physical plan to evaluate
     * @param env the environment of local variable bindings
     * @return the resulting Bag
     */
    final static Bag evalM ( final Tree e, final Environment env ) {
        // read the flag once so that the indentation is always undone if it was added
        final boolean tracing = Config.trace_execution;
        if (!tracing)
            return evalMM(e,env);
        tab_count += 3;
        try {
            System.out.println(tabs(tab_count)+print_query(e));
            Bag res = evalMM(e,env);
            System.out.println(tabs(tab_count)+"-> "+res);
            return res;
        } finally {
            // restore the indentation even when the evaluation fails
            tab_count -= 3;
        }
    }

    /** evaluate MRQL physical operators in memory (returns a Bag) */
    final static Bag evalMM ( final Tree e, final Environment env ) {
        try {
            match e {
            case cMap(`f,`s):
                return MapReduceAlgebra.cmap(evalF(f,env),evalM(s,env));
            case AggregateMap(`f,`acc,`zero,`s):
                return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
                                                          evalM(#<cMap(`f,`s)>,env)));
            case MapReduce(`m,`r,`s,_):
                return MapReduceAlgebra.mapReduce(
                                   evalF(m,env),
                                   evalF(r,env),
                                   evalM(s,env));
            case MapAggregateReduce(`m,`r,`acc,`zero,`s,_):
                return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
                                                          evalM(#<MapReduce(`m,`r,`s,false)>,env)));
            case MapCombineReduce(`m,`c,`r,`s,_):
                return MapReduceAlgebra.mapReduce(
                                   evalF(m,env),
                                   evalF(r,env),
                                   evalM(s,env));
            case MapReduce2(`mx,`my,`r,`x,`y,_):
                return MapReduceAlgebra.mapReduce2(
                                evalF(mx,env),
                                evalF(my,env),
                                evalF(r,env),
                                evalM(x,env),
                                evalM(y,env));
            case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,_):
                return MapReduceAlgebra.mapReduce2(
                                evalF(mx,env),
                                evalF(my,env),
                                evalF(r,env),
                                evalM(x,env),
                                evalM(y,env));
            case MapAggregateReduce2(`mx,`my,`r,`acc,`zero,`x,`y,_):
                return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
                                                          evalM(#< MapReduce2(`mx,`my,`r,`x,`y,false)>,env)));
            case MapJoin(`kx,`ky,`r,`x,`y):
                return MapReduceAlgebra.mapJoin(
                                evalF(kx,env),
                                evalF(ky,env),
                                evalF(r,env),
                                evalM(x,env),
                                evalM(y,env));
            case MapAggregateJoin(`kx,`ky,`r,`acc,`zero,`x,`y):
                return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
                                                          evalM(#<MapJoin(`kx,`ky,`r,`x,`y)>,env)));
            case GroupByJoin(`kx,`ky,`gx,`gy,`acc,`zero,`r,`x,`y,`o):
                return MapReduceAlgebra.groupByJoin(
                                evalF(kx,env),
                                evalF(ky,env),
                                evalF(gx,env),
                                evalF(gy,env),
                                evalF(acc,env),
                                evalE(zero,env),
                                evalF(r,env),
                                evalM(x,env),
                                evalM(y,env));
            case CrossProduct(`mx,`my,`r,`x,`y):
                return MapReduceAlgebra.crossProduct(
                                evalF(mx,env),
                                evalF(my,env),
                                evalF(r,env),
                                evalM(x,env),
                                evalM(y,env));
            case CrossAggregateProduct(`mx,`my,`r,`acc,`zero,`x,`y):
                return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
                                                          evalM(#<CrossProduct(`mx,`my,`r,`x,`y)>,env)));
            case BinarySource(`file,_):
                return (Bag)MapReduceAlgebra.read_binary(file.stringValue());
            case BSPSource(`n,BinarySource(`file,_)):
                return (Bag)MapReduceAlgebra.read_binary((int)((LongLeaf)n).value(),
                                                         file.stringValue());
            case BSPSource(`n,ParsedSource(`parser,`file,...args)):
                if (!(n instanceof LongLeaf))
                    fail;
                Parser p = DataSource.parserDirectory.get(parser.toString()).newInstance();
                if (p == null)
                    throw new Error("Unknown parser: "+parser);
                return MapReduceAlgebra.parsedSource((int)(((LongLeaf)n).value()),p,
                                                     ((MR_string)evalE(file,env)).get(),args);
            case ParsedSource(`parser,`file,...args):
                Parser p = DataSource.parserDirectory.get(parser.toString()).newInstance();
                if (p == null)
                    throw new Error("Unknown parser: "+parser);
                return MapReduceAlgebra.parsedSource(p,((MR_string)evalE(file,env)).get(),args);
            case Merge(`x,`y):
                return evalM(x,env).union(evalM(y,env));
            case Repeat(lambda(`v,`b),`s,`n):
                final String vs = v.toString();
                final Tree body = b;
                Function loop = new Function() {
                        final public MRData eval ( final MRData x ) {
                            return evalM(body,new Environment(vs,x,env));
                        }
                    };
                return MapReduceAlgebra.repeat(loop,(Bag)evalM(s,env),((MR_int)evalE(n,env)).get());
            case Closure(lambda(`v,`b),`s,`n):
                final String vs = v.toString();
                final Tree body = b;
                Function loop = new Function() {
                        final public MRData eval ( final MRData x ) {
                            return evalM(body,new Environment(vs,x,env));
                        }
                    };
                return MapReduceAlgebra.closure(loop,(Bag)evalM(s,env),((MR_int)evalE(n,env)).get());
            case Generator(`min,`max,`size):
                return MapReduceAlgebra.generator(((MR_long)evalE(min,env)).get(),
                                                  ((MR_long)evalE(max,env)).get());
            case BSPSource(`n,Generator(`min,`max,`size)):
                return MapReduceAlgebra.generator((int)((LongLeaf)n).value(),
                                                  ((MR_long)evalE(min,env)).get(),
                                                  ((MR_long)evalE(max,env)).get());
            case Dump(`s):
                Bag bs = (Bag)evalE(s,env);
                final Iterator<MRData> iter = bs.iterator();
                return new Bag(new BagIterator() {
                        public boolean hasNext () {
                            return iter.hasNext();
                        }
                        public MRData next () {
                            return new Tuple(new MR_int(0),iter.next());
                        }
                    });
            case let(`v,`u,`body):
                return evalM(body,new Environment(v.toString(),evalE(u,env),env));
            case materialize(`u):
                Bag res = evalM(u,env);
                res.materialize();
                return res;
            case trace(`msg,`tp,`x):
                String m = ((MR_string)evalE(msg,env)).get();
                return (Bag)trace(pre_trace(m),tp,evalM(x,env));
            case apply(`f,`arg):
                if (!f.is_variable())
                    return (Bag)evalF(f,env).eval(evalE(arg));
                MRData fnc = lookup_global_binding(f.toString());
                if (fnc == null)
                    throw new Error("Unknown function: "+f);
                MRData t = evalE(arg,env);
                if (!(t instanceof Tuple))
                    throw new Error("Expected a tuple in function application: "+t);
                return (Bag)((Lambda)fnc).lambda().eval(t);
            case BSPSource(`n,`s):
                final MR_int i = new MR_int((int)((LongLeaf)n).value());
                Bag bs = (Bag)evalE(s,env);
                final Iterator<MRData> iter = bs.iterator();
                return new Bag(new BagIterator() {
                        public boolean hasNext () {
                            return iter.hasNext();
                        }
                        public MRData next () {
                            return new Tuple(i,iter.next());
                        }
                    });
            case BSP(...):
                MRData res = evalE(e,env);
                if (res instanceof Bag)
                    return (Bag)res;
                else return new Bag(res);
            case `v:
                if (!v.is_variable())
                    fail;
                MRData x = variable_lookup(v.toString(),env);
                if (x != null)
                    return (Bag)x;
                x = lookup_global_binding(v.toString());
                if (x != null)
                    return (Bag)x;
                throw new Error("Variable "+v+" is not bound");
            };
            throw new Error("Cannot evaluate the plan: "+e);
        } catch (Error msg) {
            if (!Config.trace)
                throw new Error(msg.getMessage());
            System.err.println(msg.getMessage());
            msg.printStackTrace();
            throw new Error("Evaluation error in: "+print_query(e));
        } catch (Exception ex) {
            if (Config.trace)
                ex.printStackTrace();
            throw new Error("Evaluation error in: "+print_query(e));
        }
    }

    /** Worker behind the two-argument closure method.
     *  At present this is an identity operation: the expression is handed back
     *  untouched and neither the environment nor the local variables are consulted.
     * @param e the expression to process
     * @param env the binding environment (currently unused)
     * @param local_vars the locally bound variables (currently unused)
     * @return the expression e, unchanged
     */
    private final static Tree closure ( final Tree e, final Environment env, final Trees local_vars ) {
        return e;
    }

    /** Entry point for closing an expression over an environment;
     *  it delegates to the three-argument variant, starting with no local variables.
     * @param e the expression to process
     * @param env the binding environment
     * @return whatever the three-argument closure returns for e
     */
    final static Tree closure ( Tree e, Environment env ) {
        final Trees no_local_vars = #[];
        return closure(e,env,no_local_vars);
    }

    /** the type inferred for the expression most recently passed to translate_expression */
    static Tree query_type;
    /** NOTE(review): not assigned in this part of the file; presumably the plan of the current query -- confirm against callers */
    static Tree query_plan;
    /** result of PlanGeneration.is_dataset_expr on the translated expression; set by translate_expression */
    static boolean is_dataset;

    /** Translate an MRQL expression into a physical plan.
     *  The expression is type-checked, normalized, optimized, turned into an
     *  algebraic form and finally into a plan (a BSP plan when Config.bsp_mode is set).
     *  Intermediate forms are printed when Config.trace is on.
     *  As side effects, query_type, is_dataset and repeat_variables are updated.
     * @param e the MRQL expression to translate
     * @return the generated plan, or null if an Error was raised during translation
     *         (when Config.testing is set the Error is rethrown, wrapped in a new Error)
     */
    final static Tree translate_expression ( Tree e ) {
        try {
            if (Config.trace)
                System.out.println("Query at line "+Main.parser.line_pos()+": "+print_query(e));
            // type-check the original query and remember its type
            Tree inferred = TypeInference.type_inference(e);
            if (!Config.quiet_execution)
                System.out.println("Query type: "+print_type(inferred));
            query_type = inferred;
            // normalization phase
            Tree expr = Normalization.remove_groupby(e);
            if (Config.trace)
                System.out.println("After removing group-by:\n"+expr.pretty(0));
            expr = Simplification.rename(expr);
            if (Config.trace)
                System.out.println("After renaming variables:\n"+expr.pretty(0));
            expr = Simplification.rename(Normalization.normalize_all(expr));
            if (Config.trace)
                System.out.println("Normalized query:\n"+expr.pretty(0));
            type_inference(expr);
            // plan selection and removal of select-queries
            expr = QueryPlan.best_plan(expr);
            if (Config.trace)
                System.out.println("Best plan:\n"+expr.pretty(0));
            expr = Simplification.rename(Translator.translate_select(expr));
            if (Config.trace)
                System.out.println("After removing select-queries:\n"+expr.pretty(0));
            type_inference(expr);
            expr = Simplification.simplify_all(expr);
            if (Config.trace)
                System.out.println("Algebra expression:\n"+expr.pretty(0));
            Tree algebraic_type = type_inference(expr);
            if (Config.trace)
                System.out.println("Algebraic type: "+print_type(algebraic_type));
            // incremental streaming takes precedence over lineage, which takes precedence over debugging
            if (Config.stream_window > 0 && Config.incremental)
                expr = Streaming.generate_incremental_code(expr);
            else if (Config.lineage || Config.debug) {
                // lineage always passes false; plain debugging passes the debug_mode flag
                expr = Provenance.embed_provenance(expr,!Config.lineage && debug_mode);
                if (Config.trace)
                    System.out.println("After provenance injection:\n"+expr.pretty(0));
            }
            expr = AlgebraicOptimization.translate_all(expr);
            if (Config.trace)
                System.out.println("Translated expression:\n"+expr.pretty(0));
            Tree physical_type = TypeInference.type_inference(expr);
            is_dataset = PlanGeneration.is_dataset_expr(expr);
            if (Config.trace)
                System.out.println("Physical plan type: "+print_type(physical_type));
            repeat_variables = #[];
            expr = Simplification.simplify_all(expr);
            if (Streaming.is_streaming(expr) && !Config.incremental)
                expr = Streaming.streamify(expr);
            // plan generation
            Tree result = PlanGeneration.makePlan(expr);
            final String plan_header;
            if (Config.bsp_mode) {
                BSPTranslator.reset();
                // in BSP mode the plan is also shown before it is converted to a BSP plan
                if (Config.trace)
                    System.out.println("Physical plan:\n"+result.pretty(0));
                result = Materialization.materialize_terms(BSPTranslator.constructBSPplan(result));
                plan_header = "BSP plan:\n";
            } else {
                if (Config.hadoop_mode)
                    result = PlanGeneration.physical_plan(result);
                result = Materialization.materialize_terms(AlgebraicOptimization.common_factoring(result));
                plan_header = "Physical plan:\n";
            }
            // report the final plan: the full tree when tracing, otherwise the condensed form
            if (Config.trace)
                System.out.println(plan_header+result.pretty(0));
            else {
                String condensed = print_plan(result,0,false);
                if (!condensed.equals("") && !Config.quiet_execution)
                    System.out.println(plan_header+condensed);
            }
            if (Config.compile_functional_arguments)
                result = Compiler.compile(result);
            return result;
        } catch (Error x) {
            if (Config.testing)
                throw new Error(x);
            // type errors are silently dropped unless tracing is on
            if (Config.trace || !x.toString().endsWith("Type Error")) {
                if (x.getMessage() != null) // system error
                    System.err.println("*** MRQL System Error at line "+Main.parser.line_pos()+": "+x);
                if (Config.trace)
                    x.printStackTrace(System.err);
            }
            return null;
        }
    }
}

