/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.mrql;

import org.apache.mrql.gen.*;
import java.util.Iterator;
import java.util.List;


/** printers for types, expressions, plans, etc */
public class Printer {

    /** Print an MRQL type in a readable form.
     * Tuples are printed as ( t1, t2 ), records as < a: t1, b: t2 >,
     * function types as t1 -> t2, and persistent types with a leading "!".
     * The capitalized Bag and List constructors are printed as !bag(t) and
     * !list(t). Any other constructor is printed as name( t1, t2 ), and a
     * tree that is not a constructor node is printed with toString().
     * @param tp the type to print
     * @return the printout of the type
     */
    public static String print_type ( Tree tp ) {
        match tp {
        case tuple(...tl):
            if (tl.is_empty())
                return "()";
            String s = "( "+print_type(tl.head());
            for ( Tree t: tl.tail() )
                s += ", "+print_type(t);
            return s+" )";
        case record(...tl):
            if (tl.is_empty())
                return "< >";
            String s = "< ";
            // components that are not name/type bindings are silently skipped
            match tl.head() {
            case bind(`a,`t):
                s += a+": "+print_type(t);
            };
            for ( Tree t: tl.tail() )
                match t {
                case bind(`a,`at):
                    s += ", "+a+": "+print_type(at);
                };
            return s+" >";
        case arrow(`itp,`otp):
            return print_type(itp)+" -> "+print_type(otp);
        case persistent(`t):
            return "!"+print_type(t);
        case `f():
            // a constructor without arguments; handled before the general
            // constructor pattern below, which reads the first argument
            return f+"()";
        case Bag(`etp):
            return "!bag("+print_type(etp)+")";
        case List(`etp):
            return "!list("+print_type(etp)+")";
        case `f(...tl):
            String s = f+"( "+print_type(tl.head());
            for ( Tree t: tl.tail() )
                s += ", "+print_type(t);
            return s+" )";
        };
        return tp.toString();
    }

    /** Print a list of query expressions separated by commas.
     * A non-empty list is padded with one space on each side; an empty
     * list yields the empty string.
     * @param el the expressions to print
     * @return the comma-separated printout
     */
    private static String print_query_list ( Trees el ) {
        if (el.length() == 0)
            return "";
        StringBuilder out = new StringBuilder(" ");
        String separator = "";
        for ( Tree item: el ) {
            out.append(separator).append(print_query(item));
            separator = ", ";
        }
        return out.append(" ").toString();
    }

    /** Print an MRQL query expression in a form close to the query syntax.
     * Handles select queries (with optional distinct, where, group by,
     * having, order by and limit parts), tuples, records, projections,
     * indexing, tuple component access, function calls, and list and bag
     * constructions. Any other constructor is printed as name( args ), and
     * a tree that is not a constructor node is printed with toString().
     * @param e the expression to print
     * @return the printout of the expression
     */
    public static String print_query ( Tree e ) {
        match e {
        case select(`opt_dist,`u,from(...bl),where(`c),groupby(...gs),orderby(...os)):
            String s = "select "+(opt_dist.equals(#<none>) ? "" : "distinct ");
            s += print_query(u)+" from ";
            match bl.head() {
            case bind(`p,`d):
                s += print_query(p)+" in "+print_query(d);
            };
            for ( Tree b: bl.tail() )
                match b {
                case bind(`p,`d):
                    s += ", "+print_query(p)+" in "+print_query(d);
                };
            // a condition that is the constant true is not printed
            if (!c.equals(#<true>))
                s += " where "+print_query(c);
            // the group-by part is present only when it has a first
            // component (the having condition) followed by the bindings
            match #<groupby(...gs)> {
            case groupby(`h,...gl):
                s += " group by ";
                match gl.head() {
                case bind(`gp,`gd):
                    s += print_query(gp)+": "+print_query(gd);
                };
                for ( Tree g: gl.tail() )
                    match g {
                    case bind(`gp,`gd):
                        s += ", "+print_query(gp)+": "+print_query(gd);
                    };
                if (!h.equals(#<true>))
                    s += " having "+print_query(h);
            };
            // the order-by part: its first component is the limit
            match #<orderby(...os)> {
            case orderby(`l,...ol):
                s += " order by "+print_query(ol.length() == 1 ? ol.head() : #<tuple(...ol)>);
                if (!l.equals(#<none>))
                    s += " limit "+print_query(l);
            };
            return s;
        case tuple(...el):
            return "("+print_query_list(el)+")";
        case record(...el):
            // an empty record has no first component to read; print it the
            // same way print_type prints an empty record type
            if (el.is_empty())
                return "< >";
            String s = "< ";
            match el.head() {
            case bind(`v,`b):
                s += v+": "+print_query(b);
            };
            for ( Tree a: el.tail() )
                match a {
                case bind(`v,`b):
                    s += ", "+v+": "+print_query(b);
                };
            return s+" >";
        case project(`a,`v):
            // anything other than a plain variable is parenthesized
            return (a.is_variable()) ? print_query(a)+"."+v : "("+print_query(a)+")."+v;
        case index(`a,`i):
            return (a.is_variable()) ? print_query(a)+"["+print_query(i)+"]"
                                     : "("+print_query(a)+")["+print_query(i)+"]";
        case nth(`x,`n):
            return (x.is_variable()) ? print_query(x)+"#"+print_query(n)
                                     : "("+print_query(x)+")#"+print_query(n);
        case call(`f,...el):
            return f+"("+print_query_list(el)+")";
        case list(...el):
            return "["+print_query_list(el)+"]";
        case bag(...el):
            return "{"+print_query_list(el)+"}";
        case `f(...el):
            return f+"("+print_query_list(el)+")";
        };
        return e.toString();
    }

    /** Print an XML value as XML text.
     * A union with tag 1 holds a string, which is returned as is. Any other
     * union holds a tuple of (element name, bag of attribute pairs, bag of
     * children), printed as an element; an element without children is
     * printed in the self-closing form.
     * @param x the XML value
     * @return the XML text
     */
    private static String print_XML ( final Union x ) {
        if (x.tag() == 1)
            return ((MR_string)x.value()).get();
        final Tuple node = (Tuple)x.value();
        final String name = ((MR_string)node.get(0)).get();
        final StringBuilder out = new StringBuilder("<").append(name);
        for ( MRData a: (Bag)node.get(1) ) {
            final Tuple attr = (Tuple)a;
            out.append(" ")
               .append(((MR_string)attr.first()).get())
               .append("=\"")
               .append(((MR_string)attr.second()).get())
               .append("\"");
        }
        final Bag children = (Bag)node.get(2);
        if (children.size() == 0)
            return out.append("/>").toString();
        out.append(">");
        for ( MRData child: children )
            out.append(print_XML((Union)child));
        return out.append("</").append(name).append(">").toString();
    }

    /** Print a JSON value as text.
     * A union with tag 0 holds a bag of (key, value) tuples and is printed
     * as an object; a union with tag 1 holds a bag of values and is printed
     * as an array; any other union is printed through the string form of
     * its value. Empty objects and arrays are printed as "{ }" and "[ ]".
     * @param x the JSON value
     * @return the JSON text
     */
    private static String print_JSON ( final Union x ) {
        switch (x.tag()) {
        case 0:
            StringBuilder obj = new StringBuilder("{");
            // the separator is emitted before each member, so an empty bag
            // leaves the opening bracket intact instead of cutting it off
            String osep = " ";
            for ( MRData e: (Bag)x.value() ) {
                Tuple t = (Tuple)e;
                obj.append(osep).append(""+t.get(0)).append(": ")
                   .append(print_JSON((Union)t.get(1)));
                osep = ", ";
            };
            return obj.append(" }").toString();
        case 1:
            StringBuilder arr = new StringBuilder("[");
            String asep = " ";
            for ( MRData e: (Bag)x.value() ) {
                arr.append(asep).append(print_JSON((Union)e));
                asep = ", ";
            };
            return arr.append(" ]").toString();
        };
        return ""+x.value();
    }

    /** An MRData printer based on type information.
     * The value is printed according to the expanded form of its type:
     * collections are printed inside curly brackets (bags) or square
     * brackets (lists), tuples inside parentheses, records inside angle
     * brackets with field names, and union values as the constructor name
     * followed by the printed content. At most Config.max_bag_size_print
     * elements of a collection are shown when that setting is not negative;
     * the rest is replaced by an ellipsis. A value whose type is none of
     * the above is printed with toString().
     * @param x the value to print
     * @param type the type of x
     * @return the printout of x
     * @throws Error wrapping any exception raised while printing
     */
    final static String print ( final MRData x, final Tree type ) {
        try {
            // an Inv wrapper is transparent: print the wrapped value
            if (x instanceof Inv)
                return print(((Inv)x).value(),type);
            // XML and JSON get their own textual syntax, except in testing mode
            if (!Config.testing && type.equals(#<XML>))
                return print_XML((Union)x);
            if (!Config.testing && type.equals(#<JSON>))
                return print_JSON((Union)x);
            match TypeInference.expand(type) {
            case persistent(`tp):
                return print(x,tp);
            case Bag(`tp):
                if (x instanceof MR_dataset) {
                    // print a sample taken from the dataset
                    DataSet ds = ((MR_dataset)x).dataset();
                    List<MRData> vals = ds.take(Config.max_bag_size_print);
                    if (vals.size() == 0)
                        return "{}";
                    String s = "{ "+print(vals.get(0),tp);
                    for ( int i = 1; i < vals.size() && (Config.max_bag_size_print < 0
                                                         || i < Config.max_bag_size_print); i++ )
                        s += ", "+print(vals.get(i),tp);
                    // NOTE(review): a sample of exactly max_bag_size_print elements is
                    // shown with an ellipsis even if the dataset has no more - confirm
                    if (Config.max_bag_size_print > 0 && vals.size() >= Config.max_bag_size_print)
                        return s+", ... }";
                    else return s+" }";
                } else return print(Evaluator.evaluator.toBag(x),#<bag(`tp)>);
            case List(`tp):
                if (x instanceof MR_dataset) {
                    // same as above, with square brackets
                    DataSet ds = ((MR_dataset)x).dataset();
                    List<MRData> vals = ds.take(Config.max_bag_size_print);
                    if (vals.size() == 0)
                        return "[]";
                    String s = "[ "+print(vals.get(0),tp);
                    for ( int i = 1; i < vals.size() && (Config.max_bag_size_print < 0
                                                         || i < Config.max_bag_size_print); i++ )
                        s += ", "+print(vals.get(i),tp);
                    if (Config.max_bag_size_print > 0 && vals.size() >= Config.max_bag_size_print)
                        return s+", ... ]";
                    else return s+" ]";
                } else return print(Evaluator.evaluator.toBag(x),#<list(`tp)>);
            case bag(`tp):
                Bag b = (Bag)x;
                Iterator<MRData> bi = b.iterator();
                if (!bi.hasNext())
                    return "{}";
                // the first element is always printed, whatever the limit is
                String s = "{ "+print(bi.next(),tp);
                for ( long i = 1; bi.hasNext() && (Config.max_bag_size_print < 0
                                                   || i < Config.max_bag_size_print); i++ )
                    s += ", "+print(bi.next(),tp);
                // elements left in the iterator were cut off by the limit
                if (bi.hasNext())
                    return s+", ... }";
                else return s+" }";
            case list(`tp):
                Bag b = (Bag)x;
                Iterator<MRData> bi = b.iterator();
                if (!bi.hasNext())
                    return "[]";
                String s = "[ "+print(bi.next(),tp);
                for ( long i = 1; bi.hasNext() && (Config.max_bag_size_print < 0
                                                   || i < Config.max_bag_size_print); i++ )
                    s += ", "+print(bi.next(),tp);
                if (bi.hasNext())
                    return s+", ... ]";
                else return s+" ]";
            case tuple(...el):
                // each component is printed with the component type at the same position
                Tuple t = (Tuple)x;
                if (t.size() == 0)
                    return "()";
                String s = "("+print(t.get((short)0),el.nth(0));
                for ( short i = 1; i < t.size(); i++ )
                    s += ","+print(t.get(i),el.nth(i));
                return s+")";
            case record(...el):
                // a record value is stored as a tuple; the field names come from the type
                Tuple t = (Tuple)x;
                if (t.size() == 0)
                    return "<>";
                String s = "< ";
                match el.nth(0) {
                case bind(`a,`tp):
                    s += a+": "+print(t.get((short)0),tp);
                };
                for ( short i = 1; i < t.size(); i++ )
                    match el.nth(i) {
                    case bind(`a,`tp):
                        s += ", "+a+": "+print(t.get(i),tp);
                    };
                return s+" >";
            case union(...el):
                // the tag of the value selects the constructor in the union type
                Union u = (Union)x;
                match el.nth(u.tag()) {
                case `c(tuple(...ts)):
                    return c+print(u.value(),#<tuple(...ts)>);
                case `c(`tp):
                    return c+"("+print(u.value(),tp)+")";
                }
            };
            return x.toString();
        } catch (Exception ex) {
            throw new Error(ex);
        }
    }

    /** Build an indentation string.
     * @param n the number of spaces; zero or a negative number gives the empty string
     * @return a string of n spaces
     */
    private final static String tab ( int n ) {
        // a pre-sized builder avoids copying the string again on every iteration
        StringBuilder pad = new StringBuilder(n > 0 ? n : 0);
        for ( int i = 0; i < n; i++ )
            pad.append(" ");
        return pad.toString();
    }

    /** print a physical plan
     * Each physical operator is printed as its name followed by its input
     * plans, one per line, indented relative to the operator. The functional
     * arguments of an operator (mappers, reducers, accumulators, etc) are not
     * printed. Conditionals are printed as a list of numbered choices. A tree
     * that is not recognized as a plan is printed as the concatenation of the
     * plans found among its arguments, which may be the empty string.
     * @param e the plan
     * @param n tab (# of spaces to put in the beginning of the line)
     * @param pv is this a variable bound to physical plan?
     * @return the printout
     */
    public final static String print_plan ( Tree e, int n, boolean pv ) {
        match e {
        case cMap(`f,`s):
            return "cMap:\n"+tab(n+3)+"input: "+print_plan(s,n+10,true);
        case AggregateMap(`f,`a,`z,`s):
            return "AggregateMap:\n"+tab(n+3)+"input: "+print_plan(s,n+10,true);
        case MapReduce(`m,`r,`s,_):
            return "MapReduce:\n"+tab(n+3)+"input: "+print_plan(s,n+10,true);
        case MapCombineReduce(`m,`c,`r,`s,_):
            return "MapCombineReduce:\n"+tab(n+3)+"input: "+print_plan(s,n+10,true);
        case MapAggregateReduce(`m,`r,`a,`z,`s,_):
            return "MapAggregateReduce:\n"+tab(n+3)+"input: "+print_plan(s,n+10,true);
        case MapReduce2(`mx,`my,`r,`x,`y,_):
            return "MapReduce2:\n"+tab(n+3)+"left: "+print_plan(x,n+9,true)+"\n"
                   +tab(n+3)+"right: "+print_plan(y,n+10,true);
        case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,_):
            return "MapCombineReduce2:\n"+tab(n+3)+"left: "+print_plan(x,n+9,true)+"\n"
                   +tab(n+3)+"right: "+print_plan(y,n+10,true);
        // with a null fifth argument the operator is labelled as the plain
        // MapReduce2; this pattern must stay ahead of the general one below
        case MapAggregateReduce2(`mx,`my,`r,`a,null,`x,`y,...):
            return "MapReduce2:\n"+tab(n+3)+"left: "+print_plan(x,n+9,true)+"\n"
                   +tab(n+3)+"right: "+print_plan(y,n+10,true);
        case MapAggregateReduce2(`mx,`my,`r,`a,`z,`x,`y,...):
            return "MapAggregateReduce2:\n"+tab(n+3)+"left: "+print_plan(x,n+9,true)+"\n"
                   +tab(n+3)+"right: "+print_plan(y,n+10,true);
        case MapJoin(`kx,`ky,`r,`x,`y):
            return "MapJoin:\n"+tab(n+3)+"left: "+print_plan(x,n+9,true)+"\n"
                   +tab(n+3)+"right: "+print_plan(y,n+10,true);
        // same convention: a null fifth argument is labelled as the plain MapJoin
        case MapAggregateJoin(`kx,`ky,`r,`a,null,`x,`y):
            return "MapJoin:\n"+tab(n+3)+"left: "+print_plan(x,n+9,true)+"\n"
                   +tab(n+3)+"right: "+print_plan(y,n+10,true);
        case MapAggregateJoin(`kx,`ky,`r,`a,`z,`x,`y):
            return "MapAggregateJoin:\n"+tab(n+3)+"left: "+print_plan(x,n+9,true)+"\n"
                   +tab(n+3)+"right: "+print_plan(y,n+10,true);
        case GroupByJoin(`kx,`ky,`gx,`gy,`m,`c,`r,`x,`y,_):
            return "GroupByJoin:\n"+tab(n+3)+"left: "+print_plan(x,n+9,true)+"\n"
                   +tab(n+3)+"right: "+print_plan(y,n+10,true);
        case CrossProduct(`mx,`my,`r,`x,`y):
            return "CrossProduct:\n"+tab(n+3)+"left: "+print_plan(x,n+9,true)+"\n"
                   +tab(n+3)+"right: "+print_plan(y,n+10,true);
        case OuterMerge(`merge,`x,`y):
            return "OuterMerge:\n"+tab(n+3)+"state: "+print_plan(x,n+10,true)+"\n"
                   +tab(n+3)+"delta: "+print_plan(y,n+10,true);
        case RightOuterMerge(`merge,`x,`y):
            return "RightOuterMerge:\n"+tab(n+3)+"state: "+print_plan(x,n+10,true)+"\n"
                   +tab(n+3)+"delta: "+print_plan(y,n+10,true);
        // same convention: a null fifth argument is labelled as the plain CrossProduct
        case CrossAggregateProduct(`mx,`my,`r,`a,null,`x,`y):
            return "CrossProduct:\n"+tab(n+3)+"left: "+print_plan(x,n+9,true)+"\n"
                   +tab(n+3)+"right: "+print_plan(y,n+10,true);
        case CrossAggregateProduct(`mx,`my,`r,`a,`z,`x,`y):
            return "CrossAggregateProduct:\n"+tab(n+3)+"left: "+print_plan(x,n+9,true)+"\n"
                   +tab(n+3)+"right: "+print_plan(y,n+10,true);
        case Aggregate(`a,`z,`s):
            return "Aggregate:\n"+tab(n+3)+"input: "+print_plan(s,n+10,true);
        case BinarySource(`k,`file,_):
            return "Source (binary): "+file;
        case BinarySource(`file,_):
            return "Source (binary): "+file;
        case ParsedSource(`m,`parser,`file,...args):
            // this form applies only when the first argument is a number;
            // otherwise the pattern is abandoned and the next one is tried
            if (m instanceof LongLeaf)
                return "Source ("+parser+"): "+file;
            else fail
        case ParsedSource(`parser,`file,...args):
            return "Source ("+parser+"): "+file;
        case BinaryStream(`k,`file,_):
            return "Stream (binary): "+file;
        case BinaryStream(`file,_):
            return "Stream (binary): "+file;
        case ParsedStream(`m,`parser,`file,...args):
            // same rule as for ParsedSource above
            if (m instanceof LongLeaf)
                return "Stream ("+parser+"): "+file;
            else fail
        case ParsedStream(`parser,`file,...args):
            return "Stream ("+parser+"): "+file;
        case Generator(...):
            return "Generator";
        case Merge(`x,`y):
            return "Merge:\n"+tab(n+3)+"left: "+print_plan(x,n+9,true)+"\n"
                   +tab(n+3)+"right: "+print_plan(y,n+10,true);
        case Provenance(`x,...):
            // the provenance wrapper is not shown; print the wrapped plan
            return print_plan(x,n,pv);
        case BSP(_,_,_,_,...ds):
            // NOTE(review): no newline is added between successive inputs,
            // so several inputs end up on one line - confirm this is intended
            String ret = "BSP:\n";
            for ( Tree d: ds )
                ret += tab(n+3)+"input: "+print_plan(d,n+10,true);
            return ret;
        case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),...):
            // one "var = plan" line per loop variable, first for the initial
            // values (ss) and then for the step expressions (bs)
            String ret = "Loop ("+vs+"):\n"+tab(n+3)+"init: "
                +vs.head()+" = "+print_plan(ss.head(),n+vs.head().toString().length()+12,true)+"\n";
            ss = ss.tail();
            for ( Trees s = vs.tail(); !s.is_empty() && !ss.is_empty(); s = s.tail(), ss = ss.tail() )
                ret += tab(n+9)+s.head()+" = "+print_plan(ss.head(),n+s.head().toString().length()+12,true)+"\n";
            ret += tab(n+3)+"step: "
                +vs.head()+" = "+print_plan(bs.head(),n+vs.head().toString().length()+12,true)+"\n";
            bs = bs.tail();
            for ( Trees s = vs.tail(); !s.is_empty() && !bs.is_empty(); s = s.tail(), bs = bs.tail() )
                ret += tab(n+9)+s.head()+" = "+print_plan(bs.head(),n+s.head().toString().length()+12,true)+"\n";
            return ret;
        case `f(lambda(`v,`b),`s,...):
            // only the repeat and closure operators are printed this way;
            // any other operator is left to the patterns that follow
            if (! #[Repeat,repeat,Closure,closure].member(#<`f>))
                fail;
            return f+" ("+v+"):\n"+tab(n+3)+"init: "+print_plan(s,n+9,true)+"\n"
                   +tab(n+3)+"step: "+print_plan(b,n+9,true);
        case Let(`v,`u,`body):
            return "Let "+v+" = "+print_plan(u,n+10+v.toString().length(),pv)+"\n"
                   +tab(n)+print_plan(body,n,pv);
        case let(`v,`u,`body):
            // a binding whose value prints as nothing is omitted altogether
            if (print_plan(u,0,pv).length() > 0)
                return "let "+v+" = "+print_plan(u,n+10+v.toString().length(),pv)+"\n"
                    +tab(n)+print_plan(body,n,pv);
            else return print_plan(body,n,pv);
        case lambda(tuple(...vs),`b):
            return "lambda "+vs+":\n"+tab(n+3)+print_plan(b,n+9,true);
        case lambda(`v,`b):
            return "lambda ("+v+"):\n"+tab(n+3)+print_plan(b,n+9,true);
        // nested conditionals are flattened into numbered choices (up to four);
        // the conditions themselves are not printed
        case If(_,`x1,If(_,`x2,If(_,`x3,`x4))):
            return "Choice 1: "+print_plan(x1,n+10,pv)+"\n"
                   +tab(n)+"Choice 2: "+print_plan(x2,n+10,pv)+"\n"
                   +tab(n)+"Choice 3: "+print_plan(x3,n+10,pv)+"\n"
                   +tab(n)+"Choice 4: "+print_plan(x4,n+10,pv);
        case If(_,`x1,If(_,`x2,`x3)):
            return "Choice 1: "+print_plan(x1,n+10,pv)+"\n"
                   +tab(n)+"Choice 2: "+print_plan(x2,n+10,pv)+"\n"
                   +tab(n)+"Choice 3: "+print_plan(x3,n+10,pv);
        case If(`c,`x,`y):
            return "Choice 1: "+print_plan(x,n+10,pv)+"\n"
                   +tab(n)+"Choice 2: "+print_plan(y,n+10,pv);
        case tuple():
            return "( )";
        case tuple(...ts):
            // components that print as nothing are skipped; if all of them
            // are skipped the whole tuple prints as nothing
            String s = "( ";
            if (print_plan(ts.head(),0,pv).length() > 0)
                s += print_plan(ts.head(),n+2,pv)+"\n";
            for ( Tree t: ts.tail() )
                if (print_plan(t,0,pv).length() > 0)
                    s += tab(n+2)+print_plan(t,n+2,pv)+"\n";
            if (s.equals("( "))
                return "";
            else return s+tab(n)+")";
        // NOTE(review): unlike the general form below, this form does not
        // print the "Zero" line - confirm this is intended
        case incr(`z,lambda(tuple(...s),`h),lambda(tuple(...t),`a)):
            return "Incremental:\n"+tab(n+3)+"State Transformer "+s+": "+print_plan(h,n+21+s.toString().length(),true)+"\n"
                +tab(n+3)+"Answer "+t+": "+print_plan(a,n+10+t.toString().length(),true);
        case incr(`z,lambda(`s,`h),lambda(`t,`a)):
            return "Incremental:\n"+tab(n+3)+"Zero: "+print_plan(z,n+9,false)+"\n"
                +tab(n+3)+"State Transformer ("+s+"): "+print_plan(h,n+23+s.toString().length(),true)+"\n"
                +tab(n+3)+"Answer ("+t+"): "+print_plan(a,n+12+t.toString().length(),true);
        case `f(...as):
            // any other node: concatenate the plans found among its
            // arguments, one per line, ignoring plain variables
            String s = "";
            for (Tree a: as) {
                String ps = print_plan(a,n,pv);
                if (!ps.equals("") && !a.is_variable())
                    s += ps+(ps.endsWith("\n")?"":"\n");
            };
            return s;
        };
        // a leaf: only a variable is printed, and only when pv is set
        if (pv && e.is_variable())
            return e.toString();
        return "";
    }

    /** given an MRData value, construct an expression that builds this data
     * @param x the MRData
     * @param type the type of x
     * @return an expression that constructs x
     */
    public final static Tree reify ( final MRData x, Tree type ) {
        if (x instanceof MR_variable)
            return new VariableLeaf("t_"+((MR_variable)x).var_num);
        type = TypeInference.expand(type);
        match type {
        case `T(`tp):
            if (!Translator.is_collection(T))
                fail;
            Bag b = (Bag)x;
            Trees as = #[];
            for ( MRData e: b)
                as = as.append(reify(e,tp));
            return #<`T(...as)>;
        case tuple(...el):
            Tuple t = (Tuple)x;
            Trees as = #[];
            for ( short i = 0; i < t.size(); i++ )
                as = as.append(reify(t.get(i),el.nth(i)));
            return #<tuple(...as)>;
        case record(...el):
            Tuple t = (Tuple)x;
            Trees as = #[];
            for ( short i = 0; i < t.size(); i++ )
                match el.nth(i) {
                case bind(`a,`tp):
                    as = as.append(#<bind(`a,`(reify(t.get(i),tp)))>);
                };
            return #<record(...as)>;
        case union(...el):
            Union u = (Union)x;
            match el.nth(u.tag()) {
            case `c(tuple(...ts)):
                return #<call(`c,`(reify(u.value(),#<tuple(...ts)>)))>;
            case `c(`tp):
                return #<call(`c,`(reify(u.value(),tp)))>;
            };
        case string:
            String[] s = ((MR_string)x).get().split("\\x7B\\x7B");
            if (s.length == 1)
                return new StringLeaf(s[0]);
            Trees as = s[0].length() == 0 ? #[] : #[].append(new StringLeaf(s[0]));
            for ( int i = 1; i < s.length; i++ ) {
                String[] d = s[i].split("\\x7D\\x7D",2);
                if (d.length < 2)
                    throw new Error("");
                as = as.append(new VariableLeaf("t_"+Integer.parseInt(d[0])));
                if (d[1].length() > 0)
                    as = as.append(new StringLeaf(d[1]));
            };
            Tree res = as.reverse().head();
            for ( Tree a: as.reverse().tail() )
                res = #<call(plus,`a,`res)>;
            return res;
        case short: return #<typed(`(((MR_short)x).get()),`type)>;
        case int: return #<typed(`(((MR_int)x).get()),`type)>;
        case long: return #<typed(`((int)((MR_long)x).get()),`type)>;
        case float: return #<typed(`(((MR_float)x).get()),`type)>;
        case double: return #<typed(`(((MR_double)x).get()),`type)>;
        };
        throw new Error("wrong type: "+type);
    }
}
