/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.mrql;

import org.apache.mrql.gen.*;
import java.util.*;


/** the type inference/checker for MRQL expressions and algebraic forms */
public class TypeInference extends Translator {

    /** wrap a list of trees into a tuple, unless it has exactly one element
     * @param pl the tuple components
     * @return the sole element of pl when pl has length 1, otherwise tuple(...pl)
     */
    private static Tree make_tuple ( Trees pl ) {
        return (pl.length() == 1) ? pl.head() : #<tuple(...pl)>;
    }

    /** replace the outermost collection constructor of a type by its persistent counterpart
     * @param tp a type
     * @return `S(t) when tp is a one-argument collection type `T(t), where S is
     *         persistent_collection(T); otherwise tp itself
     */
    public static Tree make_persistent_type ( Tree tp ) {
        match tp {
        case `T(`t):
            // only collection constructors are rewritten; anything else is returned as is
            if (is_collection(T)) {
                String P = persistent_collection(T);
                return #<`P(`t)>;
            }
        };
        return tp;
    }

    /** combine two collection type names into the name used for their combination
     * @param x a collection type name (Bag, bag, List or list)
     * @param y a collection type name (Bag, bag, List or list)
     * @return a bag name if either argument is Bag or bag, otherwise a list name;
     *         the capitalized name is chosen if either argument is Bag or List
     */
    private static String max_collection ( String x, String y ) {
        boolean has_Bag = x.equals("Bag") || y.equals("Bag");
        boolean some_bag = has_Bag || x.equals("bag") || y.equals("bag");
        boolean some_persistent = has_Bag || x.equals("List") || y.equals("List");
        if (some_bag)
            return some_persistent ? "Bag" : "bag";
        return some_persistent ? "List" : "list";
    }

    /** is tp the name of one of the numerical basic types (short, int, long, float, double)?
     * @param tp a type name
     * @return true if tp names a numerical type
     */
    private static boolean numerical ( String tp ) {
        Trees numerical_types = #[short,int,long,float,double];
        Tree name = #<`tp>;
        return numerical_types.member(name);
    }

    /** print a type error on the standard error stream and abort by throwing an Error
     * @param e the expression where the error was detected (its line and position are printed)
     * @param msg the error message; when Config.trace is set and the type environment
     *            is not empty, the types of all variables in scope are appended to it
     */
    static void type_error ( Tree e, String msg ) {
        System.err.println("*** Type error (line: "+e.line+", position: "+e.position+")");
        StringBuilder report = new StringBuilder().append(msg);
        if (Config.trace && type_env.iterator().hasNext()) {
            report.append("\nVariable Types:");
            for ( String name: type_env )
                report.append("\n ").append(name).append(": ")
                      .append(print_type(type_env.lookup(name)));
        }
        System.err.println("*** "+report);
        throw new Error("Type Error");
    }

    /** given that pattern has type tp, bind the pattern variables to types
     * @param pattern the pattern to check against tp
     * @param tp the type the pattern is expected to match
     * @return the list of pattern variables that were bound in type_env
     * A mismatch between the pattern and tp is reported through type_error, which throws.
     */
    private static Trees bind_pattern_type ( Tree pattern, Tree tp ) {
        Trees args = #[];
        match pattern {
        case tuple(...pl):
            // a tuple pattern needs a tuple type with the same number of components
            match tp {
            case tuple(...tl):
                if (tl.length() != pl.length())
                    type_error(pattern,"Tuple pattern "+print_query(pattern)+" must have exactly "
                               +tl.length()+" components");
                int i = 0;
                // bind each component pattern to the component type at the same position
                for ( Tree p: pl )
                    args = args.append(bind_pattern_type(p,tl.nth(i++)));
            case `etp: type_error(pattern,"Wrong pattern: found "+print_query(pattern)
                                  +" but expected a pattern that matches the type "+print_type(etp));
            };
        case record(...bl):
            // attribute names seen so far, used to reject duplicates
            Trees attrs = #[];
            match tp {
            case record(...tl):
                for ( Tree b: bl )
                    match b {
                    case bind(`n,`p):
                        boolean found = false;
                        if (attrs.member(n))
                            type_error(pattern,"Duplicate record attribute name: "+n
                                       +" in pattern "+print_query(pattern));
                        attrs = attrs.append(n);
                        // look for the attribute n in the record type and bind p to its type
                        for ( Tree t: tl )
                            match t {
                            case bind(`nt,`pt):
                                if (!nt.equals(n))
                                    fail;
                                found = true;
                                args = args.append(bind_pattern_type(p,pt));
                            };
                        if (!found)
                            type_error(pattern,"Wrong record component: "+n
                                       +" in pattern "+print_query(pattern)
                                       +" (expected one from "+print_type(tp)+")");
                    };
            case `etp: type_error(pattern,"Wrong pattern: found "+print_query(pattern)
                                  +" but expected a pattern that matches the type "+print_type(etp));
            };
        case typed(`p,`t):
            // the declared type t must be a subtype of the expected type;
            // the inner pattern is then bound against t, not tp
            if (subtype(t,tp))
                args = bind_pattern_type(p,t);
            else type_error(pattern,"Type "+print_type(t)+" in pattern "+print_query(pattern)
                            +" does not match the expected type "+print_type(tp));
        case list(...pl):
            // every element pattern is bound against the element type of the list
            match tp {
            case list(`etp):
                for ( Tree p: pl )
                    args = args.append(bind_pattern_type(p,etp));
            case `stp: type_error(pattern,"List pattern "+print_query(pattern)
                                  +" can only be used for lists (found "+print_type(tp)+")");
            };
        case call(`c,...s):
            // a data constructor pattern: c must be registered in data_constructors
            Tree ci = data_constructors.lookup(c.toString());
            if (ci == null)
                type_error(pattern,"Undefined data constructor "+c+" in pattern "+print_query(pattern));
            match ci {
            case `dname(`n,`dtp):
                // the expected type must be a subtype of the (expanded) type named dname
                if (!subtype(tp,expand(#<`dname>)))
                    type_error(pattern,"Cannot use the data constructor "+print_query(pattern)
                               +" in a pattern that expects type "+print_type(tp));
                // the constructor arguments (one pattern, or a tuple of them) are bound against dtp
                args = args.append(bind_pattern_type(s.length() == 1 ? s.head() : #<tuple(...s)>,
                                                     dtp));
            };
        case any: ;     // the wildcard pattern binds nothing
        case `v: 
            if (v.is_variable()) {
                // a pattern variable: record it and give it type tp in the type environment
                args = args.append(v);
                type_env.insert(v.toString(),tp);
            } else if (!subtype(type_inference2(v),tp))
                // anything else is treated as a constant whose type must be a subtype of tp
                type_error(pattern,"The constant "+v+" in pattern "
                           +print_query(pattern)+" is not of type "+print_type(tp));
        };
        return args;
    }

    /** find the numerical type code that two numerical types must be coerced to
     * @param x a type
     * @param y a type
     * @return -1 if x or y is not a numerical basic type or if both have the same
     *         type code; otherwise the larger of the two type codes
     */
    private static short needs_coerce ( Tree x, Tree y ) {
        boolean both_numerical = x.is_variable() && numerical(x.toString())
                                 && y.is_variable() && numerical(y.toString());
        if (!both_numerical)
            return -1;
        short code_x = MRContainer.type_code(x.toString());
        short code_y = MRContainer.type_code(y.toString());
        if (code_x == code_y)
            return -1;
        return (code_x > code_y) ? code_x : code_y;
    }

    /** structural type equality: named types are expanded before they are compared
     * @param tx a type
     * @param ty a type
     * @return true if the expanded types are equal, or if they are nodes with the same
     *         name and the same number of children whose children are pairwise equal types
     */
    public static boolean equal_types ( Tree tx, Tree ty ) {
        Tree left = expand(tx);
        Tree right = expand(ty);
        if (left.equals(right))
            return true;
        match left {
        case `f(...xs):
            match right {
            case `g(...ys):
                if (!f.equals(g) || xs.length() != ys.length())
                    return false;
                Trees lhs = xs;
                Trees rhs = ys;
                // compare the children pairwise, expanding names at every level
                while (!rhs.is_empty()) {
                    if (!equal_types(lhs.head(),rhs.head()))
                        return false;
                    lhs = lhs.tail();
                    rhs = rhs.tail();
                }
                return true;
            }
        };
        return false;
    }

    /** is the collection type name S a subtype of that of T?
     *  The order is List &lt; Bag &lt; bag and List &lt; list &lt; bag.
     *  The relation is reflexive and transitive, so List is also a subtype of bag
     *  (that pair was previously rejected although both chains imply it).
     * @param S the name of the candidate subtype collection
     * @param T the name of the candidate supertype collection
     * @return true if S is equal to T or below T in the order above
     */
    public static boolean subtype ( String S, String T ) {
        return S.equals(T)
            // List is the bottom of both chains: it is below list, Bag and (transitively) bag
            || (S.equals("List") && (T.equals("list") || T.equals("Bag") || T.equals("bag")))
            || (S.equals("list") && T.equals("bag"))
            || (S.equals("Bag") && T.equals("bag"));
    }

    /** is the type tx a subtype of type ty?
     * Both types are expanded first. Every type is a subtype of any, and
     * structurally equal types are subtypes of each other.
     * @param tx the candidate subtype
     * @param ty the candidate supertype
     * @return true if tx is a subtype of ty
     */
    public static boolean subtype ( Tree tx, Tree ty ) {
        tx = expand(tx);
        ty = expand(ty);
        if (ty.equals(#<any>))
            return true;
        if (equal_types(tx,ty))
            return true;
        // numerical basic types are ordered by their MRContainer type code
        if (tx.is_variable() && numerical(tx.toString())
            && ty.is_variable() && numerical(ty.toString()))
            return MRContainer.type_code(tx.toString()) <= MRContainer.type_code(ty.toString());
        match tx {
        case `T(`tex):
            // a non-collection unary node is handled by the cases below
            if (!is_collection(T))
                fail;
            match ty {
                 case `S(`tey):
                     // collections: both the collection kinds and the element types must be subtypes
                     if (is_collection(S))
                         return subtype(T,S) && subtype(tex,tey);
                     else fail
            };
        case union(...):
            if (ty.equals(#<union>)) // used for XML functions
                return true;
            else fail
        case `f(...xs):
            // any other constructed type: same constructor and pairwise subtype components
            match ty {
            case `g(...ys):
                if (f.equals(g))
                    return subtype(xs,ys);
            }
        };
        return false;
    }

    /** pairwise subtype test over two lists of types
     * @param ts1 the candidate subtypes
     * @param ts2 the candidate supertypes
     * @return true if both lists have the same length and every type in ts1 is a
     *         subtype of the type at the same position in ts2
     */
    public static boolean subtype ( Trees ts1, Trees ts2 ) {
        if (ts1.length() != ts2.length())
            return false;
        Trees lower = ts1;
        Trees upper = ts2;
        while (!lower.is_empty()) {
            if (!subtype(lower.head(),upper.head()))
                return false;
            lower = lower.tail();
            upper = upper.tail();
        }
        return true;
    }

    public static int compare_types ( Tree t1, Tree t2 ) {
        if (t1.equals(#<any>) && t2.equals(#<any>))
            return 0;
        else if (t2.equals(#<any>))
            return -1;
        else if (t1.equals(#<any>))
            return 1;
        else if (t1 instanceof VariableLeaf && t2 instanceof VariableLeaf)
            return MRContainer.type_code(((VariableLeaf)t1).value())
                   -MRContainer.type_code(((VariableLeaf)t2).value());
        else if (t1 instanceof VariableLeaf)
            return -1;
        else return 1;
    }

    /** if the expression at loc.head has type tx that is a subtype of ty, coerce it to ty
     * NOTE: it destructively changes the expression at loc
     * @param tx the current type of the expression at loc.head
     * @param ty the target type
     * @param loc a list cell whose head holds the expression; the head is overwritten
     *            with the coerced expression when a coercion is needed
     */
    private static void coerce ( Tree tx, Tree ty, Trees loc ) {
        tx = expand(tx);
        ty = expand(ty);
        // nothing to do if the target is any or the types are already structurally equal
        if (ty.equals(#<any>) || equal_types(tx,ty))
            return;
        // numerical types: wrap the expression in a call to coerce with the target type code
        if (tx.is_variable() && numerical(tx.toString())
            && ty.is_variable() && numerical(ty.toString())) {
            loc.head = #<call(coerce,`(loc.head),            // destructive
                              `(MRContainer.type_code(ty.toString())))>;
            return;
        };
        match tx {
        case `T(`tex):
            if (!is_collection(T))
                fail;
            match ty {
                 case `S(`tey):
                     if (!is_collection(S))
                         fail;
                     // a persistent collection used as a non-persistent one is wrapped in Collect
                     if (is_persistent_collection(T) && !is_persistent_collection(S))
                         loc.head = #<Collect(`(loc.head))>;   // destructive
                     // when the element types differ, coerce each element with a cmap
                     // whose body is a singleton bag holding the coerced element
                     if (subtype(tex,tey) && unify(tex,tey) == null) {
                         Tree nv = new_var();
                         Tree b = #<bag(`nv)>;
                         // coerce the element variable in place inside the bag
                         coerce(tex,tey,((Node)b).children);
                         loc.head = #<cmap(lambda(`nv,`b),`(loc.head))>;   // destructive
                         return;
                     }
            };
        case tuple(...xs):
            match ty {
            case tuple(...ys):
                // bind the tuple to a new variable and rebuild it from its coerced components
                Tree nv = new_var();
                Trees nt = #[];
                int i = 0;
                // NOTE(review): the loop compares against #[] by reference; presumably #[] is a
                // shared empty-list instance -- confirm
                for ( Trees xl = xs, yl = ys; xl != #[] && yl != #[]; xl = xl.tail, yl = yl.tail ) {
                    Trees dest = #[nth(`nv,`(i++))];
                    coerce(xl.head,yl.head,dest);
                    nt = nt.append(dest);
                };
                loc.head = #<let(`nv,`(loc.head),tuple(...nt))>;   // destructive
            }
        case record(...xs):
            match ty {
            case record(...ys):
                // bind the record to a new variable and rebuild it from its coerced attributes;
                // only attributes of tx whose name also appears in ty are kept
                Tree nv = new_var();
                Trees nt = #[];
                for ( Tree x: xs )
                    match x {
                    case bind(`ax,`tex):
                        for ( Tree y: ys )
                            match y {
                            case bind(`ay,`tey):
                                if (equal_types(ax,ay)) {
                                    Tree b = #<bind(`ax,project(`nv,`ax))>;
                                    nt = nt.append(b);
                                    // coerce the projection, which is the second child of the bind
                                    coerce(tex,tey,((Node)b).children.tail);
                                }
                            }
                    };
                loc.head = #<let(`nv,`(loc.head),record(...nt))>;   // destructive
            }
        }
    }   

    /** poor-man's type-unification over two lists of types (the any type unifies with everything)
     * @param ts1 a list of types
     * @param ts2 a list of types
     * @return the list of pairwise unified types, or null if the lists differ in
     *         length or some pair does not unify
     */
    private static Trees unify ( Trees ts1, Trees ts2 ) {
        if (ts1.length() != ts2.length())
            return null;
        Trees unified = #[];
        Trees left = ts1;
        Trees right = ts2;
        while (!left.is_empty()) {
            Tree u = unify(left.head(),right.head());
            if (u == null)
                return null;
            unified = unified.append(u);
            left = left.tail();
            right = right.tail();
        }
        return unified;
    }

    /** poor-man's type-unification (the any type unifies with everything)
     * Both types are expanded first.
     * @param t1 a type
     * @param t2 a type
     * @return the unified type, or null if t1 and t2 do not unify
     */
    public static Tree unify ( Tree t1, Tree t2 ) {
        t1 = expand(t1);
        t2 = expand(t2);
        // any is replaced by the other type
        if (t1.equals(#<any>))
            return t2;
        if (t2.equals(#<any>) || equal_types(t1,t2))
            return t1;
        match t1 {
        case `T(`t):
            match t2 {
            case `S(`s):
                // collections unify only if they are the same kind of collection;
                // otherwise fall through to the general case below
                if (!T.equals(S))
                    fail;
                if (!is_collection(T))
                    fail;
                Tree ts = unify(t,s);
                if (ts != null)
                    return #<`T(`ts)>;
            }
        case `f(...ts1):
            match t2 {
            case `g(...ts2):
                // same constructor and pairwise unifiable components
                // (the components are unified before the constructor names are compared)
                Trees s = unify(ts1,ts2);
                if (f.equals(g) && s != null)
                    return #<`f(...s)>;
            }
        };
        return null;
    }

    /** apply subtype_unify pairwise to two lists of types
     * @param ts1 a list of types
     * @param ts2 a list of types
     * @return the list of pairwise results, or null if the lists differ in length
     *         or subtype_unify returns null for some pair
     */
    private static Trees subtype_unify ( Trees ts1, Trees ts2 ) {
        if (ts1.length() != ts2.length())
            return null;
        Trees result = #[];
        Trees left = ts1;
        Trees right = ts2;
        while (!left.is_empty()) {
            Tree u = subtype_unify(left.head(),right.head());
            if (u == null)
                return null;
            result = result.append(u);
            left = left.tail();
            right = right.tail();
        }
        return result;
    }

    /** similar to unify, but it returns the second type instantiated
     * Both types are expanded first. Unlike unify, two collection types are not
     * required to have the same collection name.
     * @param t1 a type
     * @param t2 a type
     * @return the resulting type, or null if the types cannot be combined
     */
    public static Tree subtype_unify ( Tree t1, Tree t2 ) {
        t1 = expand(t1);
        t2 = expand(t2);
        // any is replaced by the other type
        if (t1.equals(#<any>))
            return t2;
        if (t2.equals(#<any>) || equal_types(t1,t2))
            return t1;
        match t1 {
        case `T(`t):
            match t2 {
            case `S(`s):
                // only T is checked to be a collection name here; S is not checked
                if (!is_collection(T))
                    fail;
                Tree ts = subtype_unify(t,s);
                // NOTE(review): the result is built with T, the collection name of t1,
                // although the header says the second type is returned -- confirm this is intended
                if (ts != null)
                    return #<`T(`ts)>;
            }
        case `f(...ts1):
            match t2 {
            case `g(...ts2):
                // same constructor and pairwise combinable components
                Trees s = subtype_unify(ts1,ts2);
                if (f.equals(g) && s != null)
                    return #<`f(...s)>;
            }
        };
        return null;
    }

    /** unify two types; if they do not unify, coerce the expression whose type is
     *  a subtype of the other type
     * @param tx the type of the expression at destx.head
     * @param ty the type of the expression at desty.head
     * @param destx the location of the first expression (destructively coerced if tx is a subtype of ty)
     * @param desty the location of the second expression (destructively coerced if ty is a subtype of tx)
     * @return the resulting type, or null if neither type is a subtype of the other
     */
    static Tree unify ( Tree tx, Tree ty, Trees destx, Trees desty ) {
        Tree unified = unify(tx,ty);
        if (unified != null)
            return unified;
        if (subtype(tx,ty)) {
            coerce(tx,ty,destx);
            return subtype_unify(tx,ty);
        }
        if (subtype(ty,tx)) {
            coerce(ty,tx,desty);
            return subtype_unify(ty,tx);
        }
        return null;
    }

    /** scan the list of types ts for a type that is a supertype of the others
     * @param ts a non-empty list of types
     * @return the first type, replaced along the scan by every later type that it
     *         is a subtype of (or by any later type when the current candidate is any)
     */
    private static Tree maximum_type ( Trees ts ) {
        Tree candidate = ts.head;
        for ( Tree next: ts.tail ) {
            boolean replace = subtype(candidate,next) || candidate.equals(#<any>);
            if (replace)
                candidate = next;
        }
        return candidate;
    }

    /** if the type tp is a named type, replace it by its definition (repeatedly)
     * @param tp a type
     * @return tp itself if it is not a variable or has no definition; otherwise the
     *         expansion of its definition, looked up first in global_datatype_env
     *         and then in type_names
     */
    public static Tree expand ( Tree tp ) {
        if (!tp.is_variable())
            return tp;
        String name = tp.toString();
        Tree definition = global_datatype_env.lookup(name);
        if (definition == null)
            definition = type_names.lookup(name);
        return (definition == null) ? tp : expand(definition);
    }

    /** infer the type of an expression and, if the result is a named type, expand it
     * @param e the expression
     * @return the expanded type of e
     */
    public static Tree type_inference2 ( Tree e ) {
        Tree inferred = type_inference(e);
        return expand(inferred);
    }

    /** infer the type of an expression
     * @param e the expression
     * @return the type of e
     */
    public static Tree type_inference ( Tree e ) {
        match e {
        case select(`opt_dist,`u,from(...bl),where(`c),groupby(...gs),orderby(...os)):
            type_env.begin_scope();
            Trees dvs = #[];
            String max_type = "list";
            for ( Tree b: bl )
                match b {
                case bind(`p,`d):
                    match type_inference2(d) {
                    case `T(`tp):
                        if (!is_collection(T))
                            fail;
                        dvs = dvs.append(bind_pattern_type(p,tp));
                        max_type = max_collection(T,max_type);
                    case `ftp:
                        type_error(e,"The from-binding domain "+print_query(d)+" in "
                                   +print_query(e)+" must be a collection (found "
                                   +print_type(ftp)+")");
                    }
                };
            Tree ctp = type_inference2(c);
            if (unify(ctp,#<bool>) == null)
                type_error(e,"The predicate "+print_query(c)+" in "+print_query(e)
                           +" must be a boolean (found "+print_type(ctp)+")");
            match #<groupby(...gs)> {
            case groupby(`h,...gl):
                Trees pvs = #[];
                for ( Tree g: gl )
                    match g {
                    case bind(`gp,`gd):
                        bind_pattern_type(gp,type_inference2(gd));
                        pvs = pvs.append(pattern_variables(gp));
                    };
                // find the type of the partition variable
                Trees partl = #[];
                for ( Tree dv: pattern_variables(#<tuple(...dvs)>) )
                    partl = partl.append(#<bind(`dv,`(type_env.lookup(dv.toString())))>);
                type_env.insert("partition",#<bag(record(...partl))>);
                // lift domain variables to bags
                for ( Tree dv: dvs )
                    if (!pvs.member(dv))
                        type_env.insert(dv.toString(),
                                        #<bag(`(type_env.lookup(dv.toString())))>);
                Tree htp = type_inference2(h);
                if (unify(htp,#<bool>) == null)
                    type_error(e,"The group-by predicate "+print_query(h)+" in "+print_query(e)
                               +" must be a boolean (found "+print_type(htp)+")");
            };
            match #<orderby(...os)> {
            case orderby(`l,...ol):
                if (!l.equals(#<none>)) {
                    Tree ltp = type_inference2(l);
                    if (unify(ltp,#<int>) == null)
                        type_error(e,"The limit "+print_query(l)+" in "+print_query(e)
                                   +" must be an int (found "+print_type(ltp)+")");
                };
                for ( Tree o: ol)
                    type_inference2(o);
                Tree rtp = type_inference2(u);
                type_env.end_scope();
                return (is_persistent_collection(max_type)) ? #<List(`rtp)> : #<list(`rtp)>;
            };
            Tree rtp = type_inference2(u);
            type_env.end_scope();
            return (is_persistent_collection(max_type)) ? #<Bag(`rtp)> : #<bag(`rtp)>;
        case select(`u,from(...bl),where(`c)):
            String max_type = "list";
            for ( Tree b: bl )
                match b {
                case bind(`v,`d):
                    match type_inference2(d) {
                    case `T(`tp):
                        if (!is_collection(T))
                            fail;
                        type_env.insert(v.toString(),tp);
                        max_type = max_collection(T,max_type);
                    case _: type_error(e,"Expected a collection: "+print_query(d)
                                       +" in "+print_query(e));
                    }
                };
            if (unify(type_inference2(c),#<bool>) != null) {
                Tree tp = type_inference(u);
                return (is_persistent_collection(max_type)) ? #<Bag(`tp)> : #<bag(`tp)>;
            } else type_error(e,"The select predicate must be boolean: "+print_query(e));
        case let_bind(`p,`u,`b):
            type_env.begin_scope();
            bind_pattern_type(p,type_inference2(u));
            Tree tp = type_inference2(b);
            type_env.end_scope();
            return tp;
        case let(`p,`u,`b):
            bind_pattern_type(p,type_inference2(u));
            return type_inference2(b);
        case Let(`p,`u,`b):
            bind_pattern_type(p,type_inference2(u));
            return type_inference2(b);
        case case(`u,...cs):
            Tree tp = type_inference2(u);
            Trees ts = #[];
            for ( Tree c: cs )
                match c {
                case case(`p,`b):
                    type_env.begin_scope();
                    bind_pattern_type(p,tp);
                    ts = ts.append(type_inference2(b));
                    type_env.end_scope();
                };
            Tree maxt = maximum_type(ts);
            for ( ; cs != #[] && ts != #[]; cs = cs.tail, ts = ts.tail )
                match cs.head {
                case case(`p,`b):
                    if (subtype(ts.head,maxt))
                        coerce(ts.head,maxt,((Node)(cs.head)).children.tail);
                    else type_error(cs.head,"Mismatched case output: "+b);
                };
            return maxt;
        case lambda(`v,`b):
            if (!v.is_variable()) {
                Tree nv = new_var();
                type_env.insert(nv.toString(),type_inference(v));
                st.begin_scope();
                if (!Normalization.bind_pattern(v,nv).is_empty())
                    error("Lambda patterns must be irrefutable: "+print_query(e));
                for ( String nm: st )
                    if (st.is_local(nm))
                        b = Simplification.subst(#<`nm>,st.lookup(nm),b);
                st.end_scope();
                return type_inference(#<lambda(`nv,`b)>);
            } else if (type_env.lookup(v.toString()) == null)
                type_env.insert(v.toString(),#<any>);
            return #<arrow(`(type_env.lookup(v.toString())),`(type_inference(b)))>;
        case function(tuple(...params),`outp,`body):
            Trees as = #[];
            for ( Tree param: params )
                match param {
                case `bind(`v,`tp):
                    type_env.insert(v.toString(),tp);
                    as = as.append(tp);
                };
            Tree btp = type_inference(body);
            if (!subtype(btp,outp))
                type_error(e,"The type of the function body "+print_type(btp)
                           +" does not match the expected type "+print_type(outp)+"\n"+e);
            if (unify(btp,outp) == null) {  // the body needs coercion
                Trees b = #[`body];
                coerce(btp,outp,b);
                body = b.head;
            };
            return #<arrow(tuple(...as),`outp)>;
        case call(source,binary,`f,`tp):
            return tp;
        case call(source,binary,`f):
            if (!f.is_string())
                type_error(e,"The source file must be a constant string: "+print_query(f));
            Tree tp = null;
            if (Config.hadoop_mode)
                tp = Evaluator.evaluator.get_type(f.stringValue());
            else tp = MapReduceAlgebra.get_type(f.stringValue());
            if (tp == null)
                type_error(e,"Cannot find the type of file "+f);
            ((Node)e).children = ((Node)e).children.append(tp);       // destructive
            return tp;
        case call(source,`parser,`f,...args):
            if (!parser.is_variable())
                type_error(e,"The parser must be a constant name: "+print_query(parser));
            if (unify(type_inference(f),#<string>) == null)
                type_error(e,"The source file must be a string: "+print_query(f));
            try {
                Class<? extends Parser> pc = DataSource.parserDirectory.get(parser.toString());
                if (pc == null)
                    type_error(e,"Unrecognized parser: "+parser);
                Parser p = pc.newInstance();
                p.initialize(args);
                return #<Bag(`(p.type()))>;
            } catch (Exception x) {
                type_error(e,"Unrecognized parser type: "+parser);
            }
        case call(stream,binary,`f):
            if (Config.stream_window == 0)
                type_error(e,"Not in stream processing mode");
            Tree tp = type_inference(#<call(source,binary,`f)>);
            ((Node)e).children = ((Node)e).children.append(tp);       // destructive
            return tp;
        case call(stream,`parser,`f,...args):
            if (parser.equals(#<binary>))
                fail;
            if (Config.stream_window == 0)
                type_error(e,"Not in stream processing mode");
            if (!parser.is_variable())
                type_error(e,"The parser must be a constant name: "+print_query(parser));
            if (unify(type_inference(f),#<string>) == null)
                type_error(e,"The source file must be a string: "+print_query(f));
            if (unify(type_inference(args.head()),#<int>) != null)
                args = args.tail();
            try {
                Class<? extends Parser> pc = DataSource.parserDirectory.get(parser.toString());
                if (pc == null)
                    type_error(e,"Unrecognized parser: "+parser);
                Parser p = pc.newInstance();
                p.initialize(args);
                return #<Bag(`(p.type()))>;
            } catch (Exception x) {
                type_error(e,"Unrecognized parser type: "+parser);
            }
        case call(stream,...r):
            if (Config.stream_window == 0)
                type_error(e,"Not in stream processing mode");
            return type_inference(#<call(source,...r)>);
        case BinaryStream(`path,`tp):
            return tp;
        case ParsedStream(`parser,...r):
            return type_inference(#<call(stream,`parser,...r)>);
        case stream(lambda(`v,`b),`u):
            bind_pattern_type(v,type_inference2(u));
            return type_inference2(b);
        case Stream(lambda(`v,`b),`u):
            bind_pattern_type(v,type_inference2(u));
            return type_inference2(b);
        case typed(null,`tp):
            return tp;
        case typed(tagged_union(`n,`u),`tp):
            type_inference2(u);
            return tp;
        case typed(union_value(`u),`tp):
            type_inference2(u);
            return tp;
        case typed(`x,`tp):
            if (tp.is_variable() && !tp.equals(#<string>)
                 && MRContainer.type_code(tp.toString()) >= 0) {
                Tree tx = type_inference(x);
                if (tx.is_variable() && !tx.equals(#<string>)
                     && MRContainer.type_code(tx.toString()) >= 0)
                    return tp;
                else type_error(e,"Expression "+print_query(x)+" of type "+print_type(tx)
                                +" cannot be coerced to type "+tp);
            };
            Tree tx = type_inference(x);
            if (!subtype(tx,tp))
                type_error(e,"Expression "+print_query(x)+" of type "+print_type(tx)
                           +" cannot be coerced to type "+print_type(tp));
            if (unify(tx,tp) == null)   // need to coerce x
                coerce(tx,tp,((Node)e).children);
            return tp;
        case tuple(...el):
            Trees s = #[];
            for ( Tree a: el )
                s = s.append(type_inference(a));
            return #<tuple(...s)>;
        case call(coerce,`x,`n):
            return #<`(MRContainer.type_names[(int)n.longValue()])>;
        case record(...el):
            Trees s = #[];
            Trees attrs = #[];
            for ( Tree a: el )
                match a {
                case bind(`v,`b):
                    s = s.append(#<bind(`v,`(type_inference(b)))>);
                    if (attrs.member(v))
                        type_error(e,"Duplicate record attribute name: "+v+" in "+print_query(e));
                    attrs = attrs.append(v);
                };
            return #<record(...s)>;
        case union_tag(`x):
            return #<int>;
        case `T(...el):
            if (!is_collection(T))
                fail;
            if (el.is_empty())
                return #<`T(any)>;
            Trees ts = #[];
            for ( Tree t: el )
                ts = ts.append(type_inference(t));
            Tree maxt = maximum_type(ts);
            for ( ; el != #[] && ts != #[]; el = el.tail, ts = ts.tail )
                if (subtype(ts.head,maxt))
                    coerce(ts.head,maxt,el);
                else type_error(e,"Incompatible values in "+T+" construction: "+print_query(e));
            return #<`T(`maxt)>;
        case nth(`a,`n):
            if (!n.is_long())
                type_error(e,"Tuple index must be an integer: "+print_query(e));
            int i = (int)n.longValue();
            match type_inference2(a) {
            case tuple(...ts):
                if (i < 0 || i >= ts.length())
                    type_error(e,"Tuple index outside bounds: "+print_query(e));
                return ts.nth(i);
            case `S(tuple(...ts)):
                if (!is_collection(S))
                    fail;
                if (i < 0 || i >= ts.length())
                    type_error(e,"Tuple index outside bounds: "+print_query(e));
                return #<`S(`(ts.nth(i)))>;
            case `tp:
                type_error(e,"Tuple index must be applied to a tuple: "
                           +print_query(a)+" of type: "+print_type(tp)+" in "+print_query(e));
            };
        case project(`a,`v):      // highly overloaded
            Tree ta = type_inference(a);
            match ta {
            case XML:
                return #<list(XML)>;
            case `S(`tp):
                if (is_collection(S) && (tp.equals(#<XML>) || tp.equals(TopLevel.xml_type)))
                    return #<`S(XML)>;
            };
            if (ta.equals(TopLevel.xml_type))
                return #<list(XML)>;
            match expand(ta) {
            case record(...ts):
                for ( Tree t: ts )
                    match t {
                    case bind(`w,`tp):
                        if (equal_types(w,v))
                            return tp;
                    };
                type_error(e,"Record "+print_query(a)+" does not have a component "+v);
            case union(...tl):
                Trees s = #[];
                for ( Tree t: tl )
                    match t {
                    case `c(record(...ts)):
                        for ( Tree tn: ts )
                            match tn {
                            case bind(`w,`tp):
                                if (equal_types(w,v))
                                    s = s.append(tp);
                            };
                    case `c(bag(tuple(string,`tv))):
                        s = s.append(tv);
                    };
                if (s.length() == 0)
                    type_error(e,"Wrong record projection "+print_query(e)
                               + " over the union "+print_type(ta));
                else if (s.length() > 1)
                    type_error(e,"Ambiguous record projection "+print_query(e)
                               + " over the union "+print_type(ta));
                return s.head();
            case `S(`ttp):
                if (!is_collection(S))
                    fail;
                match expand(ttp) {
                case record(...ts):
                    for ( Tree t: ts )
                        match t {
                        case bind(`w,`tp):
                            if (equal_types(w,v))
                                return #<`S(`tp)>;
                        };
                    type_error(e,"The record collection "+print_query(a)
                               +" does not have a component "+v);
                case tuple(string,`tv):
                    return tv;
                case union(...tl):
                    Trees s = #[];
                    for ( Tree t: tl )
                        match t {
                        case `c(record(...ts)):
                            for ( Tree tn: ts )
                                match tn {
                                case bind(`w,`tp):
                                    if (equal_types(w,v))
                                        s = s.append(tp);
                                };
                        case `c(bag(tuple(string,`tv))):
                            s = s.append(tv);
                        };
                    if (s.length() == 0)
                        type_error(e,"Wrong record projection "+print_query(e)
                                   + " over the union "+print_type(ta));
                    else if (s.length() > 1)
                        type_error(e,"Ambiguous record projection "+print_query(e)
                                   + " over the union "+print_type(ta));
                    return #<`S(...s)>;
                };
            case `t:
                type_error(e,"The record projection "+print_query(e)
                           + " cannot apply to the type "+print_type(t));
            };
        case index(`a,`i):
            Tree ti = type_inference2(i);
            match type_inference2(a) {
            case bag(tuple(`tk,`tv)):
                if (subtype(ti,tk)) {
                    coerce(ti,tk,((Node)e).children.tail);
                    return tv;
                } else fail
            case Bag(tuple(`tk,`tv)):
                if (subtype(ti,tk)) {
                    coerce(ti,tk,((Node)e).children.tail);
                    return tv;
                } else fail
            case list(`tp):
                if (unify(ti,#<int>) != null)
                    return tp;
                else type_error(e,"List index must be an integer: "+print_query(e));
            case List(`tp):
                if (unify(ti,#<int>) != null)
                    return tp;
                else type_error(e,"List index must be an integer: "+print_query(e));
            case union(...tl):
                Trees s = #[];
                for ( Tree t: tl )
                    match expand(t) {
                    case `c(bag(tuple(`tk,`tv))):
                        if (unify(ti,tk) != null)
                            s = s.append(tv);
                        else fail
                    case `c(list(`tp)):
                        if (unify(ti,#<int>) != null)
                            s = s.append(tp);
                        else fail
                    };
                if (s.length() == 0)
                    type_error(e,"Wrong indexing "+print_query(e)
                               + " in union "+print_type(#<union(...tl)>));
                else if (s.length() > 1)
                    type_error(e,"Ambiguous indexing "+print_query(e)
                               + " in union "+print_type(#<union(...tl)>));
                return s.head();
            case `tp: type_error(e,"Indexing is not allowed for type "+print_type(tp)
                                 +" with index "+print_type(ti)+" in "+print_query(e));
            };
        case range(`u,`i,`j):
            if (unify(type_inference2(i),#<int>) == null
                || unify(type_inference2(j),#<int>) == null)
                type_error(e,"Range indexes must be integer expressions: "+print_query(e));
            match type_inference2(u) {
            case list(`a): return #<list(`a)>;
            case List(`a): return #<list(`a)>;
            };
            type_error(e,"Range indexing must be applied to a list: "+print_query(e));
        case range(`min,`max):
            Tree tmin = type_inference(min);
            Tree tmax = type_inference(max);
            if (!subtype(tmin,#<long>))
                type_error(e,"Expected a long integer for min: "+print_query(min));
            else if (unify(tmin,#<long>) == null)  // coerce
                coerce(tmin,#<long>,((Node)e).children);
            if (!subtype(tmax,#<long>))
                type_error(e,"Expected a long integer for max: "+print_query(max));
            else if (unify(tmax,#<long>) == null)  // coerce
                coerce(tmax,#<long>,((Node)e).children.tail);
            return #<list(long)>;
        case call(gen,`min,`max,`size):
            return type_inference(#<gen(`min,`max,`size)>);
        case gen(`min,`max,`size):
            Tree tmin = type_inference(min);
            Tree tmax = type_inference(max);
            Tree tsize = type_inference(size);
            if (!subtype(tmin,#<long>))
                type_error(e,"Expected a long integer for min: "+print_query(min));
            else if (unify(tmin,#<long>) == null)  // coerce
                coerce(tmin,#<long>,((Node)e).children);
            if (!subtype(tmax,#<long>))
                type_error(e,"Expected a long integer for max: "+print_query(max));
            else if (unify(tmax,#<long>) == null)  // coerce
                coerce(tmax,#<long>,((Node)e).children.tail);
            if (!subtype(tsize,#<long>))
                type_error(e,"Expected a long integer for size: "+print_query(size));
            else if (unify(tsize,#<long>) == null)  // coerce
                coerce(tsize,#<long>,((Node)e).children.tail.tail);
            return #<Bag(long)>;
        case dataset_size(`x):
            return #<long>;
        case groupBy(`u):
            Tree tp = type_inference2(u);
            match tp {
            case `T(tuple(`k,`v)):
                if (is_collection(T))
                    return (is_persistent_collection(T))
                            ? #<Bag(tuple(`k,list(`v)))>
                            : #<bag(tuple(`k,list(`v)))>;
            };
            type_error(e,"Wrong groupBy: "+print_query(e)+" of type "+print_type(tp));
        case coGroup(`x,`y):
            Tree xtp = type_inference2(x);
            Tree ytp = type_inference2(y);
            match xtp {
            case `T(tuple(`kx,`a)):
                if (!is_collection(T))
                    type_error(e,"Wrong coGroup left operand: "+print_query(x));
                match ytp {
                case `S(tuple(`ky,`b)):
                    if (!is_collection(S))
                        type_error(e,"Wrong coGroup right operand: "+print_query(y));
                    if (unify(kx,ky) == null)
                        type_error(e,"incompatible keys in coGroup: "+print_query(e));
                    return (is_persistent_collection(T) && is_persistent_collection(S))
                        ? #<Bag(tuple(`kx,tuple(`T(`a),`S(`b))))>
                        : #<bag(tuple(`kx,tuple(`T(`a),`S(`b))))>;
                }
            };
            type_error(e,"Wrong coGroup: "+print_query(e));
        case orderBy(`u):
            Tree tp = type_inference2(u);
            match tp {
            case `T(tuple(`k,`v)):
                if (is_collection(T))
                    return (is_persistent_collection(T))
                            ? #<List(tuple(`k,list(`v)))>
                            : #<list(tuple(`k,list(`v)))>;
            };
            type_error(e,"Wrong orderBy: "+print_query(e)+" of type "+print_type(tp));
        case cmap(lambda(`v,`body),`s):
            match type_inference2(s) {
            case `T(`a):
                if (!is_collection(T))
                    fail;
                type_env.insert(v.toString(),a);
                match type_inference2(body) {
                case `S(`tp):
                    if (!is_collection(S))
                        fail;
                    return #<`T(`tp)>;
                case _: type_error(e,"cmap must return a collection: "+print_query(e));
                };
            case `t: type_error(e,"cmap must be over a collection: "+print_query(e)
                                +"  (found type "+print_type(t)+")");
            };
            type_error(e,"Wrong cmap: "+print_query(e));
        case fold(lambda(`v,`body),`n,`s):
            match type_inference2(s) {
            case `T(`a):
                if (!is_collection(T))
                    fail;
                Tree tp = type_inference(n);
                type_env.insert(v.toString(),#<tuple(`a,tp)>);
                if (unify(type_inference2(body),tp) == null)
                    type_error(e,"Wrong types in fold: "+print_query(e));
                return tp;
            };
        case join(lambda(`v1,`b1),lambda(`v2,`b2),lambda(`vr,`br),`x,`y):
            match type_inference2(x) {
            case `T(`a):
                if (!is_collection(T))
                    fail;
                match type_inference2(y) {
                case `S(`b):
                    if (!is_collection(S))
                        fail;
                    type_env.insert(v1.toString(),a);
                    type_env.insert(v2.toString(),b);
                    T = transient_collection(T);
                    S = transient_collection(S);
                    type_env.insert(vr.toString(),#<tuple(`T(`a),`S(`b))>);
                    if (unify(type_inference2(b1),type_inference2(b2)) == null)
                        type_error(e,"Incompatible keys in join ("+type_inference2(b1)
                                   +" and "+type_inference2(b2)+"): "+print_query(e));
                    match type_inference(br) {
                    case `S3(`ntp):
                        if (!is_collection(S3))
                            fail;
                        S3 = persistent_collection(S3);
                        return #<`S3(`ntp)>;
                    };
                    type_error(e,"The join reducer must return a collection: "+print_query(br));
                case _: type_error(e,"The right join input is not a collection: "+print_query(y));
                };
            case _: type_error(e,"The left join input is not a collection: "+print_query(x));
            };
        case crossProduct(lambda(`v1,`b1),lambda(`v2,`b2),lambda(`vr,`br),`x,`y):
            match type_inference2(x) {
            case `T(`a):
                if (!is_collection(T))
                    fail;
                match type_inference2(y) {
                case `S(`b):
                    if (!is_collection(S))
                        fail;
                    type_env.insert(v1.toString(),a);
                    type_env.insert(v2.toString(),b);
                    match type_inference2(b1) {
                    case `S1(`w1):
                        if (!is_collection(S1))
                            fail;
                        match type_inference2(b2) {
                        case `S2(`w2):
                            if (!is_collection(S2))
                                fail;
                            type_env.insert(vr.toString(),#<tuple(`w1,`w2)>);
                            match type_inference(br) {
                            case `S3(`ntp):
                                if (!is_collection(S3))
                                    fail;
                                S3 = persistent_collection(S3);
                                return #<`S3(`ntp)>;
                            };
                            type_error(e,"The cross product reducer must return a collection: "+print_query(br));
                        case _: type_error(e,"Wrong right mapper in a cross product: "+print_query(b2));
                        };
                    case _: type_error(e,"Wrong left mapper in a cross product: "+print_query(b1));
                    };
                case _: type_error(e,"The right cross product input is not a collection: "+print_query(y));
                };
            case _: type_error(e,"The left cross product input is not a collection: "+print_query(x));
            };
        case mapReduce(lambda(`vm,`bm),lambda(`vr,`br),`X,_):
            match type_inference2(X) {
            case `T(`a):
                if (!is_collection(T))
                    fail;
                type_env.insert(vm.toString(),a);
                match type_inference2(bm) {
                case `S(tuple(`k,`b)):
                    if (!is_collection(S))
                        fail;
                    type_env.insert(vr.toString(),#<tuple(`k,list(`b))>);
                    match type_inference(br) {
                    case `S3(`ntp):
                        if (!is_collection(S3))
                            fail;
                        if (is_persistent_collection(T))
                            S3 = persistent_collection(S3);
                        return #<`S3(`ntp)>;
                    };
                    type_error(e,"The MapReduce reducer must return a collection: "+print_query(br));
                case _: type_error(e,"Wrong mapper in mapReduce: "+print_query(bm));
                }
            };
            type_error(e,"The MapReduce input is not a collection: "+print_query(X));
        case mapReduce2(lambda(`v1,`b1),lambda(`v2,`b2),lambda(`vr,`br),`X,`Y,_):
            match type_inference2(X) {
            case `T(`a):
                if (!is_collection(T))
                    fail;
                match type_inference2(Y) {
                case `S(`b):
                    if (!is_collection(S))
                        fail;
                    type_env.insert(v1.toString(),a);
                    type_env.insert(v2.toString(),b);
                    match type_inference2(b1) {
                    case `S1(tuple(`k1,`w1)):
                        if (!is_collection(S1))
                            fail;
                        match type_inference2(b2) {
                        case `S2(tuple(`k2,`w2)):
                            if (!is_collection(S2))
                                fail;
                            if (unify(k1,k2) == null)
                                type_error(e,"incompatible keys in mapReduce2: "+print_query(e));
                            S1 = transient_collection(S1);
                            S2 = transient_collection(S2);
                            type_env.insert(vr.toString(),#<tuple(`S1(`w1),`S2(`w2))>);
                            match type_inference(br) {
                            case `S3(`ntp):
                                if (!is_collection(S3))
                                    fail;
                                if (is_persistent_collection(T) && is_persistent_collection(S))
                                    S3 = persistent_collection(S3);
                                return #<`S3(`ntp)>;
                            };
                            type_error(e,"The MapReduce2 reducer must return a collection: "+print_query(br));
                        case _: type_error(e,"Wrong right mapper in mapReduce2: "+print_query(b2));
                        };
                    case _: type_error(e,"Wrong left mapper in mapReduce2: "+print_query(b1));
                    };
                case _: type_error(e,"The right MapReduce2 input is not a collection: "+print_query(Y));
                };
            case _: type_error(e,"The left MapReduce2 input is not a collection: "+print_query(X));
            };
        case Collect(`x):
            match type_inference2(x) {
            case Bag(`tp): return #<bag(`tp)>;
            case List(`tp): return #<list(`tp)>;
            };
            type_error(e,"You can only cache persistent collections: "+e);
        case aggregate(lambda(`v,`b),`z,`X):
            match type_inference2(X) {
            case `T(`a):
                if (!is_collection(T))
                    fail;
                Tree ztp = type_inference2(z);
                type_env.insert(v.toString(),#<tuple(`ztp,`a)>);
                if (!subtype(ztp,type_inference2(b)))
                    type_error(e,"Wrong accumulator: "+print_query(e));
                return ztp;
            };
            type_error(e,"Aggregation input is not a collection: "+print_query(e));
        case repeat(lambda(`p,`b),`s,...ns):
            if (!ns.is_empty() && unify(type_inference2(ns.head()),#<int>) == null)
                type_error(e,"The maximum number of steps in repeat must be an integer: "
                           +print_query(ns.head()));
            Tree tp = type_inference2(s);
            bind_pattern_type(p,tp);
            match tp {
            case `T(`a):
                if (!is_collection(T))
                    fail;
                match type_inference2(b) {
                case `S(`c):                         // transitive closure
                    if (!is_collection(S))
                        fail;
                    if (unify(a,c) == null)
                        fail;
                    ((Node)e).name = #<closure>.toString();       // destructive
                    return #<`T(`a)>;
                case `S(tuple(`w,`c)):
                    if (!is_collection(S))
                        fail;
                    if (unify(c,#<bool>) == null)
                        fail;
                    if (unify(a,w) == null)
                        fail;
                    return #<`T(`a)>;
                case `t: type_error(e,"The repeat body must return a collection of type "
                                    +print_type(tp)+" or "+print_type(#<`T(tuple(`a,bool))>)
                                    +" (Found type: "+print_type(t)+")");
                }
            };
            type_error(e,"The repeat source must return a bag: "+print_query(e));
        case closure(lambda(`v,`b),`s,...ns):
            if (!ns.is_empty() && unify(type_inference2(ns.head()),#<int>) == null)
                type_error(e,"The maximum number of steps in closure must be an integer: "
                           +print_query(ns.head()));
            match type_inference2(s) {
            case `T(`a):
                if (!is_collection(T))
                    fail;
                type_env.insert(v.toString(),#<`T(`a)>);
                match type_inference2(b) {
                case `S(`tp):
                    if (!is_collection(S))
                        fail;
                    if (unify(a,tp) == null)
                        fail;
                    return #<`T(`a)>;
                case `tp: type_error(e,"The closure body must return a collection of type "
                                     +print_type(#<`T(`a)>)+" or "+print_type(#<`T(tuple(`a,bool))>)
                                     +" (Found type: "+print_type(tp)+")");
                }
            };
            type_error(e,"The closure source must return a bag: "+print_query(e));
        case loop(lambda(`p,`b),`s,`n):
            if (unify(type_inference2(n),#<int>) == null)
                type_error(e,"The number of steps in loop must be an integer: "+print_query(n));
            Tree tp = type_inference2(s);
            bind_pattern_type(p,tp);
            Tree btp = type_inference2(b);
            if (unify(tp,btp) == null)
                type_error(e,"The type of the repeat body ("+print_type(btp)
                           + ") and the type of the initial value ("+print_type(tp)+") do not match");
            return tp;
        case step(`x):  // used in QueryPlan for repeat
            match type_inference(x) {
            case `T(`tp): return #<`T(tuple(`tp,bool))>;
            };
        case cstep(`x):  // used in QueryPlan for closure
            return type_inference(x);
        case outerMerge(lambda(`v,`b),`s,`d):
            Tree ts = type_inference(s);
            if (unify(type_inference(d),ts) == null)
                type_error(e,"Wrong outerMerge");
            match ts {
            case `S(tuple(`k,`tp)):
                if (!is_collection(S))
                    fail;
                type_env.insert(v.toString(),#<tuple(`tp,`tp)>);
                type_inference(b);
                return ts;
            };
            type_error(e,"Wrong OuterMerge");
        case rightOuterMerge(lambda(`v,`b),`s,`d):
            Tree ts = type_inference(s);
            if (unify(type_inference(d),ts) == null)
                type_error(e,"Wrong RightOuterMerge");
            match ts {
            case `S(tuple(`k,`tp)):
                if (!is_collection(S))
                    fail;
                type_env.insert(v.toString(),#<tuple(`tp,`tp)>);
                type_inference(b);
                return ts;
            };
            type_error(e,"Wrong rightOuterMerge");
        case if(`p,`x,`y):
            if (unify(type_inference2(p),#<bool>) == null)
                type_error(e,"Expected a boolean predicate in if-then-else: "+print_query(p));
            Tree tx = type_inference2(x);
            Tree ty = type_inference2(y);
            Tree rt = unify(tx,ty,((Node)e).children.tail,((Node)e).children.tail.tail);
            if (rt == null)
                type_error(e,"Incompatible types in if-then-else: "+print_query(e));
            return rt;
        case call(inv,`x):
            return type_inference(x);
        case incr(...s):
            return type_inference(#<tuple(...s)>);
        case call(plus,`x,`y):
            Tree tx = type_inference2(x);
            Tree ty = type_inference2(y);
            match tx {
            case `T(`xt):
                if (!is_collection(T))
                    fail;
                match ty {
                case `S(`yt):
                    if (!is_collection(S))
                        fail;
                    Tree rt = unify(tx,ty,((Node)e).children.tail,((Node)e).children.tail.tail);
                    if (rt == null)
                        type_error(e,"Incompatible types in union/append: "+print_type(tx)+" and "+print_type(ty));
                    return rt;
                }
            };
            fail
        case `f(`x,`y):
            if (! #[union,intersect,except].member(#<`f>))
                fail;
            Tree tx = type_inference2(x);
            Tree ty = type_inference2(y);
            match tx {
            case `T(`t1):
                if (!is_collection(T))
                    fail;
                Tree t = unify(tx,ty,((Node)e).children,((Node)e).children.tail);
                if (t != null)
                    return t;
            };
            type_error(e,"Incompatible types in "+f+": "+print_type(tx)+" and "+print_type(ty));
        case call(member,`x,`y):
            Tree tx = type_inference2(x);
            Tree ty = type_inference2(y);
            match ty {
            case `T(`t1):
                if (!is_collection(T))
                    fail;
                if (!subtype(tx,t1))
                    type_error(e,"Incompatible types in member: "+print_type(tx)+" and "+print_type(ty));
                coerce(tx,t1,((Node)e).children.tail);
                return #<bool>;
            };
        case call(elem,`x):
            Tree tp = type_inference2(x);
            match tp {
            case `T(`t):
                if (is_collection(T))
                    return t;
            };
            type_error(e,"method elem must be applied to a collection: "+tp);
        case call(`f,`x,`y):
            if (! #[eq,neq,lt,leq,gt,geq].member(f))
                fail;
            Tree tx = type_inference2(x);
            Tree ty = type_inference2(y);
            if (!subtype(tx,ty) && !subtype(ty,tx))
                type_error(e,"Incompatible types in comparison "+f+": "
                           +print_type(tx)+" and "+print_type(ty));
            if (unify(tx,ty) != null)
                return #<bool>;
            if (subtype(tx,ty))
                coerce(tx,ty,((Node)e).children.tail);
            else if (subtype(ty,tx))
                coerce(ty,tx,((Node)e).children.tail.tail);
            return #<bool>;
        case call(`f,`s):
            for ( Tree monoid: monoids )
                match monoid {
                case `aggr(`mtp,`plus,`zero,`unit):
                    if (!aggr.equals(f.toString()))
                        continue;
                    match type_inference2(s) {
                    case `T(`tp):
                        if (!is_collection(T))
                            type_error(e,"Aggregation must be over collections: "+s);
                        if (unify(mtp,tp) == null)
                            continue;
                        Tree nv = new_var();
                        type_env.begin_scope();
                        type_env.insert(nv.toString(),tp);
                        Tree t = type_inference2(#<apply(`unit,`nv)>);
                        Tree tz = type_inference2(zero);
                        type_env.end_scope();
                        if (unify(t,tz) != null)
                            return t;
                    }
                };
            fail
        case call(avg,`s):
            return type_inference(#<call(div,typed(call(sum,`s),double),call(count,`s))>);
        case call(join_key,`s1,`s2):
            match type_inference2(s1) {
            case `T1(tuple(`tp1,_)):
                match type_inference2(s2) {
                case `T2(tuple(`tp2,_)):
                    if (is_collection(T1) && is_collection(T2) && unify(tp1,tp2) != null)
                        return tp1;
                }
            };
            type_error(e,"Expected two collections with the same key in join key: "+e);
        case reduce(`m,`s):
            return type_inference2(#<call(`m,`s)>);
        case trace(`msg,`t,`x):
            if (unify(type_inference2(msg),#<string>) == null)
                type_error(e,"Expected a string in the trace message");
            Tree tp = type_inference2(x);
            match tp {
            case `T(_):
                if (Config.bsp_mode && is_persistent_collection(T))
                    type_error(e,"Cannot trace expressions of persistent type in BSP mode: "+x);
            }; 
            if (t.equals(#<none>))
                ((Node)e).children.tail.head = tp;       // destructive
            return tp;
        case apply(lambda(`v,`b),`arg):
            type_env.begin_scope();
            type_env.insert(v.toString(),type_inference(arg));
            Tree tp = type_inference(b);
            type_env.end_scope();
            return tp;
        case call(`f,...al):
            Tree macro = global_macros.lookup(f.toString());
            if (macro == null)
                fail;
            match macro {
            case macro(params(...pl),`body):
                Tree b = body;
                if (pl.length() != al.length())
                    fail;
                for ( ; !pl.is_empty(); pl = pl.tail(), al = al.tail() )
                    b = subst(pl.head(),al.head(),b);
                return type_inference2(b);
            };
            fail
        case call(`f,...el):
            if (!f.is_variable() || global_vars.lookup(f.toString()) != null)
                fail;
            Tree ret = data_constructors.lookup(f.toString());
            if (ret != null)
                match ret {
                case `v(`n,`tp):
                    if (unify(type_inference(make_tuple(el)),tp) != null)
                        return #<`v>;
                    else type_error(e,"Wrong data construction: "+print_query(e));
                };
            ret = type_env.lookup(f.toString());
            if (ret == null)
                ret = global_type_env.lookup(f.toString());
            if (ret != null)
                match ret {
                case arrow(`s,`d):
                    Tree tp = type_inference(#<tuple(...el)>);
                    if (!subtype(tp,s))
                        type_error(e,"The domain of the anonymous function "+print_type(s)+" in "+print_query(e)
                                   +" does not match the argument types "+print_type(tp));
                    if (unify(tp,s) == null)  // coerce args
                        match #<tuple(`tp,`s)> {
                        case tuple(tuple(...txs),tuple(...tys)):
                            for ( ; txs != #[]; txs = txs.tail, tys = tys.tail, el = el.tail )
                                coerce(txs.head,tys.head,el);
                        }
                    return d;
                case _: type_error(e,"Expected a functional type for "+f+" (found "+print_type(ret)+")");
                };
            Trees tps = #[];
            for ( Tree a: el )
                tps = tps.append(type_inference(a));
            for ( Tree fnc: functions )
                match fnc {
                case `fn(`otp,...tl):
                    if (!fn.equals(f.toString()) || !subtype(tps,tl))
                        fail;
                    if (f.equals(#<XMLchildren>))
                        return #<list(XML)>;
                    else if (f.equals(#<XMLattributes>))
                        return #<list(string)>;
                    else if (f.equals(#<XMLattribute>) && collection_type(otp))
                        return #<list(string)>;
                    else return otp;
                };
            type_error(e,"Undefined function "+f+" over arguments of type "+print_type(#<tuple(...tps)>));
        case apply(`f,`u):
            match type_inference2(f) {
            case arrow(`s,`d):
                Tree tp = type_inference(u);
                if (!subtype(tp,s))
                    type_error(e,"The domain of the anonymous function "+print_type(s)+" in "+print_query(e)
                               +" does not match the argument types "+print_type(tp));
                if (unify(tp,s) == null)  // coerce args
                    coerce(tp,s,((Node)e).children.tail);
                return d;
            case `tp: type_error(e,"Expected a functional type for "+f+" (found "+print_type(tp)+")");
            };
        case callM(`f,_,...el):
            return type_inference(#<call(`f,...el)>);
        case true: return #<bool>;
        case false: return #<bool>;
        case null: return #<any>;
        case `v:
            if (!v.is_variable())
                fail;
            Tree ret1 = type_env.lookup(v.toString());
            Tree ret2 = global_type_env.lookup(v.toString());
            if (ret1 == null)
                if (ret2 == null) {
                    Tree ret = global_vars.lookup(v.toString());
                    if (ret == null) {
                        String msg = "";
                        if (!Config.trace && type_env.iterator().hasNext()) {
                            msg += "\nVariable Types:";
                            for ( String var: type_env )
                                msg += "\n "+var + ": " + print_type(type_env.lookup(var));
                        };
                        type_error(e,"Undefined variable: "+v+msg);
                    } else if (!v.equals(ret))
                        return type_inference(ret);
                } else return ret2;
            else return ret1;
        case `n:
            if (n.is_long())
              return #<int>;
            else if (n.is_double())
                return #<double>;
            else if (n.is_string())
                return #<string>;
        };
        type_error(e,"Wrong expression: "+print_query(e));
        throw new Error();
    }

    /** check the type for inconsistencies and fix the transient/persistent components */
    public static Tree normalize_type ( Tree type ) {
        match type {
        case record(...bs):
            Trees as = #[];
            Trees vs = #[];
            for ( Tree b: bs )
                match b {
                case bind(`v,`tp):
                    if (v.is_variable())
                        if (vs.member(v))
                            type_error(type,"Duplicate record attributes: "+print_type(type));
                        else {
                            vs = vs.append(v);
                            as = as.append(#<bind(`v,`(normalize_type(tp)))>);
                        }
                    else type_error(type,"Expected an attribute name: "+v);
                case _: type_error(type,"Ill-formed record type: "+print_type(type));
                };
            return #<record(...as)>;
        case tuple(...ts):
            Trees as = #[];
            for ( Tree t: ts )
                as = as.append(normalize_type(t));
            return #<tuple(...as)>;
        case union(...bs):
            Trees as = #[];
            for ( Tree b: bs )
                match b {
                case `c(`tp):
                    as = as.append(#<`c(`(normalize_type(tp)))>);
                case _: type_error(type,"Ill-formed union type: "+print_type(type));
                };
            return #<union(...as)>;
        case persistent(bag(`tp)):
            Tree ntp = normalize_type(tp);
            return #<Bag(`ntp)>;
        case persistent(list(`tp)):
            Tree ntp = normalize_type(tp);
            return #<List(`ntp)>;
        case `T(`tp):
            if (is_collection(T))
                return #<`T(`(normalize_type(tp)))>;
            else fail
        case `tp:
            if (!tp.is_variable())
                fail;
            if (#[bool,byte,short,int,long,float,double,string].member(tp))
                return tp;
            Tree rt = global_datatype_env.lookup(tp.toString());
            if (rt != null)
                return tp;
            rt = type_names.lookup(tp.toString());
            if (rt != null)
                return tp;
        };
        type_error(type,"Unrecognized type: "+print_type(type));
        return type;
    }
}
