/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.mrql;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.io.BufferedReader;
import java.io.FileReader;

import junit.framework.TestCase;

import org.apache.commons.io.FileUtils;
import org.junit.BeforeClass;
import org.apache.log4j.*;
import java.util.Enumeration;

public abstract class QueryTest extends TestCase {
	private static String TEST_QUERY_DIR = "../tests/queries";
	private static String TEST_RESULT_DIR = "../tests/results";
	private File queryDir;
	private File resultDir;
	private static Evaluator evaluator;
	
	@BeforeClass
	public static void setUpBeforeClass() throws Exception {
		ClassImporter.load_classes();
		new TopLevel();
	}

	public void setUp() throws Exception {
		queryDir = new File(TEST_QUERY_DIR);
		resultDir = new File(TEST_RESULT_DIR);
		resultDir.mkdirs();

		// if(evaluator==null) // the spark evaluator needs to be recreated
			evaluator = createEvaluator();
		Translator.global_reset();
                for ( Enumeration en = LogManager.getCurrentLoggers(); en.hasMoreElements(); )
                    ((Logger)en.nextElement()).setLevel(Level.ERROR);
                LogManager.getRootLogger().setLevel(Level.ERROR);
	}

	public void tearDown() throws IOException {
		if (Config.compile_functional_arguments)
			Compiler.clean();
                if (Config.hadoop_mode) {
                    Plan.clean();
                    Evaluator.evaluator.shutdown(Plan.conf);
                }
	}
	
	abstract protected Evaluator createEvaluator() throws Exception;

	public void testCore() throws Exception {
		assertEquals(0, queryAndCompare(new File(queryDir, "core_1.mrql"), resultDir));
	}

	public void testDistinct() throws Exception {
		assertEquals(0, queryAndCompare(new File(queryDir, "distinct_1.mrql"), resultDir));
	}

	public void testFactorization() throws Exception {
            if (!Config.bsp_mode) // matrix factorization needs at least 3 nodes in BSP Hama mode
		assertEquals(0, queryAndCompare(new File(queryDir, "factorization_1.mrql"), resultDir));
	}

	public void testGroupBy() throws Exception {
		assertEquals(0, queryAndCompare(new File(queryDir, "group_by_1.mrql"), resultDir));
	}
	
	public void testGroupByHaving() throws Exception {
		assertEquals(0, queryAndCompare(new File(queryDir, "group_by_having_1.mrql"), resultDir));
	}	

	public void testGroupByOrderBy() throws Exception {
		assertEquals(0, queryAndCompare(new File(queryDir, "group_by_order_by_1.mrql"), resultDir));
		assertEquals(0, queryAndCompare(new File(queryDir, "group_by_order_by_2.mrql"), resultDir));
	}

	public void testJoinGroupBy() throws Exception {
		assertEquals(0, queryAndCompare(new File(queryDir, "join_group_by_1.mrql"), resultDir));
		assertEquals(0, queryAndCompare(new File(queryDir, "join_group_by_2.mrql"), resultDir));
		assertEquals(0, queryAndCompare(new File(queryDir, "join_group_by_3.mrql"), resultDir));
	}
	
	public void testJoinOrderBy() throws Exception {
		assertEquals(0, queryAndCompare(new File(queryDir, "join_order_by_1.mrql"), resultDir));
	}

	public void testJoins() throws Exception {
		assertEquals(0, queryAndCompare(new File(queryDir, "joins_1.mrql"), resultDir));
	}

	public void testKmeans() throws Exception {
		assertEquals(0, queryAndCompare(new File(queryDir, "kmeans_1.mrql"), resultDir));
	}

	public void testLoop() throws Exception {
		assertEquals(0, queryAndCompare(new File(queryDir, "loop_1.mrql"), resultDir));
	}

	public void testMatrix() throws Exception {
		assertEquals(0, queryAndCompare(new File(queryDir, "matrix_1.mrql"), resultDir));
	}

	public void testNestedSelect() throws Exception {
		assertEquals(0, queryAndCompare(new File(queryDir, "nested_select_1.mrql"), resultDir));
	}

	public void testOrderBy() throws Exception {
		assertEquals(0, queryAndCompare(new File(queryDir, "order_by_1.mrql"), resultDir));
	}

	public void testPagerank() throws Exception {
		assertEquals(0, queryAndCompare(new File(queryDir, "pagerank_1.mrql"), resultDir));
	}

	public void testRelationalJoin() throws Exception {
		assertEquals(0, queryAndCompare(new File(queryDir, "relational_join_1.mrql"), resultDir));
	}

	public void testTotalAggregation() throws Exception {
		assertEquals(0, queryAndCompare(new File(queryDir, "total_aggregation_1.mrql"), resultDir));
		assertEquals(0, queryAndCompare(new File(queryDir, "total_aggregation_2.mrql"), resultDir));
	}

	public void testUdf() throws Exception {
		assertEquals(0, queryAndCompare(new File(queryDir, "udf_1.mrql"), resultDir));
	}

	public void testUserAggregation() throws Exception {
		assertEquals(0, queryAndCompare(new File(queryDir, "user_aggregation_1.mrql"), resultDir));
	}

	public void testXml() throws Exception {
		assertEquals(0, queryAndCompare(new File(queryDir, "xml_1.mrql"), resultDir));
		assertEquals(0, queryAndCompare(new File(queryDir, "xml_2.mrql"), resultDir));
	}

    private int queryAndCompare ( File query, File resultDir ) throws Exception {
        System.err.println("Testing "+query);
        Translator.global_reset();
        String qname = query.getName();
        qname = qname.substring(0,qname.length()-5);
        String resultFile = resultDir.getAbsolutePath()+"/"+qname+".txt";
        boolean exists = new File(resultFile).exists();
        if (exists)
            System.setOut(new PrintStream(resultDir.getAbsolutePath()+"/"+qname+"_result.txt"));
        else System.setOut(new PrintStream(resultFile));
        Config.max_bag_size_print = -1;
        Config.testing = true;
        MRQL.evaluate("store NaN := -1;");
        MRQLLex scanner = new MRQLLex(new FileInputStream(query));
        MRQLParser parser = new MRQLParser(scanner);
        parser.setScanner(scanner);
        MRQLLex.reset();
        parser.parse();
        PrintStream errorStream = System.err;
        int i;
        if (exists && (i = compare(resultFile,resultDir.getAbsolutePath()+"/"+qname+"_result.txt")) > 0) {
            return i;
        } else if (exists) {
            return 0;
        } else {
            return 0;
        }
    }

    private int compare2(String file1, String file2) throws Exception {
        FileInputStream s1 = new FileInputStream(file1);
        FileInputStream s2 = new FileInputStream(file2);
        int b1, b2;
        int i = 1;
        while ((b1 = s1.read()) == (b2 = s2.read()) && b1 != -1 && b2 != -1)
            i++;
        return (b1 == -1 && b2 == -1) ? 0 : i;
    }

    private String read_lines ( BufferedReader reader ) throws Exception {
        String s = "";
        boolean even = true;
        do {
            String line = reader.readLine();
            if (line == null)
                return s;
            for ( int i = 0; i < line.length(); i++ )
                if (line.charAt(i) == '\"')
                    even = !even;
            s += line;
        } while (!even);
        return s;
    }

    private int compare ( String file1, String file2 ) throws Exception {
        BufferedReader reader1 = new BufferedReader(new FileReader(file1));
        BufferedReader reader2 = new BufferedReader(new FileReader(file2));
        String line1 = read_lines(reader1);
        String line2 = read_lines(reader2);
        int i = 1;
        do {
            boolean hm = Config.hadoop_mode;
            Config.hadoop_mode = false;
            MRData v1 = MRQL.query(line1);
            MRData v2 = MRQL.query(line2);
            Config.hadoop_mode = hm;
            if (!equal_value(v1,v2)) {
                System.err .println("*** "+file1+" (query "+i+"):\nFound: "+v1+"\nExpected: "+v2);
                return i;
            };
            line1 = reader1.readLine();
            line2 = reader2.readLine();
            i++;
        } while (line1 != null && line2 != null);
        return 0;
    }

    private boolean member ( MRData e, Bag v ) {
        for ( MRData x: v )
            if (equal_value(x,e))
                return true;
        return false;
    }

    private boolean equal_value ( MRData v1, MRData v2 ) {
        if (v1 instanceof Bag && v2 instanceof Bag) {
            Bag b1 = (Bag)v1;
            Bag b2 = (Bag)v2;
            if (b1.size() != b2.size())
                return false;
            for ( MRData e1: b1 )
                if (!member(e1,b2))
                    return false;
            for ( MRData e2: b2 )
                if (!member(e2,b1))
                    return false;
            return true;
        } else if (v1 instanceof Tuple && v2 instanceof Tuple) {
            Tuple t1 = (Tuple)v1;
            Tuple t2 = (Tuple)v2;
            if (t1.size() != t2.size())
                return false;
            for ( short i = 0; i < t1.size(); i++ )
                if (!equal_value(t1.get(i),t2.get(i)))
                    return false;
            return true;
        } else if (v1 instanceof Union && v2 instanceof Union) {
            Union t1 = (Union)v1;
            Union t2 = (Union)v2;
            return t1.tag() == t2.tag() && equal_value(t1.value(),t2.value());
        } else if (v1 instanceof MR_double && v2 instanceof MR_double)
            return (float)((MR_double)v1).get() == (float)((MR_double)v2).get();
        else return v1.equals(v2);
    }
}
