/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.pig.test;

import org.apache.pig.ExecType;
import org.apache.pig.PigServer;

import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;

import org.apache.pig.impl.io.FileLocalizer;

import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import junit.framework.TestCase;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.Random;

import org.apache.pig.backend.executionengine.ExecException;

import org.apache.hadoop.conf.Configuration;
/**
 * Tests Pig behavior on a large input file, intended to span multiple DFS blocks,
 * using the GROUP/COUNT, ORDER and DISTINCT operators.
 *
 * <p>The input is one random integer in {@code [0, max_rand)} per line. The number of
 * lines is half the configured block size and every line is two to four bytes long, so
 * the file comes out at roughly twice the block size.
 *
 * <p>The data file is regenerated in {@link #setUp()} before every test method, so this
 * class takes a long time to run.
 */
@RunWith(JUnit4.class)
public class TestLargeFile extends TestCase {

    /** Local temp file the generated data is written to before it is handed to Pig. */
    File datFile;

    // Falls back to 0 when "dfs.block.size" is not configured; in that case no records
    // are generated and every test below runs against an empty input.
    private long defaultBlockSize = (new Configuration()).getLong("dfs.block.size", 0);

    /** Number of records to generate: half the block size. */
    private long total = defaultBlockSize >> 1;

    /** Exclusive upper bound of the generated values. */
    private int max_rand = 500;

    /** Cluster shared by all test methods; shut down in {@link #oneTimeTearDown()}. */
    static MiniCluster cluster = MiniCluster.buildCluster();

    /** COUNT[v] is how many times value v was written to the current data file. */
    Integer [] COUNT = new Integer[max_rand];

    PigServer pig;

    /** Single-quoted locations, ready to be spliced into Pig Latin statements. */
    String fileName, tmpFile1;

    /**
     * Generates a fresh data file, creates the Pig server and registers the data with it.
     *
     * @throws Exception if the data cannot be written or the Pig server cannot be created
     */
    @Override
    @Before
    public void setUp() throws Exception{

        System.out.println("Generating test data...");
        System.out.println("Default block size = " + defaultBlockSize);
        System.out.println("Total no. of iterations to run for the test data = " + total);

        datFile = File.createTempFile("StoreTest", ".dat");
        try {
            writeTestData(datFile);

            try {
                pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
            }
            catch (ExecException e) {
                IOException ioe = new IOException("Failed to create Pig server");
                ioe.initCause(e);
                throw ioe;
            }
            fileName = "'" + FileLocalizer.hadoopify(datFile.toString(), pig.getPigContext()) + "'";
            tmpFile1 = "'" + FileLocalizer.getTemporaryPath(pig.getPigContext()).toString() + "'";
        } finally {
            // The local copy is not used after this point; removing it in a finally block
            // also cleans up when data generation or server creation fails part-way.
            datFile.delete();
        }
    }

    /**
     * Writes {@code total} random values, one per line, to {@code target} and records in
     * {@code COUNT} how often each value was written.
     *
     * @param target file to (over)write with the generated records
     * @throws IOException if the file cannot be opened or written
     */
    private void writeTestData(File target) throws IOException {
        // Encode each possible line once instead of building a String and a byte array
        // for every record. The charset is explicit so the bytes written do not depend
        // on the platform default.
        byte[][] encodedLines = new byte[max_rand][];
        for (int value = 0; value < max_rand; value++) {
            COUNT[value] = 0;
            encodedLines[value] = (value + "\n").getBytes("UTF-8");
        }

        Random rand = new Random();

        // Buffered: each record is only a few bytes and there are total of them, so
        // writing straight to the FileOutputStream would issue one tiny write per record.
        BufferedOutputStream dat = new BufferedOutputStream(new FileOutputStream(target));
        try {
            for (long i = 0; i < total; i++) {
                int value = rand.nextInt(max_rand);
                COUNT[value]++;
                dat.write(encodedLines[value]);
            }
        } finally {
            // Always release the file handle, even when a write fails.
            dat.close();
        }
    }

    /** No per-test cleanup is performed; the shared cluster is shut down once per class. */
    @Override
    @After
    public void tearDown() throws Exception {
    }

    /** Shuts down the shared mini cluster after all tests in this class have run. */
    @AfterClass
    public static void oneTimeTearDown() throws Exception {
        cluster.shutDown();
    }

    /**
     * Groups the data by value, stores the grouping, and checks that the per-group COUNT
     * matches the number of times each value was generated.
     */
    @Test
    public void testLargeFile () throws Exception {
        System.out.println("Running testLargeFile...");
        pig.registerQuery("A = load " + fileName + ";");
        pig.registerQuery("A = group A by $0;");
        pig.store("A", tmpFile1, "BinStorage()");
        pig.registerQuery("B = foreach A generate group, COUNT($1);");

        // Every value that was generated at least once must come back as its own group.
        int expectedGroups = 0;
        for (int i = 0; i < max_rand; i++) {
            if (COUNT[i] > 0) {
                expectedGroups++;
            }
        }

        Iterator <Tuple> B = pig.openIterator("B");

        int returnedGroups = 0;
        while(B.hasNext()) {
            Tuple temp = B.next();
            int index = DataType.toInteger(temp.get(0));
            int value = DataType.toInteger(temp.get(1));
            System.out.println("COUNT [" + index + "] = " + COUNT[index] + " B[" + index + "] = " + value);

            assertEquals("Wrong count for key " + index, COUNT[index].intValue(), value);
            returnedGroups++;
        }

        // Without this check an empty or truncated result would pass unnoticed.
        assertEquals("Wrong number of groups returned", expectedGroups, returnedGroups);
    }

    /**
     * Orders the data by value and checks that, read as integers, no returned value is
     * smaller than the one before it.
     */
    @Test
    public void testOrder () throws Exception {
        System.out.println("Running testOrder...");
        pig.registerQuery("A = load " + fileName + ";");
        pig.registerQuery("B = order A by $0;");

        Iterator <Tuple> B = pig.openIterator("B");

        if (!B.hasNext()) {
            // Empty result: there is no ordering to verify.
            return;
        }

        int previous = DataType.toInteger(B.next().get(0));
        while(B.hasNext()) {
            int current = DataType.toInteger(B.next().get(0));
            assertTrue("Output is not sorted: " + current + " follows " + previous,
                       current >= previous);
            previous = current;
        }
    }

    /**
     * Runs DISTINCT over the data and checks that each generated value is returned
     * exactly once and that no value that was never generated is returned.
     */
    @Test
    public void testDistinct () throws Exception {
        System.out.println("Running testDistinct...");
        pig.registerQuery("A = load " + fileName + ";");
        pig.registerQuery("B = distinct A;");

        Iterator <Tuple> B = pig.openIterator("B");

        // occurrences[v] = how many times value v appears in the DISTINCT output.
        int[] occurrences = new int[max_rand];
        while(B.hasNext()) {
            int value = DataType.toInteger(B.next().get(0));
            occurrences[value]++;
        }

        for(int i = 0; i < max_rand; i++) {
            int expected = (COUNT[i] > 0) ? 1 : 0;
            assertEquals("Wrong number of occurrences of " + i + " in distinct output",
                         expected, occurrences[i]);
        }
    }

}
