/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hcatalog.pig;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;

import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hcatalog.HcatTestUtils;
import org.apache.hcatalog.mapreduce.HCatBaseTest;
import org.apache.pig.ExecType;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.LogUtils;
import org.junit.Assert;
import org.junit.Test;

public class TestHCatStorer extends HCatBaseTest {

    private static final String INPUT_FILE_NAME = TEST_DATA_DIR + "/input.data";

    @Test
    public void testPartColsInData() throws IOException, CommandNeedRetryException {

        driver.run("drop table junit_unparted");
        String createTable = "create table junit_unparted(a int) partitioned by (b string) stored as RCFILE";
        int retCode = driver.run(createTable).getResponseCode();
        if (retCode != 0) {
            throw new IOException("Failed to create table.");
        }
        int LOOP_SIZE = 11;
        String[] input = new String[LOOP_SIZE];
        for (int i = 0; i < LOOP_SIZE; i++) {
            input[i] = i + "\t1";
        }
        HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, input);
        PigServer server = new PigServer(ExecType.LOCAL);
        server.registerQuery("A = load '" + INPUT_FILE_NAME + "' as (a:int, b:chararray);");
        server.registerQuery("store A into 'default.junit_unparted' using " + HCatStorer.class.getName() + "('b=1');");
        server.registerQuery("B = load 'default.junit_unparted' using " + HCatLoader.class.getName() + "();");
        Iterator<Tuple> itr = server.openIterator("B");

        int i = 0;

        while (itr.hasNext()) {
            Tuple t = itr.next();
            Assert.assertEquals(2, t.size());
            Assert.assertEquals(t.get(0), i);
            Assert.assertEquals(t.get(1), "1");
            i++;
        }

        Assert.assertFalse(itr.hasNext());
        Assert.assertEquals(11, i);
    }

    @Test
    public void testMultiPartColsInData() throws IOException, CommandNeedRetryException {

        driver.run("drop table employee");
        String createTable = "CREATE TABLE employee (emp_id INT, emp_name STRING, emp_start_date STRING , emp_gender STRING ) " +
            " PARTITIONED BY (emp_country STRING , emp_state STRING ) STORED AS RCFILE";

        int retCode = driver.run(createTable).getResponseCode();
        if (retCode != 0) {
            throw new IOException("Failed to create table.");
        }

        String[] inputData = {"111237\tKrishna\t01/01/1990\tM\tIN\tTN",
            "111238\tKalpana\t01/01/2000\tF\tIN\tKA",
            "111239\tSatya\t01/01/2001\tM\tIN\tKL",
            "111240\tKavya\t01/01/2002\tF\tIN\tAP"};

        HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, inputData);
        PigServer pig = new PigServer(ExecType.LOCAL);
        pig.setBatchOn();
        pig.registerQuery("A = LOAD '" + INPUT_FILE_NAME + "' USING PigStorage() AS (emp_id:int,emp_name:chararray,emp_start_date:chararray," +
            "emp_gender:chararray,emp_country:chararray,emp_state:chararray);");
        pig.registerQuery("TN = FILTER A BY emp_state == 'TN';");
        pig.registerQuery("KA = FILTER A BY emp_state == 'KA';");
        pig.registerQuery("KL = FILTER A BY emp_state == 'KL';");
        pig.registerQuery("AP = FILTER A BY emp_state == 'AP';");
        pig.registerQuery("STORE TN INTO 'employee' USING " + HCatStorer.class.getName() + "('emp_country=IN,emp_state=TN');");
        pig.registerQuery("STORE KA INTO 'employee' USING " + HCatStorer.class.getName() + "('emp_country=IN,emp_state=KA');");
        pig.registerQuery("STORE KL INTO 'employee' USING " + HCatStorer.class.getName() + "('emp_country=IN,emp_state=KL');");
        pig.registerQuery("STORE AP INTO 'employee' USING " + HCatStorer.class.getName() + "('emp_country=IN,emp_state=AP');");
        pig.executeBatch();
        driver.run("select * from employee");
        ArrayList<String> results = new ArrayList<String>();
        driver.getResults(results);
        Assert.assertEquals(4, results.size());
        Collections.sort(results);
        Assert.assertEquals(inputData[0], results.get(0));
        Assert.assertEquals(inputData[1], results.get(1));
        Assert.assertEquals(inputData[2], results.get(2));
        Assert.assertEquals(inputData[3], results.get(3));
        driver.run("drop table employee");
    }

    @Test
    public void testStoreInPartiitonedTbl() throws IOException, CommandNeedRetryException {

        driver.run("drop table junit_unparted");
        String createTable = "create table junit_unparted(a int) partitioned by (b string) stored as RCFILE";
        int retCode = driver.run(createTable).getResponseCode();
        if (retCode != 0) {
            throw new IOException("Failed to create table.");
        }
        int LOOP_SIZE = 11;
        String[] input = new String[LOOP_SIZE];
        for (int i = 0; i < LOOP_SIZE; i++) {
            input[i] = i + "";
        }
        HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, input);
        PigServer server = new PigServer(ExecType.LOCAL);
        server.registerQuery("A = load '" + INPUT_FILE_NAME + "' as (a:int);");
        server.registerQuery("store A into 'default.junit_unparted' using " + HCatStorer.class.getName() + "('b=1');");
        server.registerQuery("B = load 'default.junit_unparted' using " + HCatLoader.class.getName() + "();");
        Iterator<Tuple> itr = server.openIterator("B");

        int i = 0;

        while (itr.hasNext()) {
            Tuple t = itr.next();
            Assert.assertEquals(2, t.size());
            Assert.assertEquals(t.get(0), i);
            Assert.assertEquals(t.get(1), "1");
            i++;
        }

        Assert.assertFalse(itr.hasNext());
        Assert.assertEquals(11, i);
    }

    @Test
    public void testNoAlias() throws IOException, CommandNeedRetryException {
        driver.run("drop table junit_parted");
        String createTable = "create table junit_parted(a int, b string) partitioned by (ds string) stored as RCFILE";
        int retCode = driver.run(createTable).getResponseCode();
        if (retCode != 0) {
            throw new IOException("Failed to create table.");
        }
        PigServer server = new PigServer(ExecType.LOCAL);
        boolean errCaught = false;
        try {
            server.setBatchOn();
            server.registerQuery("A = load '" + INPUT_FILE_NAME + "' as (a:int, b:chararray);");
            server.registerQuery("B = foreach A generate a+10, b;");
            server.registerQuery("store B into 'junit_parted' using " + HCatStorer.class.getName() + "('ds=20100101');");
            server.executeBatch();
        } catch (PigException fe) {
            PigException pe = LogUtils.getPigException(fe);
            Assert.assertTrue(pe instanceof FrontendException);
            Assert.assertEquals(PigHCatUtil.PIG_EXCEPTION_CODE, pe.getErrorCode());
            Assert.assertTrue(pe.getMessage().contains("Column name for a field is not specified. Please provide the full schema as an argument to HCatStorer."));
            errCaught = true;
        }
        Assert.assertTrue(errCaught);
        errCaught = false;
        try {
            server.setBatchOn();
            server.registerQuery("A = load '" + INPUT_FILE_NAME + "' as (a:int, B:chararray);");
            server.registerQuery("B = foreach A generate a, B;");
            server.registerQuery("store B into 'junit_parted' using " + HCatStorer.class.getName() + "('ds=20100101');");
            server.executeBatch();
        } catch (PigException fe) {
            PigException pe = LogUtils.getPigException(fe);
            Assert.assertTrue(pe instanceof FrontendException);
            Assert.assertEquals(PigHCatUtil.PIG_EXCEPTION_CODE, pe.getErrorCode());
            Assert.assertTrue(pe.getMessage().contains("Column names should all be in lowercase. Invalid name found: B"));
            errCaught = true;
        }
        driver.run("drop table junit_parted");
        Assert.assertTrue(errCaught);
    }

    @Test
    public void testStoreMultiTables() throws IOException, CommandNeedRetryException {

        driver.run("drop table junit_unparted");
        String createTable = "create table junit_unparted(a int, b string) stored as RCFILE";
        int retCode = driver.run(createTable).getResponseCode();
        if (retCode != 0) {
            throw new IOException("Failed to create table.");
        }
        driver.run("drop table junit_unparted2");
        createTable = "create table junit_unparted2(a int, b string) stored as RCFILE";
        retCode = driver.run(createTable).getResponseCode();
        if (retCode != 0) {
            throw new IOException("Failed to create table.");
        }

        int LOOP_SIZE = 3;
        String[] input = new String[LOOP_SIZE * LOOP_SIZE];
        int k = 0;
        for (int i = 1; i <= LOOP_SIZE; i++) {
            String si = i + "";
            for (int j = 1; j <= LOOP_SIZE; j++) {
                input[k++] = si + "\t" + j;
            }
        }
        HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, input);
        PigServer server = new PigServer(ExecType.LOCAL);
        server.setBatchOn();
        server.registerQuery("A = load '" + INPUT_FILE_NAME + "' as (a:int, b:chararray);");
        server.registerQuery("B = filter A by a < 2;");
        server.registerQuery("store B into 'junit_unparted' using " + HCatStorer.class.getName() + "();");
        server.registerQuery("C = filter A by a >= 2;");
        server.registerQuery("store C into 'junit_unparted2' using " + HCatStorer.class.getName() + "();");
        server.executeBatch();

        driver.run("select * from junit_unparted");
        ArrayList<String> res = new ArrayList<String>();
        driver.getResults(res);
        driver.run("select * from junit_unparted2");
        ArrayList<String> res2 = new ArrayList<String>();
        driver.getResults(res2);

        res.addAll(res2);
        driver.run("drop table junit_unparted");
        driver.run("drop table junit_unparted2");

        Iterator<String> itr = res.iterator();
        for (int i = 0; i < LOOP_SIZE * LOOP_SIZE; i++) {
            Assert.assertEquals(input[i], itr.next());
        }

        Assert.assertFalse(itr.hasNext());

    }

    @Test
    public void testStoreWithNoSchema() throws IOException, CommandNeedRetryException {

        driver.run("drop table junit_unparted");
        String createTable = "create table junit_unparted(a int, b string) stored as RCFILE";
        int retCode = driver.run(createTable).getResponseCode();
        if (retCode != 0) {
            throw new IOException("Failed to create table.");
        }

        int LOOP_SIZE = 3;
        String[] input = new String[LOOP_SIZE * LOOP_SIZE];
        int k = 0;
        for (int i = 1; i <= LOOP_SIZE; i++) {
            String si = i + "";
            for (int j = 1; j <= LOOP_SIZE; j++) {
                input[k++] = si + "\t" + j;
            }
        }
        HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, input);
        PigServer server = new PigServer(ExecType.LOCAL);
        server.setBatchOn();
        server.registerQuery("A = load '" + INPUT_FILE_NAME + "' as (a:int, b:chararray);");
        server.registerQuery("store A into 'default.junit_unparted' using " + HCatStorer.class.getName() + "('');");
        server.executeBatch();

        driver.run("select * from junit_unparted");
        ArrayList<String> res = new ArrayList<String>();
        driver.getResults(res);
        driver.run("drop table junit_unparted");
        Iterator<String> itr = res.iterator();
        for (int i = 0; i < LOOP_SIZE * LOOP_SIZE; i++) {
            Assert.assertEquals(input[i], itr.next());
        }

        Assert.assertFalse(itr.hasNext());

    }

    @Test
    public void testStoreWithNoCtorArgs() throws IOException, CommandNeedRetryException {

        driver.run("drop table junit_unparted");
        String createTable = "create table junit_unparted(a int, b string) stored as RCFILE";
        int retCode = driver.run(createTable).getResponseCode();
        if (retCode != 0) {
            throw new IOException("Failed to create table.");
        }

        int LOOP_SIZE = 3;
        String[] input = new String[LOOP_SIZE * LOOP_SIZE];
        int k = 0;
        for (int i = 1; i <= LOOP_SIZE; i++) {
            String si = i + "";
            for (int j = 1; j <= LOOP_SIZE; j++) {
                input[k++] = si + "\t" + j;
            }
        }
        HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, input);
        PigServer server = new PigServer(ExecType.LOCAL);
        server.setBatchOn();
        server.registerQuery("A = load '" + INPUT_FILE_NAME + "' as (a:int, b:chararray);");
        server.registerQuery("store A into 'junit_unparted' using " + HCatStorer.class.getName() + "();");
        server.executeBatch();

        driver.run("select * from junit_unparted");
        ArrayList<String> res = new ArrayList<String>();
        driver.getResults(res);
        driver.run("drop table junit_unparted");
        Iterator<String> itr = res.iterator();
        for (int i = 0; i < LOOP_SIZE * LOOP_SIZE; i++) {
            Assert.assertEquals(input[i], itr.next());
        }

        Assert.assertFalse(itr.hasNext());

    }

    @Test
    public void testEmptyStore() throws IOException, CommandNeedRetryException {

        driver.run("drop table junit_unparted");
        String createTable = "create table junit_unparted(a int, b string) stored as RCFILE";
        int retCode = driver.run(createTable).getResponseCode();
        if (retCode != 0) {
            throw new IOException("Failed to create table.");
        }

        int LOOP_SIZE = 3;
        String[] input = new String[LOOP_SIZE * LOOP_SIZE];
        int k = 0;
        for (int i = 1; i <= LOOP_SIZE; i++) {
            String si = i + "";
            for (int j = 1; j <= LOOP_SIZE; j++) {
                input[k++] = si + "\t" + j;
            }
        }
        HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, input);
        PigServer server = new PigServer(ExecType.LOCAL);
        server.setBatchOn();
        server.registerQuery("A = load '" + INPUT_FILE_NAME + "' as (a:int, b:chararray);");
        server.registerQuery("B = filter A by a > 100;");
        server.registerQuery("store B into 'default.junit_unparted' using " + HCatStorer.class.getName() + "('','a:int,b:chararray');");
        server.executeBatch();

        driver.run("select * from junit_unparted");
        ArrayList<String> res = new ArrayList<String>();
        driver.getResults(res);
        driver.run("drop table junit_unparted");
        Iterator<String> itr = res.iterator();
        Assert.assertFalse(itr.hasNext());

    }

    @Test
    public void testBagNStruct() throws IOException, CommandNeedRetryException {
        driver.run("drop table junit_unparted");
        String createTable = "create table junit_unparted(b string,a struct<a1:int>,  arr_of_struct array<string>, " +
            "arr_of_struct2 array<struct<s1:string,s2:string>>,  arr_of_struct3 array<struct<s3:string>>) stored as RCFILE";
        int retCode = driver.run(createTable).getResponseCode();
        if (retCode != 0) {
            throw new IOException("Failed to create table.");
        }

        String[] inputData = new String[]{"zookeeper\t(2)\t{(pig)}\t{(pnuts,hdfs)}\t{(hadoop),(hcat)}",
            "chubby\t(2)\t{(sawzall)}\t{(bigtable,gfs)}\t{(mapreduce),(hcat)}"};

        HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, inputData);

        PigServer server = new PigServer(ExecType.LOCAL);
        server.setBatchOn();
        server.registerQuery("A = load '" + INPUT_FILE_NAME + "' as (b:chararray, a:tuple(a1:int), arr_of_struct:bag{mytup:tuple(s1:chararray)}, arr_of_struct2:bag{mytup:tuple(s1:chararray,s2:chararray)}, arr_of_struct3:bag{t3:tuple(s3:chararray)});");
        server.registerQuery("store A into 'default.junit_unparted' using " + HCatStorer.class.getName() + "('','b:chararray, a:tuple(a1:int)," +
            " arr_of_struct:bag{mytup:tuple(s1:chararray)}, arr_of_struct2:bag{mytup:tuple(s1:chararray,s2:chararray)}, arr_of_struct3:bag{t3:tuple(s3:chararray)}');");
        server.executeBatch();

        driver.run("select * from junit_unparted");
        ArrayList<String> res = new ArrayList<String>();
        driver.getResults(res);
        driver.run("drop table junit_unparted");
        Iterator<String> itr = res.iterator();
        Assert.assertEquals("zookeeper\t{\"a1\":2}\t[\"pig\"]\t[{\"s1\":\"pnuts\",\"s2\":\"hdfs\"}]\t[{\"s3\":\"hadoop\"},{\"s3\":\"hcat\"}]", itr.next());
        Assert.assertEquals("chubby\t{\"a1\":2}\t[\"sawzall\"]\t[{\"s1\":\"bigtable\",\"s2\":\"gfs\"}]\t[{\"s3\":\"mapreduce\"},{\"s3\":\"hcat\"}]", itr.next());
        Assert.assertFalse(itr.hasNext());

    }

    @Test
    public void testStoreFuncAllSimpleTypes() throws IOException, CommandNeedRetryException {

        driver.run("drop table junit_unparted");
        String createTable = "create table junit_unparted(a int, b float, c double, d bigint, e string, f binary, g binary) stored as RCFILE";
        int retCode = driver.run(createTable).getResponseCode();
        if (retCode != 0) {
            throw new IOException("Failed to create table.");
        }

        int i = 0;
        String[] input = new String[3];
        input[i++] = "0\t\t\t\t\t\t"; //Empty values except first column
        input[i++] = "\t" + i * 2.1f + "\t" + i * 1.1d + "\t" + i * 2L + "\t" + "lets hcat" + "\tbinary-data"; //First column empty
        input[i++] = i + "\t" + i * 2.1f + "\t" + i * 1.1d + "\t" + i * 2L + "\t" + "lets hcat" + "\tbinary-data";

        HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, input);
        PigServer server = new PigServer(ExecType.LOCAL);
        server.setBatchOn();
        server.registerQuery("A = load '" + INPUT_FILE_NAME + "' as (a:int, b:float, c:double, d:long, e:chararray, f:bytearray);");
        //null gets stored into column g which is a binary field.
        server.registerQuery("store A into 'default.junit_unparted' using " + HCatStorer.class.getName() + "('','a:int, b:float, c:double, d:long, e:chararray,f:bytearray');");
        server.executeBatch();


        driver.run("select * from junit_unparted");
        ArrayList<String> res = new ArrayList<String>();
        driver.getResults(res);

        Iterator<String> itr = res.iterator();
        Assert.assertEquals("0\tNULL\tNULL\tNULL\tNULL\tnull\tnull", itr.next());
        Assert.assertEquals("NULL\t4.2\t2.2\t4\tlets hcat\tbinary-data\tnull", itr.next());
        Assert.assertEquals("3\t6.2999997\t3.3000000000000003\t6\tlets hcat\tbinary-data\tnull", itr.next());
        Assert.assertFalse(itr.hasNext());

        server.registerQuery("B = load 'junit_unparted' using " + HCatLoader.class.getName() + ";");
        Iterator<Tuple> iter = server.openIterator("B");
        int count = 0;
        int num5nulls = 0;
        while (iter.hasNext()) {
            Tuple t = iter.next();
            if (t.get(5) == null) {
                num5nulls++;
            } else {
                Assert.assertTrue(t.get(5) instanceof DataByteArray);
            }
            Assert.assertNull(t.get(6));
            count++;
        }
        Assert.assertEquals(3, count);
        Assert.assertEquals(1, num5nulls);
        driver.run("drop table junit_unparted");
    }

    @Test
    public void testStoreFuncSimple() throws IOException, CommandNeedRetryException {

        driver.run("drop table junit_unparted");
        String createTable = "create table junit_unparted(a int, b string) stored as RCFILE";
        int retCode = driver.run(createTable).getResponseCode();
        if (retCode != 0) {
            throw new IOException("Failed to create table.");
        }

        int LOOP_SIZE = 3;
        String[] inputData = new String[LOOP_SIZE * LOOP_SIZE];
        int k = 0;
        for (int i = 1; i <= LOOP_SIZE; i++) {
            String si = i + "";
            for (int j = 1; j <= LOOP_SIZE; j++) {
                inputData[k++] = si + "\t" + j;
            }
        }
        HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, inputData);
        PigServer server = new PigServer(ExecType.LOCAL);
        server.setBatchOn();
        server.registerQuery("A = load '" + INPUT_FILE_NAME + "' as (a:int, b:chararray);");
        server.registerQuery("store A into 'default.junit_unparted' using " + HCatStorer.class.getName() + "('','a:int,b:chararray');");
        server.executeBatch();

        driver.run("select * from junit_unparted");
        ArrayList<String> res = new ArrayList<String>();
        driver.getResults(res);
        driver.run("drop table junit_unparted");
        Iterator<String> itr = res.iterator();
        for (int i = 1; i <= LOOP_SIZE; i++) {
            String si = i + "";
            for (int j = 1; j <= LOOP_SIZE; j++) {
                Assert.assertEquals(si + "\t" + j, itr.next());
            }
        }
        Assert.assertFalse(itr.hasNext());

    }

    @Test
    public void testDynamicPartitioningMultiPartColsInDataPartialSpec() throws IOException, CommandNeedRetryException {

        driver.run("drop table if exists employee");
        String createTable = "CREATE TABLE employee (emp_id INT, emp_name STRING, emp_start_date STRING , emp_gender STRING ) " +
            " PARTITIONED BY (emp_country STRING , emp_state STRING ) STORED AS RCFILE";

        int retCode = driver.run(createTable).getResponseCode();
        if (retCode != 0) {
            throw new IOException("Failed to create table.");
        }

        String[] inputData = {"111237\tKrishna\t01/01/1990\tM\tIN\tTN",
            "111238\tKalpana\t01/01/2000\tF\tIN\tKA",
            "111239\tSatya\t01/01/2001\tM\tIN\tKL",
            "111240\tKavya\t01/01/2002\tF\tIN\tAP"};

        HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, inputData);
        PigServer pig = new PigServer(ExecType.LOCAL);
        pig.setBatchOn();
        pig.registerQuery("A = LOAD '" + INPUT_FILE_NAME + "' USING PigStorage() AS (emp_id:int,emp_name:chararray,emp_start_date:chararray," +
            "emp_gender:chararray,emp_country:chararray,emp_state:chararray);");
        pig.registerQuery("IN = FILTER A BY emp_country == 'IN';");
        pig.registerQuery("STORE IN INTO 'employee' USING " + HCatStorer.class.getName() + "('emp_country=IN');");
        pig.executeBatch();
        driver.run("select * from employee");
        ArrayList<String> results = new ArrayList<String>();
        driver.getResults(results);
        Assert.assertEquals(4, results.size());
        Collections.sort(results);
        Assert.assertEquals(inputData[0], results.get(0));
        Assert.assertEquals(inputData[1], results.get(1));
        Assert.assertEquals(inputData[2], results.get(2));
        Assert.assertEquals(inputData[3], results.get(3));
        driver.run("drop table employee");
    }

    @Test
    public void testDynamicPartitioningMultiPartColsInDataNoSpec() throws IOException, CommandNeedRetryException {

        driver.run("drop table if exists employee");
        String createTable = "CREATE TABLE employee (emp_id INT, emp_name STRING, emp_start_date STRING , emp_gender STRING ) " +
            " PARTITIONED BY (emp_country STRING , emp_state STRING ) STORED AS RCFILE";

        int retCode = driver.run(createTable).getResponseCode();
        if (retCode != 0) {
            throw new IOException("Failed to create table.");
        }

        String[] inputData = {"111237\tKrishna\t01/01/1990\tM\tIN\tTN",
            "111238\tKalpana\t01/01/2000\tF\tIN\tKA",
            "111239\tSatya\t01/01/2001\tM\tIN\tKL",
            "111240\tKavya\t01/01/2002\tF\tIN\tAP"};

        HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, inputData);
        PigServer pig = new PigServer(ExecType.LOCAL);
        pig.setBatchOn();
        pig.registerQuery("A = LOAD '" + INPUT_FILE_NAME + "' USING PigStorage() AS (emp_id:int,emp_name:chararray,emp_start_date:chararray," +
            "emp_gender:chararray,emp_country:chararray,emp_state:chararray);");
        pig.registerQuery("IN = FILTER A BY emp_country == 'IN';");
        pig.registerQuery("STORE IN INTO 'employee' USING " + HCatStorer.class.getName() + "();");
        pig.executeBatch();
        driver.run("select * from employee");
        ArrayList<String> results = new ArrayList<String>();
        driver.getResults(results);
        Assert.assertEquals(4, results.size());
        Collections.sort(results);
        Assert.assertEquals(inputData[0], results.get(0));
        Assert.assertEquals(inputData[1], results.get(1));
        Assert.assertEquals(inputData[2], results.get(2));
        Assert.assertEquals(inputData[3], results.get(3));
        driver.run("drop table employee");
    }

    @Test
    public void testDynamicPartitioningMultiPartColsNoDataInDataNoSpec() throws IOException, CommandNeedRetryException {

        driver.run("drop table if exists employee");
        String createTable = "CREATE TABLE employee (emp_id INT, emp_name STRING, emp_start_date STRING , emp_gender STRING ) " +
            " PARTITIONED BY (emp_country STRING , emp_state STRING ) STORED AS RCFILE";

        int retCode = driver.run(createTable).getResponseCode();
        if (retCode != 0) {
            throw new IOException("Failed to create table.");
        }

        String[] inputData = {};
        HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, inputData);

        PigServer pig = new PigServer(ExecType.LOCAL);
        pig.setBatchOn();
        pig.registerQuery("A = LOAD '" + INPUT_FILE_NAME + "' USING PigStorage() AS (emp_id:int,emp_name:chararray,emp_start_date:chararray," +
            "emp_gender:chararray,emp_country:chararray,emp_state:chararray);");
        pig.registerQuery("IN = FILTER A BY emp_country == 'IN';");
        pig.registerQuery("STORE IN INTO 'employee' USING " + HCatStorer.class.getName() + "();");
        pig.executeBatch();
        driver.run("select * from employee");
        ArrayList<String> results = new ArrayList<String>();
        driver.getResults(results);
        Assert.assertEquals(0, results.size());
        driver.run("drop table employee");
    }
}
