/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.pig.test;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.hbase.HBaseStorage;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import com.google.common.collect.Lists;

import static org.junit.Assert.assertTrue;

public class TestHBaseStorage {
    // Shared HBase client connection: opened in setUp() and closed in oneTimeTearDown().
    private static Connection connection;

    private static final Log LOG = LogFactory.getLog(TestHBaseStorage.class);
    // Helper that starts/stops the ZooKeeper and HBase mini clusters for the whole class.
    private static HBaseTestingUtility util;
    // HBase configuration shared by the mini cluster, the client connection and each PigServer.
    private static Configuration conf;
    // Recreated before every test in beforeTest() and shut down in tearDown().
    private static PigServer pig;

    // Cell-value format handed to the table-preparation helpers
    // (presumably selects how values are serialized into HBase — confirm in prepareTable).
    enum DataFormat {
        HBaseBinary, UTF8PlainText,
    }

    // Test Table constants
    private static final String TESTTABLE_1 = "pigtable_1";
    private static final String TESTTABLE_2 = "pigtable_2";
    // Primary ("pig") and secondary ("pig2") column families.
    private static final byte[] COLUMNFAMILY = Bytes.toBytes("pig");
    private static final byte[] COLUMNFAMILY2 = Bytes.toBytes("pig2");
    // Fixed columns in the "family:qualifier" form used in HBaseStorage column lists.
    private static final String TESTCOLUMN_A = "pig:col_a";
    private static final String TESTCOLUMN_B = "pig:col_b";
    private static final String TESTCOLUMN_C = "pig:col_c";

    // Number of rows the full-table loads are expected to return.
    private static final int TEST_ROW_COUNT = 100;

    // Column-family layout of a prepared table; TWO_CF is used when the test also reads "pig2".
    private enum TableType {ONE_CF, TWO_CF};
    // NOTE(review): not referenced in the visible part of this file; presumably records the
    // layout of the most recently prepared table — confirm.
    private TableType lastTableType;

    /**
     * Starts the ZooKeeper and HBase mini clusters shared by every test in this
     * class and opens the client {@link Connection} used by the helpers.
     *
     * @throws Exception if a mini cluster or the connection cannot be started
     */
    @BeforeClass
    public static void setUp() throws Exception {
        // This is needed by Pig: point HBase's temporary FS directory at this
        // test class's scratch directory. The base configuration must be passed
        // to HBaseConfiguration.create(); otherwise the setting is silently dropped.
        Configuration hadoopConf = new Configuration();
        hadoopConf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
                Paths.get(Util.getTestDirectory(TestHBaseStorage.class)).toAbsolutePath().toString());

        conf = HBaseConfiguration.create(hadoopConf);
        // Setting this property is required due to a bug in HBase 2.0
        // will be fixed in 2.0.1, see HBASE-20544. It doesn't have any effect on HBase 1.x
        conf.set("hbase.localcluster.assign.random.ports", "true");

        util = new HBaseTestingUtility(conf);
        util.startMiniZKCluster();
        util.startMiniHBaseCluster(1, 1);
        connection = ConnectionFactory.createConnection(conf);
    }

    /**
     * Releases the class-wide resources created in {@code setUp()}: the client
     * connection, the HBase mini cluster and the ZooKeeper mini cluster.
     * Every step is null-guarded so that a partially failed {@code setUp()} does
     * not end in a NullPointerException here, and the clusters are stopped even
     * if closing the connection throws.
     *
     * @throws Exception if closing the connection or stopping a cluster fails
     */
    @AfterClass
    public static void oneTimeTearDown() throws Exception {
        try {
            if (connection != null) {
                connection.close();
            }
        } finally {
            if (util != null) {
                // Stop HBase before ZooKeeper, mirroring the start-up order in setUp().
                // (On HBase 0.90.1+ util.shutdownMiniHBaseCluster() could be used instead.)
                MiniHBaseCluster hbc = util.getHBaseCluster();
                if (hbc != null) {
                    hbc.shutdown();
                    hbc.join();
                }
                util.shutdownMiniZKCluster();
            }
        }
    }


    /**
     * Gives each test its own {@link PigServer}, running in the local test mode
     * against the shared mini-cluster configuration.
     *
     * @throws Exception if the PigServer cannot be created
     */
    @Before
    public void beforeTest() throws Exception {
        final Configuration sharedConf = conf;
        pig = new PigServer(Util.getLocalTestMode(), sharedConf);
    }

    /**
     * Empties both test tables and shuts down the per-test {@link PigServer}.
     * Row deletion is best-effort (presumably because a test may not have
     * created every table), so failures are logged instead of failing the test.
     *
     * @throws Exception if shutting down the PigServer fails
     */
    @After
    public void tearDown() throws Exception {
        for (String tableName : new String[] {TESTTABLE_1, TESTTABLE_2}) {
            try {
                deleteAllRows(tableName);
            } catch (Exception e) {
                // Deliberately non-fatal, but keep the cause visible for debugging.
                LOG.debug("Could not delete rows of table " + tableName, e);
            }
        }
        if (pig != null) {
            pig.shutdown();
        }
    }

    /**
     * Deletes every row of the given table with a full scan followed by one
     * batched delete. DVR: I've found that it is faster to delete all rows in
     * small tables than to drop them.
     *
     * @param tableName name of the HBase table to empty
     * @throws Exception if the table cannot be opened or scanned, or the delete fails
     */
    private void deleteAllRows(String tableName) throws Exception {
        List<Delete> deletes = Lists.newArrayList();
        // try-with-resources: the scanner and the table are released even when
        // the scan or the delete throws.
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            try (ResultScanner scanner = table.getScanner(new Scan())) {
                for (Result row : scanner) {
                    deletes.add(new Delete(row.getRow()));
                }
            }
            table.delete(deletes);
        }
    }

    /**
     * Loads the row key, the fixed columns {@code pig:col_a..col_c} and the whole
     * {@code pig} family as a map, then verifies every row of the table.
     */
    @Test
    public void testLoadWithMap_1() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);

        final String columnList = TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C + " pig:";
        pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using "
                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('" + columnList
                + "','-loadKey -cacheBlocks true') as (rowKey, col_a, col_b, col_c, pig_cf_map);");
        Iterator<Tuple> tuples = pig.openIterator("a");
        int row = 0;
        LOG.info("LoadFromHBase Starting");
        for (; tuples.hasNext(); row++) {
            Tuple tuple = tuples.next();
            LOG.info("LoadFromHBase " + tuple);
            String key = ((DataByteArray) tuple.get(0)).toString();
            String colA = ((DataByteArray) tuple.get(1)).toString();
            String colB = ((DataByteArray) tuple.get(2)).toString();
            String colC = ((DataByteArray) tuple.get(3)).toString();
            Map<?, ?> familyMap = (Map<?, ?>) tuple.get(4);

            // Row N has a two-digit key and stores N, N.0 and "Text_N".
            String expectedKey = "00".substring((row + "").length()) + row;
            Assert.assertEquals(expectedKey, key);
            Assert.assertEquals(row, Integer.parseInt(colA));
            Assert.assertEquals((double) row, Double.parseDouble(colB), 1e-6);
            Assert.assertEquals("Text_" + row, colC);

            // The family map has five cells, among them the fixed columns and prefixed_col_d.
            Assert.assertEquals(5, familyMap.size());
            Assert.assertEquals(row, Integer.parseInt(familyMap.get("col_a").toString()));
            Assert.assertEquals((double) row,
                    Double.parseDouble(familyMap.get("col_b").toString()), 1e-6);
            Assert.assertEquals("Text_" + row,
                    ((DataByteArray) familyMap.get("col_c")).toString());
            Assert.assertEquals("PrefixedText_" + row,
                    ((DataByteArray) familyMap.get("prefixed_col_d")).toString());
        }
        Assert.assertEquals(TEST_ROW_COUNT, row);
        LOG.info("LoadFromHBase done");
    }

    /**
     * Test Load from hbase with maxTimestamp, minTimestamp, timestamp.
     *
     * <p>The rows are written between {@code beforeTimeStamp} and
     * {@code afterTimeStamp}. Each query then checks that the
     * {@code -minTimestamp}, {@code -maxTimestamp} and {@code -timestamp}
     * options include or exclude all of them as expected.
     */
    @Test
    public void testLoadWithSpecifiedTimestampAndRanges() throws IOException {
        // 10 ms of slack on each side of the write window.
        long beforeTimeStamp = System.currentTimeMillis() - 10;

        // NOTE(review): the returned Table is never closed here; confirm whether
        // prepareTable expects callers to close it.
        Table table = prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);

        long afterTimeStamp = System.currentTimeMillis() + 10;

        Assert.assertEquals("MaxTimestamp is set before rows added", 0, queryWithTimestamp(null , beforeTimeStamp, null));

        Assert.assertEquals("MaxTimestamp is set after rows added", TEST_ROW_COUNT, queryWithTimestamp(null, afterTimeStamp, null));

        Assert.assertEquals("MinTimestamp is set after rows added", 0, queryWithTimestamp(afterTimeStamp, null, null));

        Assert.assertEquals("MinTimestamp is set before rows added", TEST_ROW_COUNT, queryWithTimestamp(beforeTimeStamp, null, null));

        Assert.assertEquals("Timestamp range is set around rows added", TEST_ROW_COUNT, queryWithTimestamp(beforeTimeStamp, afterTimeStamp, null));

        Assert.assertEquals("Timestamp range is set after rows added", 0, queryWithTimestamp(afterTimeStamp, afterTimeStamp + 10, null));

        Assert.assertEquals("Timestamp range is set before rows added", 0, queryWithTimestamp(beforeTimeStamp - 10, beforeTimeStamp, null));

        Assert.assertEquals("Timestamp is set before rows added", 0, queryWithTimestamp(null, null, beforeTimeStamp));

        Assert.assertEquals("Timestamp is set after rows added", 0, queryWithTimestamp(null, null, afterTimeStamp));

        // Read the actual write timestamp of row "00" and load with exactly that timestamp.
        long specifiedTimestamp = table.get(new Get(Bytes.toBytes("00")))
                .getColumnLatestCell(COLUMNFAMILY, Bytes.toBytes("col_a")).getTimestamp();

        // Other rows may carry a different timestamp, so only a non-empty result is required.
        assertTrue("Timestamp is set equals to row 00", queryWithTimestamp(null, null, specifiedTimestamp) > 0);

        LOG.info("LoadFromHBase done");
    }

    /**
     * Loads {@code TESTTABLE_1} using the given optional timestamp filters,
     * validates the contents of every returned row, and returns the row count.
     *
     * @param minTimestamp value for {@code -minTimestamp}, or {@code null} to omit it
     * @param maxTimestamp value for {@code -maxTimestamp}, or {@code null} to omit it
     * @param timestamp    value for {@code -timestamp}, or {@code null} to omit it
     * @return the number of rows the load produced
     * @throws IOException if the query cannot be registered or iterated
     * @throws ExecException if a tuple field cannot be read
     */
    private int queryWithTimestamp(Long minTimestamp, Long maxTimestamp, Long timestamp) throws IOException,
            ExecException {
        StringBuilder options = new StringBuilder();
        if (minTimestamp != null) {
            options.append(" -minTimestamp ").append(minTimestamp).append(' ');
        }
        if (maxTimestamp != null) {
            options.append(" -maxTimestamp ").append(maxTimestamp).append(' ');
        }
        if (timestamp != null) {
            options.append(" -timestamp ").append(timestamp).append(' ');
        }

        String columnList = TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C + " pig:";
        pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using "
                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('" + columnList
                + "','-loadKey " + options + "') as (rowKey, col_a, col_b, col_c);");
        Iterator<Tuple> tuples = pig.openIterator("a");
        int rows = 0;
        LOG.info("LoadFromHBase Starting");
        while (tuples.hasNext()) {
            Tuple tuple = tuples.next();
            LOG.info("LoadFromHBase " + tuple);
            String key = ((DataByteArray) tuple.get(0)).toString();
            String colA = ((DataByteArray) tuple.get(1)).toString();
            String colB = ((DataByteArray) tuple.get(2)).toString();
            String colC = ((DataByteArray) tuple.get(3)).toString();

            String expectedKey = "00".substring((rows + "").length()) + rows;
            Assert.assertEquals(expectedKey, key);
            Assert.assertEquals(rows, Integer.parseInt(colA));
            Assert.assertEquals((double) rows, Double.parseDouble(colB), 1e-6);
            Assert.assertEquals("Text_" + rows, colC);

            rows += 1;
        }
        return rows;
    }

    /**
     * Test Load from hbase with map parameters and column prefix: only the
     * {@code pig:prefixed_col_*} cells should end up in the map.
     */
    @Test
    public void testLoadWithMap_2_col_prefix() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);

        pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using "
                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('pig:prefixed_col_*"
                + "','-loadKey') as (rowKey:chararray, pig_cf_map:map[]);");
        Iterator<Tuple> tuples = pig.openIterator("a");
        int row = 0;
        LOG.info("LoadFromHBase Starting");
        for (; tuples.hasNext(); row++) {
            Tuple tuple = tuples.next();
            LOG.info("LoadFromHBase " + tuple);
            String key = tuple.get(0).toString();
            Map<?, ?> prefixMap = (Map<?, ?>) tuple.get(1);
            Assert.assertEquals(2, tuple.size());

            String expectedKey = "00".substring((row + "").length()) + row;
            Assert.assertEquals(expectedKey, key);
            Assert.assertEquals("PrefixedText_" + row,
                    ((DataByteArray) prefixMap.get("prefixed_col_d")).toString());
            Assert.assertEquals(1, prefixMap.size());
        }
        Assert.assertEquals(TEST_ROW_COUNT, row);
        LOG.info("LoadFromHBase done");
    }

    /**
     * Test Load from hbase with map parameters and multiple column prefixes:
     * the whole {@code pig2} family plus the {@code pig:prefixed_col_*} cells,
     * each loaded into its own map.
     */
    @Test
    public void testLoadWithMap_3_col_prefix() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText, TableType.TWO_CF);

        pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using "
                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('pig2:* pig:prefixed_col_*"
                + "','-loadKey') as (rowKey:chararray, pig_cf_map:map[], pig_prefix_cf_map:map[]);");
        Iterator<Tuple> tuples = pig.openIterator("a");
        int row = 0;
        LOG.info("LoadFromHBase Starting");
        while (tuples.hasNext()) {
            Tuple tuple = tuples.next();
            LOG.info("LoadFromHBase " + tuple);
            String key = tuple.get(0).toString();
            Map<?, ?> secondaryFamily = (Map<?, ?>) tuple.get(1);
            Map<?, ?> prefixMap = (Map<?, ?>) tuple.get(2);
            Assert.assertEquals(3, tuple.size());

            String expectedKey = "00".substring((row + "").length()) + row;
            Assert.assertEquals(expectedKey, key);
            // pig2:col_x holds the row number.
            Assert.assertEquals(row, Integer.parseInt(secondaryFamily.get("col_x").toString()));
            Assert.assertEquals("PrefixedText_" + row,
                    ((DataByteArray) prefixMap.get("prefixed_col_d")).toString());
            Assert.assertEquals(1, prefixMap.size());

            row += 1;
        }
        Assert.assertEquals(TEST_ROW_COUNT, row);
        LOG.info("LoadFromHBase done");
    }

    /**
     * Test Load from hbase with a static column ({@code pig:sc}) together with
     * a column-prefix map ({@code pig:prefixed_col_*}).
     */
    @Test
    public void testLoadWithFixedAndPrefixedCols() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);

        // NOTE: HBase column filters behave oddly here. A bug with missing column
        // values in the response could only be reproduced with 'sc' as the
        // column name, not with 'col_a' (testLoadWithFixedAndPrefixedCols2 covers
        // 'col_a'), so this test deliberately loads 'sc'.
        pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using "
                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('pig:sc pig:prefixed_col_*"
                + "','-loadKey') as (rowKey:chararray, sc:chararray, pig_cf_map:map[]);");
        Iterator<Tuple> tuples = pig.openIterator("a");
        int row = 0;
        LOG.info("LoadFromHBase Starting");
        for (; tuples.hasNext(); row++) {
            Tuple tuple = tuples.next();
            LOG.info("LoadFromHBase " + tuple);
            String key = (String) tuple.get(0);
            Object rawSc = tuple.get(1);
            String sc = (rawSc == null) ? null : rawSc.toString();
            Map<?, ?> prefixMap = (Map<?, ?>) tuple.get(2);
            Assert.assertEquals(3, tuple.size());

            String expectedKey = "00".substring((row + "").length()) + row;
            Assert.assertEquals(expectedKey, key);
            Assert.assertEquals("PrefixedText_" + row,
                    ((DataByteArray) prefixMap.get("prefixed_col_d")).toString());
            Assert.assertEquals(1, prefixMap.size());
            Assert.assertEquals(String.valueOf(row), sc);
        }
        Assert.assertEquals(TEST_ROW_COUNT, row);
        LOG.info("LoadFromHBase done");
    }

    /**
     * Test Load from hbase with the static column {@code pig:col_a} together
     * with a column-prefix map ({@code pig:prefixed_col_*}).
     */
    @Test
    public void testLoadWithFixedAndPrefixedCols2() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);

        pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using "
                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('pig:col_a pig:prefixed_col_*"
                + "','-loadKey') as (rowKey:chararray, col_a:chararray, pig_cf_map:map[]);");
        Iterator<Tuple> tuples = pig.openIterator("a");
        int row = 0;
        LOG.info("LoadFromHBase Starting");
        while (tuples.hasNext()) {
            Tuple tuple = tuples.next();
            LOG.info("LoadFromHBase " + tuple);
            String key = (String) tuple.get(0);
            Object rawColA = tuple.get(1);
            String colA = (rawColA == null) ? null : rawColA.toString();
            Map<?, ?> prefixMap = (Map<?, ?>) tuple.get(2);
            Assert.assertEquals(3, tuple.size());

            String expectedKey = "00".substring((row + "").length()) + row;
            Assert.assertEquals(expectedKey, key);
            Assert.assertEquals("PrefixedText_" + row,
                    ((DataByteArray) prefixMap.get("prefixed_col_d")).toString());
            Assert.assertEquals(1, prefixMap.size());
            Assert.assertEquals(String.valueOf(row), colA);

            row += 1;
        }
        Assert.assertEquals(TEST_ROW_COUNT, row);
        LOG.info("LoadFromHBase done");
    }

    /**
     * Test Load from hbase with a whole-family map ({@code pig:*}) followed by
     * a column-prefix map ({@code pig:prefixed_col_*}) on the same family.
     * Each map is expected to hold exactly one cell.
     */
    @Test
    public void testLoadWithFixedAndPrefixedCols3() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);

        pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using "
                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('pig:* pig:prefixed_col_*"
                + "','-loadKey') as (rowKey:chararray, pig_cf_map:map[], pig_prefix_cf_map:map[]);");
        Iterator<Tuple> tuples = pig.openIterator("a");
        int row = 0;
        LOG.info("LoadFromHBase Starting");
        for (; tuples.hasNext(); row++) {
            Tuple tuple = tuples.next();
            LOG.info("LoadFromHBase " + tuple);
            String key = (String) tuple.get(0);
            Map<?, ?> familyMap = (Map<?, ?>) tuple.get(1);
            Map<?, ?> prefixMap = (Map<?, ?>) tuple.get(2);
            Assert.assertEquals(3, tuple.size());

            String expectedKey = "00".substring((row + "").length()) + row;
            Assert.assertEquals(expectedKey, key);
            Assert.assertEquals("PrefixedText_" + row,
                    ((DataByteArray) familyMap.get("prefixed_col_d")).toString());
            Assert.assertEquals(1, familyMap.size());
            Assert.assertEquals(1, prefixMap.size());
        }
        Assert.assertEquals(TEST_ROW_COUNT, row);
        LOG.info("LoadFromHBase done");
    }

    /**
     * Test Load from hbase with map parameters and with a static column in
     * different order: {@code pig:col_* pig:prefixed_col_d} and
     * {@code pig:prefixed_col_d pig:col_*} must yield the same rows, the same
     * map sizes and the same fixed-column value.
     */
    @Test
    public void testLoadOrderWithFixedAndPrefixedCols() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);

        pig.registerQuery("a = load 'hbase://"
                + TESTTABLE_1
                + "' using "
                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
                + "pig:col_* pig:prefixed_col_d"
                + "','-loadKey') as (rowKey:chararray, cols:map[], prefixed_col_d:chararray);");
        pig.registerQuery("b = load 'hbase://"
                + TESTTABLE_1
                + "' using "
                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
                + "pig:prefixed_col_d pig:col_*"
                + "','-loadKey') as (rowKey:chararray, prefixed_col_d:chararray, cols:map[]);");
        Iterator<Tuple> it = pig.openIterator("a");
        Iterator<Tuple> it2 = pig.openIterator("b");
        int count = 0;
        LOG.info("LoadFromHBase Starting");
        while (it.hasNext() && it2.hasNext()) {
            Tuple t = it.next();
            Tuple t2 = it2.next();
            LOG.info("LoadFromHBase a:" + t);
            LOG.info("LoadFromHBase b:" + t2);
            String rowKey = (String) t.get(0);
            String rowKey2 = (String) t2.get(0);
            Assert.assertEquals(rowKey, rowKey2);
            Assert.assertEquals(t.size(), t2.size());
            Map<?, ?> cols_a = (Map<?, ?>) t.get(1);
            Map<?, ?> cols_b = (Map<?, ?>) t2.get(2);
            Assert.assertEquals(cols_a.size(), cols_b.size());
            // The fixed column sits at a different position in each load but must carry the same value.
            Assert.assertEquals(t.get(2), t2.get(1));
            count++;
        }
        Assert.assertEquals(TEST_ROW_COUNT, count);
        // The loop stops when either side runs out; make sure neither load produced extra rows.
        Assert.assertFalse("load 'a' returned more rows than load 'b'", it.hasNext());
        Assert.assertFalse("load 'b' returned more rows than load 'a'", it2.hasNext());
        LOG.info("LoadFromHBase done");
    }

    /**
     * Loads three populated columns plus {@code pig:col_d}, which has no value,
     * and checks each row; the empty column must come back as {@code null}.
     *
     * @throws IOException if the query fails
     */
    @Test
    public void testLoadFromHBase() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
        // Build the query once so that the logged text and the executed text cannot drift apart.
        final String query = "a = load 'hbase://" + TESTTABLE_1 + "' using "
                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
                + TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C + " pig:col_d"
                + "') as (col_a, col_b, col_c, col_d);";
        LOG.info("QUERY: " + query);
        pig.registerQuery(query);
        Iterator<Tuple> tuples = pig.openIterator("a");
        int row = 0;
        LOG.info("LoadFromHBase Starting");
        for (; tuples.hasNext(); row++) {
            Tuple tuple = tuples.next();
            String colA = ((DataByteArray) tuple.get(0)).toString();
            String colB = ((DataByteArray) tuple.get(1)).toString();
            String colC = ((DataByteArray) tuple.get(2)).toString();
            Object colD = tuple.get(3); // empty cell
            Assert.assertEquals(row, Integer.parseInt(colA));
            Assert.assertEquals((double) row, Double.parseDouble(colB), 1e-6);
            Assert.assertEquals("Text_" + row, colC);
            Assert.assertNull(colD);
        }
        Assert.assertEquals(TEST_ROW_COUNT, row);
        LOG.info("LoadFromHBase done");
    }

    /**
     * Loads the table by its bare name, without the {@code hbase://} scheme,
     * which must still be accepted for backwards compatibility.
     *
     * @throws IOException if the query fails
     */
    @Test
    public void testBackwardsCompatibility() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
        final String columnList = TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C;
        pig.registerQuery("a = load '" + TESTTABLE_1 + "' using "
                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('" + columnList
                + "') as (col_a, col_b, col_c);");
        Iterator<Tuple> tuples = pig.openIterator("a");
        int row = 0;
        LOG.info("BackwardsCompatibility Starting");
        while (tuples.hasNext()) {
            Tuple tuple = tuples.next();
            String colA = ((DataByteArray) tuple.get(0)).toString();
            String colB = ((DataByteArray) tuple.get(1)).toString();
            String colC = ((DataByteArray) tuple.get(2)).toString();

            Assert.assertEquals(row, Integer.parseInt(colA));
            Assert.assertEquals((double) row, Double.parseDouble(colB), 1e-6);
            Assert.assertEquals("Text_" + row, colC);
            row += 1;
        }
        Assert.assertEquals(TEST_ROW_COUNT, row);
        LOG.info("BackwardsCompatibility done");
    }

    /**
     * Loads three columns with {@code -loadKey}, so the row key comes back as
     * the first field of every tuple.
     *
     * @throws IOException if the query fails
     */
    @Test
    public void testLoadFromHBaseWithRowKey() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
        final String columnList = TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C;
        pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using "
                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('" + columnList
                + "','-loadKey') as (rowKey,col_a, col_b, col_c);");
        Iterator<Tuple> tuples = pig.openIterator("a");
        int row = 0;
        LOG.info("LoadFromHBaseWithRowKey Starting");
        for (; tuples.hasNext(); row++) {
            Tuple tuple = tuples.next();
            String key = ((DataByteArray) tuple.get(0)).toString();
            String colA = ((DataByteArray) tuple.get(1)).toString();
            String colB = ((DataByteArray) tuple.get(2)).toString();
            String colC = ((DataByteArray) tuple.get(3)).toString();

            String expectedKey = "00".substring((row + "").length()) + row;
            Assert.assertEquals(expectedKey, key);
            Assert.assertEquals(row, Integer.parseInt(colA));
            Assert.assertEquals((double) row, Double.parseDouble(colB), 1e-6);
            Assert.assertEquals("Text_" + row, colC);
        }
        Assert.assertEquals(TEST_ROW_COUNT, row);
        LOG.info("LoadFromHBaseWithRowKey done");
    }

    /**
     * Test Load from hbase with the inclusive bounds {@code -gte 01 -lte 98}:
     * keys 01..98 must be returned, in order.
     */
    @Test
    public void testLoadWithParameters_1() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);

        final String columnList = TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C;
        pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using "
                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('" + columnList
                + "','-loadKey -gte 01 -lte 98') as (rowKey,col_a, col_b, col_c);");
        Iterator<Tuple> tuples = pig.openIterator("a");
        int loaded = 0;
        LOG.info("LoadFromHBaseWithParameters_1 Starting");
        while (tuples.hasNext()) {
            Tuple tuple = tuples.next();
            LOG.info("LoadFromHBase " + tuple);
            String key = ((DataByteArray) tuple.get(0)).toString();
            String colA = ((DataByteArray) tuple.get(1)).toString();
            String colB = ((DataByteArray) tuple.get(2)).toString();
            String colC = ((DataByteArray) tuple.get(3)).toString();

            // The scan starts at key 01, so the i-th returned row holds value i + 1.
            int expected = loaded + 1;
            Assert.assertEquals("00".substring((expected + "").length()) + expected, key);
            Assert.assertEquals(expected, Integer.parseInt(colA));
            Assert.assertEquals((double) expected, Double.parseDouble(colB), 1e-6);
            Assert.assertEquals("Text_" + expected, colC);

            loaded++;
        }
        Assert.assertEquals(TEST_ROW_COUNT - 2, loaded);
        LOG.info("LoadFromHBaseWithParameters_1 done");
    }

    /**
     * Test Load from hbase with parameters lt and gt (00&lt;key&lt;99):
     * the exclusive bounds drop the first and last rows, so keys 01..98 are
     * expected, in order.
     */
    @Test
    public void testLoadWithParameters_2() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);

        pig.registerQuery("a = load 'hbase://"
                + TESTTABLE_1
                + "' using "
                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
                + TESTCOLUMN_A
                + " "
                + TESTCOLUMN_B
                + " "
                + TESTCOLUMN_C
                + "','-loadKey -gt 00 -lt 99') as (rowKey,col_a, col_b, col_c);");
        Iterator<Tuple> it = pig.openIterator("a");
        int count = 0;
        // Key 00 is excluded, so the first expected row is 01.
        int next = 1;
        LOG.info("LoadFromHBaseWithParameters_2 Starting");
        while (it.hasNext()) {
            Tuple t = it.next();
            String rowKey = ((DataByteArray) t.get(0)).toString();
            String col_a = ((DataByteArray) t.get(1)).toString();
            String col_b = ((DataByteArray) t.get(2)).toString();
            String col_c = ((DataByteArray) t.get(3)).toString();

            Assert.assertEquals("00".substring((next + "").length()) + next,
                    rowKey);
            Assert.assertEquals(next, Integer.parseInt(col_a));
            Assert.assertEquals(next + 0.0, Double.parseDouble(col_b), 1e-6);
            Assert.assertEquals("Text_" + next, col_c);

            count++;
            next++;
        }
        Assert.assertEquals(TEST_ROW_COUNT - 2, count);
        LOG.info("LoadFromHBaseWithParameters_2 done");
    }

    /**
     * Test Load from hbase with parameter {@code -limit 10}: only the first ten
     * rows (keys 00..09) are expected.
     */
    @Test
    public void testLoadWithParameters_3() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
        pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using "
                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
                + TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C
                + "','-loadKey -limit 10') as (rowKey,col_a, col_b, col_c);");
        Iterator<Tuple> it = pig.openIterator("a");
        int count = 0;
        LOG.info("LoadFromHBaseWithParameters_3 Starting");
        while (it.hasNext()) {
            Tuple t = it.next();
            String rowKey = ((DataByteArray) t.get(0)).toString();
            String col_a = ((DataByteArray) t.get(1)).toString();
            String col_b = ((DataByteArray) t.get(2)).toString();
            String col_c = ((DataByteArray) t.get(3)).toString();

            Assert.assertEquals("00".substring((count + "").length()) + count,
                    rowKey);
            Assert.assertEquals(count, Integer.parseInt(col_a));
            Assert.assertEquals(count + 0.0, Double.parseDouble(col_b), 1e-6);
            Assert.assertEquals("Text_" + count, col_c);

            count++;
        }
        // 'limit' apply for each region and here we have only one region
        Assert.assertEquals(10, count);
        LOG.info("LoadFromHBaseWithParameters_3 done");
    }

    /**
     * Test Load from hbase with parameters regex [2-3][4-5]: only keys whose
     * first digit is 2-3 and second digit is 4-5 should be returned, in order.
     */
    @Test
    public void testLoadWithParameters_4() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);

        pig.registerQuery("a = load 'hbase://"
                + TESTTABLE_1
                + "' using "
                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
                + TESTCOLUMN_A
                + " "
                + TESTCOLUMN_B
                + " "
                + TESTCOLUMN_C
                + "','-loadKey -regex [2-3][4-5]') as (rowKey,col_a, col_b, col_c);");
        Iterator<Tuple> it = pig.openIterator("a");

        int[] expectedValues = {24, 25, 34, 35};
        int count = 0;
        LOG.info("LoadFromHBaseWithParameters_4 Starting");
        while (it.hasNext()) {
            Tuple t = it.next();
            LOG.info("LoadFromHBase " + t);
            String rowKey = ((DataByteArray) t.get(0)).toString();
            String col_a = ((DataByteArray) t.get(1)).toString();
            String col_b = ((DataByteArray) t.get(2)).toString();
            String col_c = ((DataByteArray) t.get(3)).toString();

            // Fail with a readable message, not an ArrayIndexOutOfBoundsException,
            // if the filter lets through more rows than expected.
            Assert.assertTrue("Unexpected extra row: " + t, count < expectedValues.length);
            Assert.assertEquals(expectedValues[count] + "", rowKey);
            Assert.assertEquals(expectedValues[count], Integer.parseInt(col_a));
            Assert.assertEquals((double) expectedValues[count], Double.parseDouble(col_b), 1e-6);
            Assert.assertEquals("Text_" + expectedValues[count], col_c);

            count++;
        }
        Assert.assertEquals(expectedValues.length, count);
        LOG.info("LoadFromHBaseWithParameters_4 done");
    }

    /**
     * Test Load from hbase with parameters lt and gt (10&lt;key&lt;30) combined
     * with regex \d[5] (a digit followed by 5): keys 15 and 25 are expected, in order.
     */
    @Test
    public void testLoadWithParameters_5() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);

        pig.registerQuery("a = load 'hbase://"
                + TESTTABLE_1
                + "' using "
                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
                + TESTCOLUMN_A
                + " "
                + TESTCOLUMN_B
                + " "
                + TESTCOLUMN_C
                + "','-loadKey -gt 10 -lt 30 -regex \\\\d[5]') as (rowKey,col_a, col_b, col_c);");
        Iterator<Tuple> it = pig.openIterator("a");

        int[] expectedValues = {15, 25};
        int count = 0;
        LOG.info("LoadFromHBaseWithParameters_5 Starting");
        while (it.hasNext()) {
            Tuple t = it.next();
            LOG.info("LoadFromHBase " + t);
            String rowKey = ((DataByteArray) t.get(0)).toString();
            String col_a = ((DataByteArray) t.get(1)).toString();
            String col_b = ((DataByteArray) t.get(2)).toString();
            String col_c = ((DataByteArray) t.get(3)).toString();

            // Fail with a readable message, not an ArrayIndexOutOfBoundsException,
            // if the filters let through more rows than expected.
            Assert.assertTrue("Unexpected extra row: " + t, count < expectedValues.length);
            Assert.assertEquals(expectedValues[count] + "", rowKey);
            Assert.assertEquals(expectedValues[count], Integer.parseInt(col_a));
            Assert.assertEquals((double) expectedValues[count], Double.parseDouble(col_b), 1e-6);
            Assert.assertEquals("Text_" + expectedValues[count], col_c);

            count++;
        }
        Assert.assertEquals(expectedValues.length, count);
        LOG.info("LoadFromHBaseWithParameters_5 done");
    }

    /**
     * Loads TESTTABLE_1 with the HBaseBinaryConverter caster, projects col_a and
     * col_c, and checks that every projected tuple has exactly those two typed
     * fields for all 100 rows.
     */
    @Test
    public void testLoadWithProjection_1() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
        scanTable1(pig, DataFormat.HBaseBinary);
        pig.registerQuery("b = FOREACH a GENERATE col_a, col_c;");
        Iterator<Tuple> projected = pig.openIterator("b");
        LOG.info("testLoadWithProjection_1 Starting");
        int rowNum;
        for (rowNum = 0; projected.hasNext(); rowNum++) {
            Tuple tuple = projected.next();
            int colA = (Integer) tuple.get(0);
            String colC = (String) tuple.get(1);
            Assert.assertEquals(rowNum, colA);
            Assert.assertEquals("Text_" + rowNum, colC);
            Assert.assertEquals(2, tuple.size());
        }
        Assert.assertEquals(100, rowNum);
        LOG.info("testLoadWithProjection_1 done");
    }

    /**
     * Loads TESTTABLE_1 holding UTF-8 plain text without a {@code -caster}
     * option (so the loader's default caster is used), projects col_a and
     * col_c, and checks the two typed fields for all 100 rows.
     */
    @Test
    public void testLoadWithProjection_2() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
        scanTable1(pig, DataFormat.UTF8PlainText);
        pig.registerQuery("b = FOREACH a GENERATE col_a, col_c;");
        Iterator<Tuple> projected = pig.openIterator("b");
        LOG.info("testLoadWithProjection_2 Starting");
        int rowNum;
        for (rowNum = 0; projected.hasNext(); rowNum++) {
            Tuple tuple = projected.next();
            int colA = (Integer) tuple.get(0);
            String colC = (String) tuple.get(1);
            Assert.assertEquals(rowNum, colA);
            Assert.assertEquals("Text_" + rowNum, colC);
            Assert.assertEquals(2, tuple.size());
        }
        Assert.assertEquals(100, rowNum);
        LOG.info("testLoadWithProjection_2 done");
    }

    /**
     * Tests a merge inner join ({@code USING 'merge'}) of TESTTABLE_1 with
     * TESTTABLE_2. Both tables are prepared with identical binary data, so each
     * joined tuple must carry the same row key and column values on the left
     * (fields 0-3) and right (fields 4-7) sides.
     *
     * @throws IOException
     */
    @Test
    public void testMergeJoin() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
        prepareTable(TESTTABLE_2, true, DataFormat.HBaseBinary);
        pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using "
                        + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
                        + TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C
                        + "','-loadKey -caster HBaseBinaryConverter') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);");
        pig.registerQuery("b = load 'hbase://" + TESTTABLE_2 + "' using "
                        + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
                        + TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C
                        + "','-loadKey -caster HBaseBinaryConverter') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);");
        pig.registerQuery("c = join a by rowKey, b by rowKey USING 'merge';");
        // order the join output so rows can be checked against a running counter
        pig.registerQuery("d = ORDER c BY a::rowKey;");

        Iterator<Tuple> it = pig.openIterator("d");
        int count = 0;
        LOG.info("MergeJoin Starting");
        while (it.hasNext()) {
            Tuple t = it.next();
            // the columns for both relations should be merged into one tuple
            // row keys were written zero-padded to two digits ("00".."99")
            String expectedKey = "00".substring((count + "").length()) + count;

            // left side
            String rowKey = (String) t.get(0);
            int col_a = (Integer) t.get(1);
            double col_b = (Double) t.get(2);
            String col_c = (String) t.get(3);

            Assert.assertEquals(expectedKey, rowKey);
            Assert.assertEquals(count, col_a);
            Assert.assertEquals(count + 0.0, col_b, 1e-6);
            Assert.assertEquals("Text_" + count, col_c);

            // right side
            String rowKey2 = (String) t.get(4);
            int col_a2 = (Integer) t.get(5);
            double col_b2 = (Double) t.get(6);
            String col_c2 = (String) t.get(7);

            Assert.assertEquals(expectedKey, rowKey2);
            Assert.assertEquals(count, col_a2);
            Assert.assertEquals(count + 0.0, col_b2, 1e-6);
            Assert.assertEquals("Text_" + count, col_c2);

            count++;
        }
        // assertEquals(expected, actual): keeps failure messages accurate
        Assert.assertEquals(TEST_ROW_COUNT, count);
        LOG.info("MergeJoin done");
    }

    /**
     * Tests a collected group ({@code group ... USING 'collected'}) over
     * TESTTABLE_1. Row keys are unique, so every group must contain exactly one
     * tuple; there is not much more to test than that.
     *
     * @throws IOException
     */
    @Test
    public void testCollectedGroup() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
        // NOTE(review): TESTTABLE_2 is prepared but never read by this test
        prepareTable(TESTTABLE_2, true, DataFormat.HBaseBinary);
        pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using "
                        + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
                        + TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C
                        + "','-loadKey -caster HBaseBinaryConverter') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);");
        pig.registerQuery("c = group a by rowKey USING 'collected';");
        // order the groups so they can be checked against a running counter
        pig.registerQuery("d = ORDER c BY group;");

        // run the collected group
        Iterator<Tuple> it = pig.openIterator("d");
        int count = 0;
        LOG.info("CollectedGroup Starting");
        while (it.hasNext()) {
            Tuple t = it.next();

            String rowKey = (String) t.get(0);

            Assert.assertEquals("00".substring((count + "").length()) + count,
                    rowKey);

            int rowCount = 0;
            DataBag rows = (DataBag) t.get(1);
            for (Iterator<Tuple> iter = rows.iterator(); iter.hasNext();) {
                Tuple row = iter.next();

                // each bag tuple is (rowKey, col_a, col_b, col_c); check the 3 columns
                int col_a = (Integer) row.get(1);
                double col_b = (Double) row.get(2);
                String col_c = (String) row.get(3);

                Assert.assertEquals(count, col_a);
                Assert.assertEquals(count + 0.0, col_b, 1e-6);
                Assert.assertEquals("Text_" + count, col_c);
                rowCount++;
            }
            // keys are unique, so each group holds exactly one tuple
            Assert.assertEquals(1, rowCount);

            count++;
        }
        Assert.assertEquals(TEST_ROW_COUNT, count);
        LOG.info("CollectedGroup done");
    }

    /**
     * Tests loading TESTTABLE_1 with {@code -caster HBaseBinaryConverter}: every
     * row must decode to a zero-padded chararray key, an int, a double and a
     * chararray, and all prepared rows must be returned.
     */
    @Test
    public void testHBaseBinaryConverter() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);

        scanTable1(pig, DataFormat.HBaseBinary);
        Iterator<Tuple> it = pig.openIterator("a");
        int index = 0;
        LOG.info("testHBaseBinaryConverter Starting");
        while (it.hasNext()) {
            Tuple t = it.next();
            String rowKey = (String) t.get(0);
            int col_a = (Integer) t.get(1);
            double col_b = (Double) t.get(2);
            String col_c = (String) t.get(3);

            Assert.assertEquals("00".substring((index + "").length()) + index,
                    rowKey);
            Assert.assertEquals(index, col_a);
            Assert.assertEquals(index + 0.0, col_b, 1e-6);
            Assert.assertEquals("Text_" + index, col_c);
            index++;
        }
        // without this check an empty result would pass without asserting anything
        Assert.assertEquals(TEST_ROW_COUNT, index);
        LOG.info("testHBaseBinaryConverter done");
    }

    /**
     * Loads 'TESTTABLE_1' in HBaseBinary format and stores it into 'TESTTABLE_2'
     * using HBaseBinaryConverter, then scans 'TESTTABLE_2' and checks all 100
     * rows round-tripped with their binary-encoded values.
     *
     * <p>Marking successful jobs is switched on for the store and switched back
     * to "false" afterwards, even if an assertion fails.
     *
     * @throws IOException
     */
    @Test
    public void testStoreToHBase_1() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
        prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary);

        pig.getPigContext().getProperties()
                .setProperty(MRConfiguration.FILEOUTPUTCOMMITTER_MARKSUCCESSFULJOBS, "true");
        try {
            scanTable1(pig, DataFormat.HBaseBinary);
            pig.store("a", "hbase://" +  TESTTABLE_2,
                    "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
                    + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
                    + TESTCOLUMN_C + "','-caster HBaseBinaryConverter')");

            // try-with-resources closes scanner and table even when an assertion fails
            try (Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
                    ResultScanner scanner = table.getScanner(new Scan())) {
                int i = 0;
                for (Result result : scanner) {
                    String v = i + "";
                    String rowKey = Bytes.toString(result.getRow());
                    int col_a = Bytes.toInt(getColValue(result, TESTCOLUMN_A));
                    double col_b = Bytes.toDouble(getColValue(result, TESTCOLUMN_B));
                    String col_c = Bytes.toString(getColValue(result, TESTCOLUMN_C));

                    Assert.assertEquals("00".substring(v.length()) + v, rowKey);
                    Assert.assertEquals(i, col_a);
                    Assert.assertEquals(i + 0.0, col_b, 1e-6);
                    Assert.assertEquals("Text_" + i, col_c);
                    i++;
                }
                Assert.assertEquals(100, i);
            }
        } finally {
            // reset even on failure so later tests do not inherit the setting
            pig.getPigContext().getProperties()
                    .setProperty(MRConfiguration.FILEOUTPUTCOMMITTER_MARKSUCCESSFULJOBS, "false");
        }
    }

    /**
     * Loads 'TESTTABLE_1' in HBaseBinary format, projects out column c, and
     * stores rowKey/col_a/col_b into 'TESTTABLE_2' using HBaseBinaryConverter.
     *
     * <p>The test runs with the system property {@code pig.usenewlogicalplan}
     * set to "false". The previous value is restored afterwards so the JVM-wide
     * setting does not leak into other tests.
     *
     * @throws IOException
     */
    @Test
    public void testStoreToHBase_1_with_projection() throws IOException {
        String previousPlanSetting = System.getProperty("pig.usenewlogicalplan");
        System.getProperties().setProperty("pig.usenewlogicalplan", "false");
        try {
            prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
            prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary);
            scanTable1(pig, DataFormat.HBaseBinary);
            pig.registerQuery("b = FOREACH a GENERATE rowKey, col_a, col_b;");
            pig.store("b",  TESTTABLE_2,
                    "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
                    + TESTCOLUMN_A + " " + TESTCOLUMN_B +
                    "','-caster HBaseBinaryConverter')");

            // try-with-resources closes scanner and table even when an assertion fails
            try (Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
                    ResultScanner scanner = table.getScanner(new Scan())) {
                int i = 0;
                for (Result result : scanner) {
                    String v = String.valueOf(i);
                    String rowKey = Bytes.toString(result.getRow());
                    int col_a = Bytes.toInt(getColValue(result, TESTCOLUMN_A));
                    double col_b = Bytes.toDouble(getColValue(result, TESTCOLUMN_B));

                    Assert.assertEquals("00".substring(v.length()) + v, rowKey);
                    Assert.assertEquals(i, col_a);
                    Assert.assertEquals(i + 0.0, col_b, 1e-6);
                    i++;
                }
                Assert.assertEquals(100, i);
            }
        } finally {
            if (previousPlanSetting == null) {
                System.clearProperty("pig.usenewlogicalplan");
            } else {
                System.setProperty("pig.usenewlogicalplan", previousPlanSetting);
            }
        }
    }

    /**
     * Loads 'TESTTABLE_1' in HBaseBinary format and stores it into 'TESTTABLE_2'
     * with {@code -includeTimestamp true}, supplying a long timestamp as the
     * second field. Every stored cell must carry that timestamp as well as the
     * original values.
     *
     * @throws IOException
     */
    @Test
    public void testStoreToHBase_1_with_timestamp() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
        prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary);
        scanTable1(pig, DataFormat.HBaseBinary);
        long timestamp = System.currentTimeMillis();
        pig.registerQuery("b = FOREACH a GENERATE rowKey, " + timestamp + "l, col_a, col_b, col_c;");
        pig.store("b",  "hbase://" + TESTTABLE_2,
                "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
                + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
                + TESTCOLUMN_C + "','-caster HBaseBinaryConverter -includeTimestamp true')");

        // try-with-resources closes scanner and table even when an assertion fails
        try (Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
                ResultScanner scanner = table.getScanner(new Scan())) {
            int i = 0;
            for (Result result : scanner) {
                String v = String.valueOf(i);
                String rowKey = Bytes.toString(result.getRow());
                int col_a = Bytes.toInt(getColValue(result, TESTCOLUMN_A));
                double col_b = Bytes.toDouble(getColValue(result, TESTCOLUMN_B));
                String col_c = Bytes.toString(getColValue(result, TESTCOLUMN_C));

                long col_a_ts = getColTimestamp(result, TESTCOLUMN_A);
                long col_b_ts = getColTimestamp(result, TESTCOLUMN_B);
                long col_c_ts = getColTimestamp(result, TESTCOLUMN_C);

                Assert.assertEquals(timestamp, col_a_ts);
                Assert.assertEquals(timestamp, col_b_ts);
                Assert.assertEquals(timestamp, col_c_ts);

                Assert.assertEquals("00".substring(v.length()) + v, rowKey);
                Assert.assertEquals(i, col_a);
                Assert.assertEquals(i + 0.0, col_b, 1e-6);
                Assert.assertEquals("Text_" + i, col_c);
                i++;
            }
            Assert.assertEquals(100, i);
        }
    }

    /**
     * Loads 'TESTTABLE_1' in HBaseBinary format and stores it into 'TESTTABLE_2'
     * with {@code -includeTimestamp true}, supplying the timestamp as a Pig
     * datetime ({@code ToDate(<millis>l)}) rather than a long. Every stored cell
     * must carry the same millisecond timestamp as well as the original values.
     *
     * @throws IOException
     */
    @Test
    public void testStoreToHBase_1_with_datetime_timestamp() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
        prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary);
        scanTable1(pig, DataFormat.HBaseBinary);
        long timestamp = System.currentTimeMillis();
        pig.registerQuery("b = FOREACH a GENERATE rowKey, ToDate(" + timestamp + "l), col_a, col_b, col_c;");
        pig.store("b",  "hbase://" + TESTTABLE_2,
                "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
                + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
                + TESTCOLUMN_C + "','-caster HBaseBinaryConverter -includeTimestamp true')");

        // try-with-resources closes scanner and table even when an assertion fails
        try (Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
                ResultScanner scanner = table.getScanner(new Scan())) {
            int i = 0;
            for (Result result : scanner) {
                String v = String.valueOf(i);
                String rowKey = Bytes.toString(result.getRow());
                int col_a = Bytes.toInt(getColValue(result, TESTCOLUMN_A));
                double col_b = Bytes.toDouble(getColValue(result, TESTCOLUMN_B));
                String col_c = Bytes.toString(getColValue(result, TESTCOLUMN_C));

                long col_a_ts = getColTimestamp(result, TESTCOLUMN_A);
                long col_b_ts = getColTimestamp(result, TESTCOLUMN_B);
                long col_c_ts = getColTimestamp(result, TESTCOLUMN_C);

                Assert.assertEquals(timestamp, col_a_ts);
                Assert.assertEquals(timestamp, col_b_ts);
                Assert.assertEquals(timestamp, col_c_ts);

                Assert.assertEquals("00".substring(v.length()) + v, rowKey);
                Assert.assertEquals(i, col_a);
                Assert.assertEquals(i + 0.0, col_b, 1e-6);
                Assert.assertEquals("Text_" + i, col_c);
                i++;
            }
            Assert.assertEquals(100, i);
        }
    }

    /**
     * Loads 'TESTTABLE_1' in HBaseBinary format and stores it into 'TESTTABLE_2'
     * with {@code -includeTimestamp true}, supplying the timestamp as a field
     * declared {@code timestamp:bytearray}. Every stored cell must carry the
     * same millisecond timestamp as well as the original values.
     *
     * @throws IOException
     */
    @Test
    public void testStoreToHBase_1_with_bytearray_timestamp() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
        prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary);
        scanTable1(pig, DataFormat.HBaseBinary);
        long timestamp = System.currentTimeMillis();
        pig.registerQuery("b = FOREACH a GENERATE rowKey, " + timestamp + "l as timestamp:bytearray, col_a, col_b, col_c;");
        pig.store("b",  "hbase://" + TESTTABLE_2,
                "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
                + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
                + TESTCOLUMN_C + "','-caster HBaseBinaryConverter -includeTimestamp true')");

        // try-with-resources closes scanner and table even when an assertion fails
        try (Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
                ResultScanner scanner = table.getScanner(new Scan())) {
            int i = 0;
            for (Result result : scanner) {
                String v = String.valueOf(i);
                String rowKey = Bytes.toString(result.getRow());
                int col_a = Bytes.toInt(getColValue(result, TESTCOLUMN_A));
                double col_b = Bytes.toDouble(getColValue(result, TESTCOLUMN_B));
                String col_c = Bytes.toString(getColValue(result, TESTCOLUMN_C));

                long col_a_ts = getColTimestamp(result, TESTCOLUMN_A);
                long col_b_ts = getColTimestamp(result, TESTCOLUMN_B);
                long col_c_ts = getColTimestamp(result, TESTCOLUMN_C);

                Assert.assertEquals(timestamp, col_a_ts);
                Assert.assertEquals(timestamp, col_b_ts);
                Assert.assertEquals(timestamp, col_c_ts);

                Assert.assertEquals("00".substring(v.length()) + v, rowKey);
                Assert.assertEquals(i, col_a);
                Assert.assertEquals(i + 0.0, col_b, 1e-6);
                Assert.assertEquals("Text_" + i, col_c);
                i++;
            }
            Assert.assertEquals(100, i);
        }
    }

    /**
     * Loads 'TESTTABLE_1' in HBaseBinary format and stores it back into
     * 'TESTTABLE_1' with {@code -includeTombstone true}. The tombstone flag
     * (second field) is {@code rowKey % 2} cast to boolean, so odd row keys are
     * deleted. Exactly the 50 even-keyed rows must remain, with unchanged values.
     *
     * @throws IOException
     */
    @Test
    public void testStoreToHBase_1_with_delete() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
        scanTable1(pig, DataFormat.HBaseBinary);
        pig.registerQuery("b = FOREACH a GENERATE rowKey, (boolean)(((int)rowKey) % 2), col_a, col_b, col_c;");
        pig.store("b",  "hbase://" + TESTTABLE_1,
                "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
                + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
                + TESTCOLUMN_C + "','-caster HBaseBinaryConverter -includeTombstone true')");

        // try-with-resources closes scanner and table even when an assertion fails
        try (Table table = connection.getTable(TableName.valueOf(TESTTABLE_1));
                ResultScanner scanner = table.getScanner(new Scan())) {
            int count = 0;
            for (Result result : scanner) {
                // only even keys survive: the count-th remaining row has key 2 * count
                int i = count * 2;
                String v = String.valueOf(i);
                String rowKey = Bytes.toString(result.getRow());
                int col_a = Bytes.toInt(getColValue(result, TESTCOLUMN_A));
                double col_b = Bytes.toDouble(getColValue(result, TESTCOLUMN_B));
                String col_c = Bytes.toString(getColValue(result, TESTCOLUMN_C));

                Assert.assertEquals("00".substring(v.length()) + v, rowKey);
                Assert.assertEquals(i, col_a);
                Assert.assertEquals(i + 0.0, col_b, 1e-6);
                Assert.assertEquals("Text_" + i, col_c);

                count++;
            }
            Assert.assertEquals(50, count);
        }
    }

    /**
     * Loads 'TESTTABLE_1' in HBaseBinary format and stores it into 'TESTTABLE_2'
     * without a caster, so the values are written as UTF-8 plain text. The
     * stored text is then parsed back and compared for all 100 rows.
     *
     * @throws IOException
     */
    @Test
    public void testStoreToHBase_2() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
        prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary);
        scanTable1(pig, DataFormat.HBaseBinary);
        pig.store("a", TESTTABLE_2,
                "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
                + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
                + TESTCOLUMN_C + "')");

        // try-with-resources closes scanner and table even when an assertion fails
        try (Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
                ResultScanner scanner = table.getScanner(new Scan())) {
            int i = 0;
            for (Result result : scanner) {
                String v = i + "";
                // Bytes.toString decodes UTF-8 explicitly instead of using the platform charset
                String rowKey = Bytes.toString(result.getRow());
                int col_a = Integer.parseInt(Bytes.toString(getColValue(result, TESTCOLUMN_A)));
                double col_b = Double.parseDouble(Bytes.toString(getColValue(result, TESTCOLUMN_B)));
                String col_c = Bytes.toString(getColValue(result, TESTCOLUMN_C));

                Assert.assertEquals("00".substring(v.length()) + v, rowKey);
                Assert.assertEquals(i, col_a);
                Assert.assertEquals(i + 0.0, col_b, 1e-6);
                Assert.assertEquals("Text_" + i, col_c);
                i++;
            }
            Assert.assertEquals(100, i);
        }
    }

    /**
     * Asserts that {@code -noWAL} disables the WAL: the Put and Delete created
     * by HBaseStorage must both use {@link Durability#SKIP_WAL}.
     *
     * @throws Exception if HBaseStorage construction or mutation creation fails
     */
    @Test
    public void testNoWAL() throws Exception {
        HBaseStorage hbaseStorage = new HBaseStorage(TESTCOLUMN_A, "-noWAL");

        Object key = "somekey";
        byte type = DataType.CHARARRAY;
        Put put = hbaseStorage.createPut(key, type);
        Delete delete = hbaseStorage.createDelete(key, type, System.currentTimeMillis());

        // getDurability() is called directly: this file compiles against an
        // HBase client that provides it, so no reflective probe is needed.
        Assert.assertEquals(Durability.SKIP_WAL, put.getDurability());
        Assert.assertEquals(Durability.SKIP_WAL, delete.getDurability());
    }

    /**
     * Asserts that without {@code -noWAL} the WAL stays enabled: neither the Put
     * nor the Delete created by HBaseStorage may use {@link Durability#SKIP_WAL}.
     *
     * @throws Exception if HBaseStorage construction or mutation creation fails
     */
    @Test
    public void testWIthWAL() throws Exception {
        HBaseStorage hbaseStorage = new HBaseStorage(TESTCOLUMN_A);

        Object key = "somekey";
        byte type = DataType.CHARARRAY;
        Put put = hbaseStorage.createPut(key, type);
        Delete delete = hbaseStorage.createDelete(key, type, System.currentTimeMillis());

        // getDurability() is called directly: this file compiles against an
        // HBase client that provides it, so no reflective probe is needed.
        Assert.assertNotEquals(Durability.SKIP_WAL, put.getDurability());
        Assert.assertNotEquals(Durability.SKIP_WAL, delete.getDurability());
    }

    /**
     * Loads 'TESTTABLE_1' in HBaseBinary format, projects out column c, and
     * stores rowKey/col_a/col_b into 'TESTTABLE_2' without a caster (UTF-8 plain
     * text). The stored text is parsed back and compared for all 100 rows.
     *
     * @throws IOException
     */
    @Test
    public void testStoreToHBase_2_with_projection() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
        prepareTable(TESTTABLE_2, false, DataFormat.UTF8PlainText);
        scanTable1(pig, DataFormat.HBaseBinary);
        pig.registerQuery("b = FOREACH a GENERATE rowKey, col_a, col_b;");
        pig.store("b", TESTTABLE_2,
                "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
                + TESTCOLUMN_A + " " + TESTCOLUMN_B + "')");

        // try-with-resources closes scanner and table even when an assertion fails
        try (Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
                ResultScanner scanner = table.getScanner(new Scan())) {
            int i = 0;
            for (Result result : scanner) {
                String v = i + "";
                // Bytes.toString decodes UTF-8 explicitly instead of using the platform charset
                String rowKey = Bytes.toString(result.getRow());
                int col_a = Integer.parseInt(Bytes.toString(getColValue(result, TESTCOLUMN_A)));
                double col_b = Double.parseDouble(Bytes.toString(getColValue(result, TESTCOLUMN_B)));

                Assert.assertEquals("00".substring(v.length()) + v, rowKey);
                Assert.assertEquals(i, col_a);
                Assert.assertEquals(i + 0.0, col_b, 1e-6);
                i++;
            }
            Assert.assertEquals(100, i);
        }
    }

    /**
     * Loads 'TESTTABLE_1' in UTF-8 plain text format, projects out column c, and
     * stores rowKey/col_a/col_b into 'TESTTABLE_2' without a caster. The stored
     * cells must hold the text forms of the values ("i" and "i.0").
     *
     * @throws IOException
     */
    @Test
    public void testStoreToHBase_3_with_projection_no_caster() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
        prepareTable(TESTTABLE_2, false, DataFormat.UTF8PlainText);
        scanTable1(pig, DataFormat.UTF8PlainText);
        pig.registerQuery("b = FOREACH a GENERATE rowKey, col_a, col_b;");
        pig.store("b", TESTTABLE_2,
                "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
                + TESTCOLUMN_A + " " + TESTCOLUMN_B + "')");

        // try-with-resources closes scanner and table even when an assertion fails
        try (Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
                ResultScanner scanner = table.getScanner(new Scan())) {
            int i = 0;
            for (Result result : scanner) {
                String v = i + "";
                // Bytes.toString decodes UTF-8 explicitly instead of using the platform charset
                String rowKey = Bytes.toString(result.getRow());

                String col_a = Bytes.toString(getColValue(result, TESTCOLUMN_A));
                String col_b = Bytes.toString(getColValue(result, TESTCOLUMN_B));

                Assert.assertEquals("00".substring(v.length()) + v, rowKey);
                Assert.assertEquals(i + "", col_a);
                Assert.assertEquals(i + 0.0 + "", col_b);
                i++;
            }
            Assert.assertEquals(100, i);
        }
    }

    /**
     * Tests that HBaseStorage handles different scans in a single MR job.
     * This can happen when Pig loads two different aliases (as in a join or
     * union): here alias a reads TESTTABLE_1 with the binary caster, and alias b
     * reads TESTTABLE_2 (plain text) with a different column list.
     */
    @Test
    public void testHeterogeneousScans() throws IOException {
        prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
        prepareTable(TESTTABLE_2, true, DataFormat.UTF8PlainText);
        scanTable1(pig, DataFormat.HBaseBinary);
        pig.registerQuery(String.format(
              " b = load 'hbase://%s' using %s('%s %s') as (col_a:int, col_c);",
              TESTTABLE_2, "org.apache.pig.backend.hadoop.hbase.HBaseStorage",
              TESTCOLUMN_A, TESTCOLUMN_C));
        pig.registerQuery(" c = join a by col_a, b by col_a; ");
        // this results in a single job with mappers loading
        // different HBaseStorage specs.

        Iterator<Tuple> it = pig.openIterator("c");
        List<Tuple> expected = new ArrayList<Tuple>();
        for (int index = 0; index < TEST_ROW_COUNT; index++) {
            Tuple t = TupleFactory.getInstance().newTuple();
            // fields from a: rowKey, col_a, col_b, col_c (all cast by the binary caster)
            t.append("00".substring((index + "").length()) + index);
            t.append(index);
            t.append(index + 0.0);
            t.append("Text_" + index);
            // fields from b: col_a declared int, col_c untyped (bytearray)
            t.append(index);
            t.append(new DataByteArray("Text_" + index));
            expected.add(t);
        }
        Util.checkQueryOutputsAfterSort(it, expected);
    }

    @Test
    // See PIG-4151
    public void testStoreEmptyMap() throws IOException {
        String tableName = "emptyMapTest";
        try {
            deleteAllRows(tableName);
        } catch (Exception e) {
            // It's ok, table might not exist.
        }
        byte[][] cfs = new byte[2][];
        cfs[0] = Bytes.toBytes("info");
        cfs[1] = Bytes.toBytes("friends");
        Table table;
        try {
            table = util.createTable(TableName.valueOf(tableName),
                    cfs);
        } catch (Exception e) {
            // creation failed, presumably because the table already exists; open it instead
            table = connection.getTable(TableName.valueOf(tableName));
        }

        // close the table handle even if the store or an assertion fails
        try {
            File inputFile = Util.createInputFile("test", "tmp", new String[] {"row1;Homer;Morrison;[1#Silvia,2#Stacy]",
                    "row2;Sheila;Fletcher;[1#Becky,2#Salvador,3#Lois]",
                    "row4;Andre;Morton;[1#Nancy]",
                    "row3;Sonja;Webb;[]"
            });
            pig.registerQuery("source = LOAD '" + Util.generateURI(inputFile.toString(), pig.getPigContext())
                    + "' USING PigStorage(';')"
                    + " AS (row:chararray, first_name:chararray, last_name:chararray, friends:map[]);");
            pig.registerQuery("STORE source INTO 'hbase://" + tableName + "' USING" +
                    " org.apache.pig.backend.hadoop.hbase.HBaseStorage('info:fname info:lname friends:*');");

            // row3 was stored with an empty map: its scalar columns must exist
            // and the 'friends' family must hold no cells
            Get get = new Get(Bytes.toBytes("row3"));
            Result r = table.get(get);
            Assert.assertEquals("Sonja", Bytes.toString(r.getValue(cfs[0], Bytes.toBytes("fname"))));
            Assert.assertEquals("Webb", Bytes.toString(r.getValue(cfs[0], Bytes.toBytes("lname"))));
            Assert.assertTrue(r.getFamilyMap(cfs[1]).isEmpty());
        } finally {
            table.close();
        }
    }

    /**
     * Registers alias {@code a} loading TESTTABLE_1 with no extra HBaseStorage
     * options; delegates to the three-argument overload.
     */
    private void scanTable1(PigServer pig, DataFormat dataFormat) throws IOException {
        final String noExtraParams = "";
        scanTable1(pig, dataFormat, noExtraParams);
    }

    /**
     * Registers alias {@code a} that loads TESTTABLE_1's three test columns plus
     * the row key ({@code -loadKey}) as
     * (rowKey:chararray, col_a:int, col_b:double, col_c:chararray).
     * For {@link DataFormat#HBaseBinary} the HBaseBinaryConverter caster is
     * requested; {@code extraParams} is appended verbatim to the option string.
     */
    private void scanTable1(PigServer pig, DataFormat dataFormat, String extraParams) throws IOException {
        String casterOption = "";
        if (dataFormat == DataFormat.HBaseBinary) {
            casterOption = " -caster HBaseBinaryConverter";
        }
        StringBuilder query = new StringBuilder("a = load 'hbase://")
                .append(TESTTABLE_1)
                .append("' using org.apache.pig.backend.hadoop.hbase.HBaseStorage('")
                .append(TESTCOLUMN_A).append(' ')
                .append(TESTCOLUMN_B).append(' ')
                .append(TESTCOLUMN_C)
                .append("','-loadKey ")
                .append(casterOption)
                .append(' ').append(extraParams).append(' ')
                .append("') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);");
        pig.registerQuery(query.toString());
    }

    /**
     * Prepares {@code tableName} with the default single-column-family layout
     * ({@link TableType#ONE_CF}); see the four-argument overload.
     */
    private Table prepareTable(String tableName, boolean initData,
            DataFormat format) throws IOException {
        final TableType defaultLayout = TableType.ONE_CF;
        return prepareTable(tableName, initData, format, defaultLayout);
    }
    /**
     * Prepare a table in HBase for testing.
     *
     * <p>If the previously prepared table had the same {@link TableType}, its rows are
     * deleted; otherwise the table is dropped. The table is then (re)created with the column
     * families required by {@code type}. If creation fails, the existing table is opened
     * instead.
     *
     * @param tableName name of the table to prepare
     * @param initData  when true, write {@code TEST_ROW_COUNT} test rows
     * @param format    {@code HBaseBinary} stores numbers with HBase binary encoding;
     *                  any other format stores their string form
     * @param type      column-family layout; {@code TWO_CF} also adds {@code col_x}
     *                  in the second family
     * @return a handle to the prepared table
     * @throws IOException on HBase I/O failure while opening or populating the table
     */
    private Table prepareTable(String tableName, boolean initData,
            DataFormat format, TableType type) throws IOException {
        TableName name = TableName.valueOf(tableName);
        Table table;
        try {
            if (lastTableType == type) {
                deleteAllRows(tableName);
            } else {
                util.deleteTable(name);
            }
        } catch (Exception ignored) {
            // It's ok, table might not exist.
        }
        try {
            if (type == TableType.TWO_CF) {
                table = util.createTable(name, new byte[][]{COLUMNFAMILY, COLUMNFAMILY2});
            } else {
                table = util.createTable(name, COLUMNFAMILY);
            }
            lastTableType = type;
        } catch (Exception e) {
            // Presumably the table still exists (rows were only deleted above); reuse it.
            table = connection.getTable(name);
        }

        if (initData) {
            for (int i = 0; i < TEST_ROW_COUNT; i++) {
                table.put(buildTestRowPut(i, format, type));
            }
            // Close the mutator so its resources are released; previously it was leaked.
            try (BufferedMutator bm = connection.getBufferedMutator(table.getName())) {
                bm.flush();
            }
        }
        return table;
    }

    /** Builds the Put for test row {@code i}, encoding numeric cells according to {@code format}. */
    private Put buildTestRowPut(int i, DataFormat format, TableType type) {
        boolean binary = format == DataFormat.HBaseBinary;
        // row key: string type
        Put put = new Put(Bytes.toBytes(testRowKey(i)));
        // sc: int type
        put.addColumn(COLUMNFAMILY, Bytes.toBytes("sc"), encodeTestInt(i, binary));
        // col_a: int type
        put.addColumn(COLUMNFAMILY, Bytes.toBytes("col_a"), encodeTestInt(i, binary));
        // col_b: double type
        double d = i + 0.0;
        put.addColumn(COLUMNFAMILY, Bytes.toBytes("col_b"),
                binary ? Bytes.toBytes(d) : Bytes.toBytes(String.valueOf(d)));
        // col_c: string type (UTF-8 regardless of platform charset)
        put.addColumn(COLUMNFAMILY, Bytes.toBytes("col_c"),
                Bytes.toBytes("Text_" + i));
        // prefixed_col_d: string type
        put.addColumn(COLUMNFAMILY, Bytes.toBytes("prefixed_col_d"),
                Bytes.toBytes("PrefixedText_" + i));
        // another cf
        if (type == TableType.TWO_CF) {
            put.addColumn(COLUMNFAMILY2, Bytes.toBytes("col_x"), encodeTestInt(i, binary));
        }
        return put;
    }

    /** Encodes an int either as HBase binary (4 bytes) or as its UTF-8 decimal string. */
    private static byte[] encodeTestInt(int value, boolean binary) {
        return binary ? Bytes.toBytes(value) : Bytes.toBytes(Integer.toString(value));
    }

    /** Left-pads {@code i} with zeros to at least two digits; wider values are left as-is. */
    private static String testRowKey(int i) {
        String v = Integer.toString(i);
        return v.length() >= 2 ? v : "00".substring(v.length()) + v;
    }

    /**
     * Helper to fetch a cell value from a result given a {@code family:qualifier} spec.
     *
     * <p>The spec is split on the first {@code ':'} only, so a qualifier that itself
     * contains colons (or is empty, as in {@code "cf:"}) is preserved intact.
     *
     * @param result  the row to read from
     * @param colName column spec in the form {@code family:qualifier}
     * @return the value returned by {@code Result.getValue} for that column
     * @throws IllegalArgumentException if {@code colName} contains no {@code ':'}
     */
    private static byte[] getColValue(Result result, String colName) {
        String[] parts = colName.split(":", 2);
        if (parts.length < 2) {
            throw new IllegalArgumentException(
                    "Column spec must be 'family:qualifier', got: " + colName);
        }
        byte[][] colArray = Bytes.toByteArrays(parts);
        return result.getValue(colArray[0], colArray[1]);
    }

    /**
     * Helper to fetch the timestamp of the latest cell for a {@code family:qualifier} spec.
     *
     * <p>The spec is split on the first {@code ':'} only, so a qualifier that itself
     * contains colons (or is empty, as in {@code "cf:"}) is preserved intact.
     *
     * @param result  the row to read from
     * @param colName column spec in the form {@code family:qualifier}
     * @return timestamp of the cell returned by {@code Result.getColumnLatestCell}
     * @throws IllegalArgumentException if {@code colName} contains no {@code ':'}
     * @throws IllegalStateException    if the result has no cell for that column
     */
    private static long getColTimestamp(Result result, String colName) {
        String[] parts = colName.split(":", 2);
        if (parts.length < 2) {
            throw new IllegalArgumentException(
                    "Column spec must be 'family:qualifier', got: " + colName);
        }
        byte[][] colArray = Bytes.toByteArrays(parts);
        org.apache.hadoop.hbase.Cell cell = result.getColumnLatestCell(colArray[0], colArray[1]);
        if (cell == null) {
            throw new IllegalStateException("No cell found for column " + colName);
        }
        return cell.getTimestamp();
    }

}
