/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.pig.test;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.*;

import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.relational.LOStore;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
import org.apache.pig.newplan.logical.rules.LimitOptimizer;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.optimizer.PlanOptimizer;
import org.apache.pig.newplan.optimizer.Rule;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.impl.PigContext;

import junit.framework.Assert;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class TestOptimizeLimit {
    final String FILE_BASE_LOCATION = "test/org/apache/pig/test/data/DotFiles/" ;
    static final int MAX_SIZE = 100000;
    PigContext pc = new PigContext( ExecType.LOCAL, new Properties() );
  
    PigServer pigServer;
    
    @Before
    public void setup() throws ExecException {
        pigServer = new PigServer( pc );
    }
    
    @After
    public void tearDown() {
        
    }
    
    void compareWithGoldenFile(LogicalPlan plan, String filename) throws Exception {
        String actualPlan = printLimitGraph(plan);
        System.out.println("We get:");
        System.out.println(actualPlan);
        
        FileInputStream fis = new FileInputStream(filename);
        byte[] b = new byte[MAX_SIZE];
        int len = fis.read(b);
        String goldenPlan = new String(b, 0, len);
        System.out.println("Expected:");
        System.out.println(goldenPlan);
        
		Assert.assertEquals(goldenPlan, actualPlan + "\n");
    }

    public static String printLimitGraph(LogicalPlan plan) throws IOException {
    	OptimizeLimitPlanPrinter printer = new OptimizeLimitPlanPrinter(plan) ;
        String rep = "digraph graph1 {\n";
    	rep = rep + printer.printToString() ;
    	rep = rep + "}";
        return rep;
    }

    @Test
    // Merget limit into sort
	public void testOPLimit1Optimizer() throws Exception {
	    String query = "A = load 'myfile';" + 
	                   "B = order A by $0;" +
	                   "C = limit B 100;" +
                       "store C into 'empty';";  
	    LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
	    optimizePlan(newLogicalPlan);
	    compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan1.dot");
	}

	@Test
	// Merge limit into limit
	public void testOPLimit2Optimizer() throws Exception {
	    String query = "A = load 'myfile';" + 
	                   "B = limit A 10;" + 
	                   "C = limit B 100;" + "store C into 'empty';";
	    LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
	    optimizePlan(newLogicalPlan);
	    compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan2.dot");
        Assert.assertTrue(((LOLoad) newLogicalPlan.getSources().get(0)).getLimit() == 10);
	}

	@Test
	// Duplicate limit with two inputs
	public void testOPLimit3Optimizer() throws Exception {
	    String query = "A = load 'myfile1';" +
	    "B = load 'myfile2';" +
	    "C = cross A, B;" +
	    "D = limit C 100;" + "store D into 'empty';";
	    LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
	    optimizePlan(newLogicalPlan);
	    compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan3.dot");
	}

	@Test
	// Duplicte limit with one input
	public void testOPLimit4Optimizer() throws Exception {
	    String query = "A = load 'myfile1';" + 
	                   "B = group A by $0;" + "C = foreach B generate flatten(A);" + "D = limit C 100;" +
	                   "store D into 'empty';";
	    LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
	    optimizePlan(newLogicalPlan);
	    compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan4.dot");
	}

	@Test
	// Move limit up
    public void testOPLimit5Optimizer() throws Exception {
        String query = "A = load 'myfile1';" + 
        "B = foreach A generate $0;" +
        "C = limit B 100;" + "store C into 'empty';" ;
	    LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
	    optimizePlan(newLogicalPlan);
        compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan5.dot");
        Assert.assertTrue(((LOLoad) newLogicalPlan.getSources().get(0)).getLimit() == 100);
    }
	
    @Test
    // Multiple LOLimit
	public void testOPLimit6Optimizer() throws Exception {
	    String query = "A = load 'myfile';" +
	    "B = limit A 50;" + 
	    "C = limit B 20;" +
	    "D = limit C 100;" +  "store D into 'empty';";
	    LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
	    optimizePlan(newLogicalPlan);
	    compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan6.dot");
        Assert.assertTrue(((LOLoad) newLogicalPlan.getSources().get(0)).getLimit() == 20);
	}
    
    @Test
    // Limit stay the same for ForEach with a flatten
    public void testOPLimit7Optimizer() throws Exception {
        String query = "A = load 'myfile1';" + 
        "B = foreach A generate flatten($0);" +
        "C = limit B 100;" + "store C into 'empty';";
	    LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
	    optimizePlan(newLogicalPlan);
        compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan7.dot");
    }
    
    @Test
    //Limit in the local mode, need to make sure limit stays after a sort
    public void testOPLimit8Optimizer() throws Exception {
        String query = "A = load 'myfile';" + 
        "B = order A by $0;" +
        "C = limit B 10;" + "store C into 'empty';";
	    LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
	    optimizePlan(newLogicalPlan);
       compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan8.dot");
        
    }
    
    @Test
    public void testOPLimit9Optimizer() throws Exception {
        String query = "A = load 'myfile';" + 
        "B = order A by $0;" +
        "C = limit B 10;" + "store C into 'empty';";
	    LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
	    optimizePlan(newLogicalPlan);
       compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan9.dot");
        
    }

    @Test
    //See bug PIG-913
    public void testOPLimit10Optimizer() throws Exception {
        String query = "A = load 'myfile' AS (s:chararray);" +
        "B = limit A 100;" + "C = GROUP B by $0;" + "store C into 'empty';";
	    LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
	    optimizePlan(newLogicalPlan);
        compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan10.dot");
        Assert.assertTrue(((LOLoad) newLogicalPlan.getSources().get(0)).getLimit() == 100);
    }

    @Test
    //See bug PIG-995
    //We shall throw no exception here
    public void testOPLimit11Optimizer() throws Exception {
    	String query = "B = foreach (limit (order (load 'myfile' AS (a0, a1, a2)) by $1) 10) generate $0;";
    	LogicalPlan plan = Util.buildLp(pigServer, query);
	    optimizePlan(plan);
    }
    
    @Test
    // See PIG-2570
    public void testLimitSoftLink() throws Exception {
        String query = "A = LOAD 'data1.txt' AS (owner:chararray,pet:chararray,age:int,phone:chararray);"
            + "B = group A all; "
            + "C = foreach B generate SUM(A.age) as total; "
            + "D = foreach A generate owner, age/(double)C.total AS percentAge; "
            + "F = LIMIT D C.total/8;"
            + "store F into 'output';";
        LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
        optimizePlan(newLogicalPlan);
        LOStore store = (LOStore)newLogicalPlan.getSinks().get(0);
        LOForEach foreach1 = (LOForEach)newLogicalPlan.getPredecessors(store).get(0);
        LOForEach foreach2 = (LOForEach)newLogicalPlan.getPredecessors(foreach1).get(0);
        LOLimit limit = (LOLimit)newLogicalPlan.getPredecessors(foreach2).get(0);
        Assert.assertTrue(newLogicalPlan.getSoftLinkPredecessors(limit).get(0) instanceof LOStore);
    }


    public class MyPlanOptimizer extends LogicalPlanOptimizer {
        protected MyPlanOptimizer(OperatorPlan p,  int iterations) {
            super( p, iterations, new HashSet<String>() );
        }
        
        protected List<Set<Rule>> buildRuleSets() {            
            List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
            
            Set<Rule> s = null;
            Rule r = null;
            
            s = new HashSet<Rule>();
            r = new LoadTypeCastInserter( "TypeCastInserter");
            s.add(r);
            ls.add(s);
            
            s = new HashSet<Rule>();
            r = new LimitOptimizer("OptimizeLimit");
            s.add(r);
            ls.add(s);
            
            return ls;
        }
    }    

    private LogicalPlan optimizePlan(LogicalPlan plan) throws IOException {
        PlanOptimizer optimizer = new MyPlanOptimizer( plan, 3 );
        optimizer.optimize();
        return plan;
    }

}
