/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.mrql;

import java.io.*;
import java.util.Enumeration;
import org.apache.hadoop.util.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.*;
import jline.*;


final public class Main {
    public final static String version = "0.9.4";

    public static PrintStream print_stream;
    public static Configuration conf;
    static MRQLParser parser = new MRQLParser();
    public static String query_file = "";

    public static void include_file ( String file ) {
        try {
            MRQLParser old_parser = parser;
            boolean old_interactive = Config.interactive;
            try {
                parser = new MRQLParser(new MRQLLex(new FileInputStream(file)));
            } catch (Exception e) {
                Path path = new Path(file);
                FileSystem fs = path.getFileSystem(conf);
                parser = new MRQLParser(new MRQLLex(fs.open(path)));
            };
            Config.interactive = false;
            parser.parse();
            Config.interactive = old_interactive;
            parser = old_parser;
        } catch (Exception ex) {
            ex.printStackTrace(System.err);
            throw new Error(ex);
        }
    }

    private static void initialize_evaluator () throws Exception {
        if (Config.bsp_mode)
            Evaluator.evaluator = (Evaluator)Class.forName("org.apache.mrql.BSPEvaluator").newInstance();
        else if (Config.spark_mode)
            Evaluator.evaluator = (Evaluator)Class.forName("org.apache.mrql.SparkEvaluator").newInstance();
        else if (Config.flink_mode)
            Evaluator.evaluator = (Evaluator)Class.forName("org.apache.mrql.FlinkEvaluator").newInstance();
        else // when Config.map_reduce_mode but also the default
            Evaluator.evaluator = (Evaluator)Class.forName("org.apache.mrql.MapReduceEvaluator").newInstance();
    }

    public static void initialize () throws Exception {
        if (Evaluator.evaluator == null) {
            if (Plan.conf == null)
                Plan.conf = new Configuration();
            conf = Plan.conf;
            Config.read(Plan.conf);
            initialize_evaluator();
        }
    }

    public static void main ( String[] args ) throws Exception {
        Config.hadoop_mode = false;
        if (args.length == 2 && args[0].equals("args"))  // needed for mrql.flink script
            args = args[1].substring(1).split("!");
        for ( String arg: args ) {
            Config.hadoop_mode |= arg.equals("-local") || arg.equals("-dist");
            Config.bsp_mode |= arg.equals("-bsp");
            Config.spark_mode |= arg.equals("-spark");
            Config.flink_mode |= arg.equals("-flink");
        };
        Config.map_reduce_mode = !Config.bsp_mode && !Config.spark_mode && !Config.flink_mode;
        initialize_evaluator();
	if (Config.hadoop_mode) {
	    conf = Evaluator.evaluator.new_configuration();
	    GenericOptionsParser gop = new GenericOptionsParser(conf,args);
	    conf = gop.getConfiguration();
	    args = gop.getRemainingArgs();
	};
        Config.parse_args(args,conf);
        Config.hadoop_mode = Config.local_mode || Config.distributed_mode;
        if (!Config.info) {
            for ( Enumeration en = LogManager.getCurrentLoggers(); en.hasMoreElements(); )
                ((Logger)en.nextElement()).setLevel(Level.WARN);
            LogManager.getRootLogger().setLevel(Level.WARN);
        };
        Evaluator.evaluator.init(conf);
        new TopLevel();
        System.out.print("Apache MRQL version "+version+" (");
        if (Config.compile_functional_arguments)
            System.out.print("compiled ");
        else System.out.print("interpreted ");
        if (Config.hadoop_mode) {
            if (Config.local_mode)
                System.out.print("local ");
            else if (Config.distributed_mode)
                System.out.print("distributed ");
            if (Config.spark_mode)
                System.out.println("Spark mode using "+Config.nodes+" tasks)");
            else if (Config.flink_mode)
                System.out.println("Flink mode using "+Config.nodes+" tasks)");
            else if (Config.bsp_mode)
                System.out.println("Hama BSP mode over "+Config.nodes+" BSP tasks)");
            else if (Config.nodes > 0)
                System.out.println("Hadoop MapReduce mode with "+Config.nodes+" reducers)");
            else if (!Config.local_mode)
                System.out.println("Hadoop MapReduce mode with 1 reducer, use -nodes to change it)");
            else System.out.println("Hadoop MapReduce mode)");
        } else if (Config.bsp_mode)
            System.out.println("in-memory BSP mode)");
        else System.out.println("in-memory Java mode)");
        if (Config.interactive) {
            System.out.println("Type quit to exit");
            ConsoleReader reader = new ConsoleReader();
            reader.setBellEnabled(false);
            History history = new History(new File(System.getProperty("user.home")+"/.mrqlhistory"));
            reader.setHistory(history);
            reader.setUseHistory(false);
            try {
            loop: while (true) {
                String line = "";
                String s = "";
                try {
                    if (Config.hadoop_mode && Config.bsp_mode)
                        Config.write(Plan.conf);
                    do {
                        s = reader.readLine("> ");
                        if (s != null && (s.equals("quit") || s.equals("exit")))
                            break loop;
                        if (s != null)
                            line += " "+s;
                    } while (s == null || s.indexOf(";") <= 0);
                    line = line.substring(1);
                    history.addToHistory(line);
                    parser = new MRQLParser(new MRQLLex(new StringReader(line)));
                    MRQLLex.reset();
                    parser.parse();
                } catch (EOFException x) {
                    break;
                } catch (Exception x) {
                    if (x.getMessage() != null)
                        System.out.println(x);
                } catch (Error x) {
                    System.out.println(x);
                }
                }
            } finally {
                if (Config.hadoop_mode) {
                    Plan.clean();
                    Evaluator.evaluator.shutdown(Plan.conf);
                };
                if (Config.compile_functional_arguments)
                    Compiler.clean();
            }
        } else try {
                if (Config.hadoop_mode && Config.bsp_mode)
                    Config.write(Plan.conf);
                try {
                    parser = new MRQLParser(new MRQLLex(new FileInputStream(query_file)));
                } catch (Exception e) {
                    // when the query file is in HDFS
                    Path path = new Path(query_file);
                    FileSystem fs = path.getFileSystem(conf);
                    parser = new MRQLParser(new MRQLLex(fs.open(path)));
                };
                parser.parse();
            } finally {
                if (Config.hadoop_mode) {
                    Plan.clean();
                    Evaluator.evaluator.shutdown(Plan.conf);
                };
                if (Config.compile_functional_arguments)
                    Compiler.clean();
            }
    }
}
