/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.mrql;

import org.apache.mrql.gen.*;
import java.util.List;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hama.HamaConfiguration;


/** Evaluate a physical plan in BSP mode using Apache Hama */
final public class BSPEvaluator extends Evaluator {

    /** Initialize the BSP evaluator: switch MRQL to BSP mode and, when running
     *  on Hadoop, fill in the Hama settings for either the local runner or the cluster.
     * @param conf the configuration that is updated in place
     * @throws IllegalArgumentException in distributed mode, when one of the environment
     *         variables BSP_MASTER_ADDRESS, HAMA_ZOOKEEPER_QUORUM, or FS_DEFAULT_NAME is not set
     */
    final public void init ( Configuration conf ) {
        Config.bsp_mode = true;
        if (!Config.hadoop_mode)
            return;
        if (Config.local_mode) {
            conf.set("bsp.master.address","local");
            conf.set("hama.zookeeper.quorum","localhost");
            conf.set("fs.defaultFS","file:///");
        } else {
            // configuration property -> environment variable that supplies its value
            final String[][] settings = {
                { "bsp.master.address", "BSP_MASTER_ADDRESS" },
                { "hama.zookeeper.quorum", "HAMA_ZOOKEEPER_QUORUM" },
                { "fs.defaultFS", "FS_DEFAULT_NAME" }
            };
            // check all variables before touching conf: a null value would otherwise be
            // rejected by conf.set with a message that does not name the missing variable
            for ( String[] s: settings )
                if (System.getenv(s[1]) == null)
                    throw new IllegalArgumentException("The environment variable "+s[1]
                                                       +" (used for "+s[0]+") is not set");
            for ( String[] s: settings )
                conf.set(s[0],System.getenv(s[1]));
        }
        // set in both the local and the distributed Hadoop setup
        conf.setInt("bsp.local.tasks.maximum",Config.nodes);
    }

    /** Hook called to initialize a query; the BSP evaluator has nothing to set up */
    final public void initialize_query () {
        // intentionally empty
    }

    /** Shut down the BSP evaluator; there is nothing to release in this implementation.
     * @param conf the configuration (unused)
     */
    final public void shutdown ( Configuration conf ) {
        // intentionally empty
    }

    /** Create a fresh configuration object for this evaluator.
     * @return a new HamaConfiguration
     */
    final public Configuration new_configuration () {
        Configuration hama_conf = new HamaConfiguration();
        return hama_conf;
    }

    /** Synchronize with the BSP peers.
     *  Outside Hadoop mode the flag is returned unchanged; in Hadoop mode the
     *  call is delegated to BSPPlan.BSPop.synchronize.
     * @param peerName the name of the calling peer
     * @param mr_exit the exit flag of the calling peer
     * @return mr_exit itself when not in Hadoop mode, otherwise the result of the delegate
     */
    final public MR_bool synchronize ( MR_string peerName, MR_bool mr_exit ) {
        if (!Config.hadoop_mode)
            return mr_exit;
        return BSPPlan.BSPop.synchronize(peerName,mr_exit);
    }

    /** Distribute a bag across the BSP peers.
     *  Outside Hadoop mode the bag is returned unchanged; in Hadoop mode the
     *  call is delegated to BSPPlan.BSPop.distribute.
     * @param peerName the name of the calling peer
     * @param s the bag to distribute
     * @return s itself when not in Hadoop mode, otherwise the result of the delegate
     */
    final public Bag distribute ( MR_string peerName, Bag s ) {
        if (!Config.hadoop_mode)
            return s;
        return BSPPlan.BSPop.distribute(peerName,s);
    }

    /** Evaluate a BSP physical operator.
     *  The first component of the plan is either a tuple of numeric leaves or a
     *  single numeric leaf; both forms are converted to an int array. All the
     *  trailing input plans are evaluated and merged into one DataSet, which is
     *  then handed to BSPPlan.BSP.
     * @param plan the physical plan
     * @param env contains bindings from variables to values (MRData)
     * @return the result of BSPPlan.BSP
     * @throws Error if the plan is not a BSP plan
     */
    final public MRData bsp ( Tree plan, Environment env ) throws Exception {
        match plan {
        case BSP(tuple(...ids),`step,`state,`flag,...inputs):
            final int count = ids.length();
            int[] nums = new int[count];
            for ( int k = 0; k < count; k++ ) {
                LongLeaf leaf = (LongLeaf)ids.nth(k);
                nums[k] = (int)leaf.value();
            }
            // evaluate the first input, then fold the remaining inputs into it
            DataSet input = eval(inputs.head(),env);
            for ( Tree other: inputs.tail() ) {
                input.merge(eval(other,env));
            }
            return BSPPlan.BSP(nums,closure(step,env),state,flag.equals(#<true>),input);
        case BSP(`id,`step,`state,`flag,...inputs):
            DataSet input = eval(inputs.head(),env);
            for ( Tree other: inputs.tail() ) {
                input.merge(eval(other,env));
            }
            // a single number is wrapped in a one-element array
            int[] nums = new int[]{ (int)((LongLeaf)id).value() };
            return BSPPlan.BSP(nums,closure(step,env),state,flag.equals(#<true>),input);
        }
        throw new Error("Cannot perform the BSP plan: "+plan);
    }

    /** The input format class used by the BSP evaluator for parsed files (CSV, XML, JSON, etc).
     * @return BSPParsedInputFormat.class
     */
    final public Class<? extends MRQLFileInputFormat> parsedInputFormat () {
        Class<? extends MRQLFileInputFormat> format = BSPParsedInputFormat.class;
        return format;
    }

    /** The input format class used by the BSP evaluator for binary files.
     * @return BSPBinaryInputFormat.class
     */
    final public Class<? extends MRQLFileInputFormat> binaryInputFormat () {
        Class<? extends MRQLFileInputFormat> format = BSPBinaryInputFormat.class;
        return format;
    }

    /** The input format class used by the BSP evaluator for data generator files.
     * @return BSPGeneratorInputFormat.class
     */
    final public Class<? extends MRQLFileInputFormat> generatorInputFormat () {
        Class<? extends MRQLFileInputFormat> format = BSPGeneratorInputFormat.class;
        return format;
    }

    /** Coerce a persistent collection to a Bag.
     *  An MR_dataset is collected through Plan.collect; any other value is cast to Bag.
     * @param data an MR_dataset or a value that already is a Bag
     * @return the Bag
     * @throws Error if collecting the dataset fails; the original exception is kept as the cause
     */
    public Bag toBag ( MRData data ) {
        if (data instanceof MR_dataset)
            try {
                return Plan.collect(((MR_dataset)data).dataset());
            } catch (Exception ex) {
                // keep ex as the cause so that the reason of the failure is not lost
                throw new Error("Cannot coerce "+data+" to a Bag",ex);
            }
        return (Bag)data;
    }

    /** The Aggregate physical operator.
     *  The input plans of the BSP operator are evaluated and merged into one
     *  DataSet, which is then passed to BSPPlan.BSPaggregate.
     * @param acc_fnc  the accumulator function from (T,T) to T
     * @param zero  the zero element of type T
     * @param plan the plan that constructs the dataset that contains the bag of values {T}
     * @param env contains bindings from variables to values (MRData)
     * @return the aggregation result of type T
     * @throws Error if the plan is not a BSP plan
     */
    final public MRData aggregate ( Tree acc_fnc,
                                    Tree zero,
                                    Tree plan,
                                    Environment env ) throws Exception {
        match plan {
        case BSP(`id,`step,`state,`flag,...inputs):
            // evaluate the first input, then fold the remaining inputs into it
            DataSet input = eval(inputs.head(),env,"-");
            for ( Tree other: inputs.tail() ) {
                input.merge(eval(other,env,"-"));
            }
            int num = (int)((LongLeaf)id).value();
            return BSPPlan.BSPaggregate(num,closure(step,env),state,
                                        closure(acc_fnc,env),zero,input);
        }
        throw new Error("Cannot perform the aggregation: "+plan);
    }

    /** Evaluate a loop a fixed number of times.
     *  The loop variables are first bound to the evaluated initial plans. In every
     *  iteration all variables are bound to their current datasets in a new
     *  environment, and only then are the body plans evaluated to produce the next datasets.
     * @param e the Loop plan
     * @param env contains bindings from variables to values (MRData)
     * @return a Tuple with the final dataset of every loop variable
     * @throws Error if e is not a well-formed Loop
     */
    final public Tuple loop ( Tree e, Environment env ) throws Exception {
        match e {
        case Loop(lambda(tuple(...vars),tuple(...bodies)),tuple(...inits),`count):
            final int rounds = ((MR_int)evalE(count,env)).get();
            MR_dataset[] state = new MR_dataset[vars.length()];
            for ( int k = 0; k < vars.length(); k++ ) {
                state[k] = new MR_dataset(eval(inits.nth(k),env,"-"));
            }
            for ( int round = 0; round < rounds; round++ ) {
                // bind every variable before evaluating any body, so that all
                // bodies see the datasets of the previous round
                Environment bound = env;
                for ( int k = 0; k < vars.length(); k++ ) {
                    bound = new Environment(vars.nth(k).toString(),state[k],bound);
                }
                for ( int k = 0; k < vars.length(); k++ ) {
                    state[k] = new MR_dataset(eval(bodies.nth(k),bound,"-"));
                }
            }
            return new Tuple(state);
        }
        throw new Error("Wrong Loop format");
    }

    /** Evaluate a physical plan; the counter name is ignored by the BSP evaluator.
     * @param e the physical plan
     * @param env contains bindings from variables to values (MRData)
     * @param counter a counter name (unused here)
     * @return the result of eval(e,env)
     */
    final public DataSet eval ( final Tree e,
                                final Environment env,
                                final String counter ) {
        final DataSet result = eval(e,env);
        return result;
    }

    /** Evaluate MRQL BSP physical operators using Hama (returns a DataSet)
     *   and print tracing info when Config.trace_execution is set.
     *  While tracing, the indentation counter is raised by 3 for the duration of
     *  the evaluation and is always restored, even when the evaluation fails.
     * @param e the physical plan
     * @param env contains bindings from variables to values (MRData)
     * @return a DataSet (stored in HDFS)
     * @throws Error if the traced output cannot be collected for printing
     */
    final public DataSet eval ( final Tree e, final Environment env ) {
        // read the flag once so that the increment and the decrement are always paired
        if (!Config.trace_execution)
            return evalD(e,env);
        tab_count += 3;
        try {
            System.out.println(tabs(tab_count)+print_query(e));
            DataSet res = evalD(e,env);
            try {
                System.out.println(tabs(tab_count)
                                   +"-> "+res.take(Config.max_bag_size_print));
            } catch (Exception ex) {
                throw new Error("Cannot collect the operator output: "+e,ex);
            }
            return res;
        } finally {
            // undo the indentation on every exit path; otherwise a failed
            // evaluation would shift all later trace output
            tab_count -= 3;
        }
    }

    /** Evaluate MRQL BSP physical operators using Hama (returns a DataSet).
     *  Handles the BSP sources (binary, parsed, generator, or a variable), Merge,
     *  Dump, trace, function application, tuple projection, and variables bound
     *  to datasets; anything else is passed to bsp.
     * @param e the physical plan
     * @param env contains bindings from variables to values (MRData)
     * @return a DataSet (stored in HDFS)
     * @throws Error if the plan cannot be evaluated; the original failure is kept as the cause
     */
    final public DataSet evalD ( final Tree e, final Environment env ) {
        try {
            match e {
            case BSPSource(`n,BinarySource(`file,_)):
                return Plan.binarySource((int)((LongLeaf)n).value(),file.stringValue());
            case BSPSource(`n,ParsedSource(`parser,`file,...args)):
                Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
                if (p == null)
                    throw new Error("Unknown parser: "+parser);
                return Plan.parsedSource((int)((LongLeaf)n).value(),
                                         p,((MR_string)evalE(file,env)).get(),args);
            case BSPSource(`n,Generator(`min,`max,`size)):
                return Plan.generator((int)((LongLeaf)n).value(),
                                      ((MR_long)evalE(min,env)).get(),
                                      ((MR_long)evalE(max,env)).get(),
                                      ((MR_long)evalE(size,env)).get());
            case BSPSource(`n,`v):
                if (!v.is_variable())
                    fail;
                // a dataset bound to a variable: relabel all its sources with n
                DataSet ds = evalD(v,env);
                for ( DataSource s: ds.source )
                    s.source_num = (int)((LongLeaf)n).value();
                return ds;
            case Merge(`x,`y):
                return Plan.merge(eval(x,env),eval(y,env));
            case Dump(`s):
                return Plan.fileCache((Bag)evalE(s,env));
            case trace(`msg,`tp,`x):
                long n = pre_trace(((MR_string)evalE(msg,env)).get());
                DataSet ds = evalD(x,env);
                trace(n,tp,new MR_dataset(ds));
                return ds;
            case apply(`f,`arg):
                if (!f.is_variable())
                    // evaluate the argument in env, like every other branch does,
                    // so that locally bound variables are visible
                    return ((MR_dataset)evalF(f,env).eval(evalE(arg,env))).dataset();
                MRData fnc = variable_lookup(f.toString(),global_env);
                if (fnc == null)
                    throw new Error("Unknown function: "+f);
                MRData t = evalE(arg,env);
                if (!(t instanceof Tuple))
                    throw new Error("Expected a tuple in function application: "+t);
                return ((MR_dataset)((Lambda)fnc).lambda().eval(t)).dataset();
           case nth(`u,`n):
                MRData x = ((Tuple)evalE(u,env)).get((int)n.longValue());
                if (x instanceof MR_dataset)
                    return ((MR_dataset)x).dataset();
                throw new Error("Evaluation error in: "+print_query(e));
            case `v:
                if (!v.is_variable())
                    fail;
                // look in the local environment first, then in the global one
                MRData x = variable_lookup(v.toString(),env);
                if (x != null)
                    if (x instanceof MR_dataset)
                        return ((MR_dataset)x).dataset();
                x = variable_lookup(v.toString(),global_env);
                if (x != null)
                    if (x instanceof MR_dataset)
                        return ((MR_dataset)x).dataset();
                throw new Error("Variable "+v+" is not bound");
            };
            // none of the patterns above applied: treat e as a BSP plan
            MRData d = bsp(e,env);
            if (d instanceof MR_dataset)
                return ((MR_dataset)d).dataset();
            throw new Error("Cannot evaluate the BSP plan: "+e);
        } catch (Error msg) {
            // the message is unchanged; the caught error is chained as the cause
            // so that its stack trace is not lost
            if (!Config.trace)
                throw new Error(msg.getMessage(),msg);
            System.err.println(msg.getMessage());
            throw new Error("Evaluation error in: "+print_query(e),msg);
        } catch (Exception ex) {
            System.err.println(ex.getMessage());
            ex.printStackTrace();
            throw new Error("Evaluation error in: "+print_query(e),ex);
        }
    }
}
