/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.pig.test;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import junit.framework.TestCase;

import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.logical.optimizer.LogicalPlanPrinter;
import org.apache.pig.newplan.logical.optimizer.ProjectionPatcher;
import org.apache.pig.newplan.logical.optimizer.SchemaPatcher;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.rules.ColumnMapKeyPrune;
import org.apache.pig.newplan.logical.rules.MapKeysPruneHelper;
import org.apache.pig.newplan.optimizer.PlanOptimizer;
import org.apache.pig.newplan.optimizer.PlanTransformListener;
import org.apache.pig.newplan.optimizer.Rule;

public class TestNewPlanPruneMapKeys extends TestCase {
    PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
    
    private org.apache.pig.newplan.logical.relational.LogicalPlan migratePlan(String query) throws Exception{
        PigServer pigServer = new PigServer( pc );
    	org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = 
            Util.buildLp(pigServer, query);
        
        try {
            // run filter rule
            Set<Rule> s = new HashSet<Rule>();
            List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
            ls.add(s);
            // Add the PruneMap Filter
            Rule r = new ColumnMapKeyPrune("PruneMapKeys");
            s.add(r);            
            
            printPlan((org.apache.pig.newplan.logical.relational.LogicalPlan)newPlan);
            
            // Run the optimizer
            MyPlanOptimizer optimizer = new MyPlanOptimizer(newPlan, ls, 3);
            optimizer.addPlanTransformListener(new ProjectionPatcher());
            optimizer.addPlanTransformListener(new SchemaPatcher());
            optimizer.optimize();
            
            return newPlan;
        }catch(Exception e) {
            throw new VisitorException(e);
        }
    }
    
    public class MyPlanOptimizer extends PlanOptimizer {

        protected MyPlanOptimizer(OperatorPlan p, List<Set<Rule>> rs,
                int iterations) {
            super(p, rs, iterations);           
        }
        
        public void addPlanTransformListener(PlanTransformListener listener) {
            super.addPlanTransformListener(listener);
        }
        
    }
        
    @SuppressWarnings("unchecked")
    public void testSimplePlan() throws Exception {
        String query = "a =load 'd.txt' as (a:map[], b:int, c:float);" +
        "b = filter a by a#'name' == 'hello';" +
        "store b into 'empty';";        
        
        org.apache.pig.newplan.logical.relational.LogicalPlan newLogicalPlan = migratePlan(query);
        
        List<Operator> sources = newLogicalPlan.getSources();
        assertEquals( 1, sources.size() );
        for( Operator source : sources ) {
            Map<Long,Set<String>> annotation = 
                (Map<Long, Set<String>>) source.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
            assertTrue(annotation == null || annotation.isEmpty() );
        }
    }
    
    @SuppressWarnings("unchecked")
    public void testSimplePlan2() throws Exception {
        String query = "a =load 'd.txt' as (a:map[], b:int, c:float);" +
        "b = filter a by a#'name' == 'hello';" +
        "c = foreach b generate b,c;" +
        "store c into 'empty';";        
        
        org.apache.pig.newplan.logical.relational.LogicalPlan newLogicalPlan = migratePlan(query);
        
        assertEquals( 1, newLogicalPlan.getSources().size() );
        LOLoad load = (LOLoad) newLogicalPlan.getSources().get(0);
        Map<Long,Set<String>> annotation = 
            (Map<Long, Set<String>>) load.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
        assertTrue( annotation != null );
        assertEquals( 1, annotation.keySet().size() );
        Integer[] keySet = annotation.keySet().toArray( new Integer[0] );
        assertEquals( new Integer(0), keySet[0] );
        Set<String> keys = annotation.get(0);
        assertEquals( 1, keys.size() );
        assertEquals( "name", keys.toArray( new String[0] )[0] );            
    }
    
    @SuppressWarnings("unchecked")
    public void testSimplePlan3() throws Exception {
        String query = "a =load 'd.txt' as (a:map[], b:int, c:float);" +
        "b = filter a by a#'name' == 'hello';" +
        "c = foreach b generate a#'age',b,c;" +
        "store c into 'empty';";        
        
        org.apache.pig.newplan.logical.relational.LogicalPlan newLogicalPlan = migratePlan(query);
        
        assertEquals( 1, newLogicalPlan.getSources().size() );
        LOLoad load = (LOLoad) newLogicalPlan.getSources().get(0);
        Map<Long,Set<String>> annotation = 
            (Map<Long, Set<String>>) load.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
        assertTrue( annotation != null );
        assertEquals( 1, annotation.keySet().size() );
        Integer[] keySet = annotation.keySet().toArray( new Integer[0] );
        assertEquals( new Integer(0), keySet[0] );
        Set<String> keys = annotation.get(0);
        assertEquals( 2, keys.size() );
        assertTrue( keys.contains("name") );
        assertTrue( keys.contains("age"));
    }
    
    @SuppressWarnings("unchecked")
    public void testSimplePlan4() throws Exception {
        String query = "a =load 'd.txt' as (a:map[], b:int, c:float);" +
        "b = filter a by a#'name' == 'hello';" +
        "c = foreach b generate a#'age',a,b,c;" +
        "store c into 'empty';";        
        
        org.apache.pig.newplan.logical.relational.LogicalPlan newLogicalPlan = migratePlan(query);
        
        List<Operator> sources = newLogicalPlan.getSources();
        assertEquals( 1, sources.size() );
        for( Operator source : sources ) {
            Map<Long,Set<String>> annotation = 
                (Map<Long, Set<String>>) source.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
            assertTrue(annotation == null || annotation.isEmpty() );
        }
    }
    
    @SuppressWarnings("unchecked")
    public void testSimplePlan5() throws Exception {
        String query = "a =load 'd.txt' as (a:chararray, b:int, c:float);" +
        "b = filter a by a == 'hello';" +
        "c = foreach b generate a,b,c;" +
        "store c into 'empty';";        
        
        org.apache.pig.newplan.logical.relational.LogicalPlan newLogicalPlan = migratePlan(query);
        
        List<Operator> sources = newLogicalPlan.getSources();
        assertEquals( 1, sources.size() );
        for( Operator source : sources ) {
            Map<Long,Set<String>> annotation = 
                (Map<Long, Set<String>>) source.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
            assertTrue(annotation == null );
        }
    }
    
    @SuppressWarnings("unchecked")
    public void testSimplePlan6() throws Exception {
        String query = "a =load 'd.txt';" +
        "b = filter a by $0 == 'hello';" +
        "c = foreach b generate $0,$1,$2;" +
        "store c into 'empty';";        
        
        org.apache.pig.newplan.logical.relational.LogicalPlan newLogicalPlan = migratePlan(query);
        
        List<Operator> sources = newLogicalPlan.getSources();
        assertEquals( 1, sources.size() );
        for( Operator source : sources ) {
            Map<Long,Set<String>> annotation = 
                (Map<Long, Set<String>>) source.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
            assertTrue(annotation == null );
        }
    }
    
    @SuppressWarnings("unchecked")
    public void testSimplePlan7() throws Exception {
        String query = "a =load 'd.txt';" +
        "a1 = load 'b.txt' as (a:map[],b:int, c:float);" +
        "b = join a by $0, a1 by a#'name';" +
        "c = foreach b generate $0,$1,$2;" +
        "store c into 'empty';";        
        
        org.apache.pig.newplan.logical.relational.LogicalPlan newLogicalPlan = migratePlan(query);
        printPlan( newLogicalPlan );
        
        List<Operator> sources = newLogicalPlan.getSources();
        assertEquals( 2, sources.size() );
        for( Operator source : sources ) {
            Map<Long,Set<String>> annotation = 
                (Map<Long, Set<String>>) source.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
            assertTrue( annotation == null || annotation.isEmpty() );
        }
    }

    @SuppressWarnings("unchecked")
    public void testSimplePlan8() throws Exception {
        String query = "a =load 'd.txt';" +
        "a1 = load 'b.txt' as (a:chararray,b:int, c:float);" +
        "b = join a by $0, a1 by a;" +
        "c = foreach b generate $0,$1,$2;" +
        "store c into 'empty';";        
        
        org.apache.pig.newplan.logical.relational.LogicalPlan newLogicalPlan = migratePlan(query);
        printPlan(newLogicalPlan);
        
        List<Operator> sources = newLogicalPlan.getSources();
        assertEquals( 2, sources.size() );
        for( Operator source : sources ) {
            Map<Long,Set<String>> annotation = 
                (Map<Long, Set<String>>) source.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
            assertTrue( annotation == null || annotation.isEmpty() );
        }
    }
    
    public void printPlan(org.apache.pig.newplan.logical.relational.LogicalPlan logicalPlan ) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        PrintStream ps = new PrintStream(out);
        LogicalPlanPrinter pp = new LogicalPlanPrinter(logicalPlan,ps);
        pp.visit();
        System.err.println(out.toString());
    }
    
    public void printPlan(PhysicalPlan physicalPlan) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        PrintStream ps = new PrintStream(out);
        physicalPlan.explain(ps, "text", true);
        System.err.println(out.toString());
    }
}
