/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hcatalog.pig;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import junit.framework.TestCase;

import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hcatalog.data.Pair;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;

public class TestHCatStorerMulti extends TestCase {
    private static final String TEST_DATA_DIR = System.getProperty("user.dir") +
        "/build/test/data/" + TestHCatStorerMulti.class.getCanonicalName();
    private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
    private static final String INPUT_FILE_NAME = TEST_DATA_DIR + "/input.data";

    private static final String BASIC_TABLE = "junit_unparted_basic";
    private static final String PARTITIONED_TABLE = "junit_parted_basic";
    private static Driver driver;

    private static Map<Integer, Pair<Integer, String>> basicInputData;

    private void dropTable(String tablename) throws IOException, CommandNeedRetryException {
        driver.run("drop table " + tablename);
    }

    private void createTable(String tablename, String schema, String partitionedBy) throws IOException, CommandNeedRetryException {
        String createTable;
        createTable = "create table " + tablename + "(" + schema + ") ";
        if ((partitionedBy != null) && (!partitionedBy.trim().isEmpty())) {
            createTable = createTable + "partitioned by (" + partitionedBy + ") ";
        }
        createTable = createTable + "stored as RCFILE tblproperties('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver'," +
            "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ";
        int retCode = driver.run(createTable).getResponseCode();
        if (retCode != 0) {
            throw new IOException("Failed to create table. [" + createTable + "], return code from hive driver : [" + retCode + "]");
        }
    }

    private void createTable(String tablename, String schema) throws IOException, CommandNeedRetryException {
        createTable(tablename, schema, null);
    }

    @Override
    protected void setUp() throws Exception {
        if (driver == null) {
            HiveConf hiveConf = new HiveConf(this.getClass());
            hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
            hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
            hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
            hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
            driver = new Driver(hiveConf);
            SessionState.start(new CliSessionState(hiveConf));
        }

        cleanup();
    }

    @Override
    protected void tearDown() throws Exception {
        cleanup();
    }

    public void testStoreBasicTable() throws Exception {


        createTable(BASIC_TABLE, "a int, b string");

        populateBasicFile();

        PigServer server = new PigServer(ExecType.LOCAL);
        server.setBatchOn();
        server.registerQuery("A = load '" + INPUT_FILE_NAME + "' as (a:int, b:chararray);");
        server.registerQuery("store A into '" + BASIC_TABLE + "' using org.apache.hcatalog.pig.HCatStorer();");

        server.executeBatch();

        driver.run("select * from " + BASIC_TABLE);
        ArrayList<String> unpartitionedTableValuesReadFromHiveDriver = new ArrayList<String>();
        driver.getResults(unpartitionedTableValuesReadFromHiveDriver);
        assertEquals(basicInputData.size(), unpartitionedTableValuesReadFromHiveDriver.size());
    }

    public void testStorePartitionedTable() throws Exception {
        createTable(PARTITIONED_TABLE, "a int, b string", "bkt string");

        populateBasicFile();

        PigServer server = new PigServer(ExecType.LOCAL);
        server.setBatchOn();
        server.registerQuery("A = load '" + INPUT_FILE_NAME + "' as (a:int, b:chararray);");

        server.registerQuery("B2 = filter A by a < 2;");
        server.registerQuery("store B2 into '" + PARTITIONED_TABLE + "' using org.apache.hcatalog.pig.HCatStorer('bkt=0');");
        server.registerQuery("C2 = filter A by a >= 2;");
        server.registerQuery("store C2 into '" + PARTITIONED_TABLE + "' using org.apache.hcatalog.pig.HCatStorer('bkt=1');");

        server.executeBatch();

        driver.run("select * from " + PARTITIONED_TABLE);
        ArrayList<String> partitionedTableValuesReadFromHiveDriver = new ArrayList<String>();
        driver.getResults(partitionedTableValuesReadFromHiveDriver);
        assertEquals(basicInputData.size(), partitionedTableValuesReadFromHiveDriver.size());
    }

    public void testStoreTableMulti() throws Exception {


        createTable(BASIC_TABLE, "a int, b string");
        createTable(PARTITIONED_TABLE, "a int, b string", "bkt string");

        populateBasicFile();

        PigServer server = new PigServer(ExecType.LOCAL);
        server.setBatchOn();
        server.registerQuery("A = load '" + INPUT_FILE_NAME + "' as (a:int, b:chararray);");
        server.registerQuery("store A into '" + BASIC_TABLE + "' using org.apache.hcatalog.pig.HCatStorer();");

        server.registerQuery("B2 = filter A by a < 2;");
        server.registerQuery("store B2 into '" + PARTITIONED_TABLE + "' using org.apache.hcatalog.pig.HCatStorer('bkt=0');");
        server.registerQuery("C2 = filter A by a >= 2;");
        server.registerQuery("store C2 into '" + PARTITIONED_TABLE + "' using org.apache.hcatalog.pig.HCatStorer('bkt=1');");

        server.executeBatch();

        driver.run("select * from " + BASIC_TABLE);
        ArrayList<String> unpartitionedTableValuesReadFromHiveDriver = new ArrayList<String>();
        driver.getResults(unpartitionedTableValuesReadFromHiveDriver);
        driver.run("select * from " + PARTITIONED_TABLE);
        ArrayList<String> partitionedTableValuesReadFromHiveDriver = new ArrayList<String>();
        driver.getResults(partitionedTableValuesReadFromHiveDriver);
        assertEquals(basicInputData.size(), unpartitionedTableValuesReadFromHiveDriver.size());
        assertEquals(basicInputData.size(), partitionedTableValuesReadFromHiveDriver.size());
    }

    private void populateBasicFile() throws IOException {
        int LOOP_SIZE = 3;
        String[] input = new String[LOOP_SIZE * LOOP_SIZE];
        basicInputData = new HashMap<Integer, Pair<Integer, String>>();
        int k = 0;
        File file = new File(INPUT_FILE_NAME);
        file.deleteOnExit();
        FileWriter writer = new FileWriter(file);
        for (int i = 1; i <= LOOP_SIZE; i++) {
            String si = i + "";
            for (int j = 1; j <= LOOP_SIZE; j++) {
                String sj = "S" + j + "S";
                input[k] = si + "\t" + sj;
                basicInputData.put(k, new Pair<Integer, String>(i, sj));
                writer.write(input[k] + "\n");
                k++;
            }
        }
        writer.close();
    }

    private void cleanup() throws IOException, CommandNeedRetryException {
        File f = new File(TEST_WAREHOUSE_DIR);
        if (f.exists()) {
            FileUtil.fullyDelete(f);
        }
        new File(TEST_WAREHOUSE_DIR).mkdirs();

        dropTable(BASIC_TABLE);
        dropTable(PARTITIONED_TABLE);
    }
}
