PIG-5191: Pig HBase 2.0.0 support (nkollar via szita, reviewed by rohini)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1839568 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 07d0106..c56859a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -26,6 +26,8 @@
  
 IMPROVEMENTS
 
+PIG-5191: Pig HBase 2.0.0 support (nkollar via szita, reviewed by rohini)
+
 PIG-5344: Update Apache HTTPD LogParser to latest version (nielsbasjes via szita)
 
 PIG-4092: Predicate pushdown for Parquet (nkollar via rohini)
diff --git a/build.xml b/build.xml
index 8bcbe5e..34697ee 100644
--- a/build.xml
+++ b/build.xml
@@ -1713,7 +1713,7 @@
        <ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" log="${loglevel}"
                  pattern="${build.ivy.lib.dir}/${ivy.artifact.retrieve.pattern}" conf="compile"/>
        <ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" log="${loglevel}"
-                 pattern="${ivy.lib.dir.spark}/[artifact]-[revision](-[classifier]).[ext]" conf="spark${sparkversion}"/>
+                 pattern="${ivy.lib.dir.spark}/[artifact]-[revision](-[classifier]).[ext]" conf="spark${sparkversion},hbase${hbaseversion}"/>
        <ivy:cachepath pathid="compile.classpath" conf="compile"/>
      </target>
 
diff --git a/ivy.xml b/ivy.xml
index b5ad72d..0902b18 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -40,6 +40,7 @@
     <conf name="buildJar" extends="compile,test" visibility="private"/>
     <conf name="hadoop2" visibility="private"/>
     <conf name="hbase1" visibility="private"/>
+    <conf name="hbase2" visibility="private"/>
     <conf name="spark1" visibility="private" />
     <conf name="spark2" visibility="private" />
   </configurations>
@@ -308,6 +309,167 @@
       <exclude org="com.sun.jersey" module="jersey-json"/>
       <exclude org="asm" module="asm"/>
     </dependency>
+    <dependency org="com.lmax" name="disruptor" rev="3.3.0" conf="hbase1->master"/>
+
+    <!-- HBase 2.x dependencies -->
+    <dependency org="org.apache.hbase" name="hbase-client" rev="${hbase2.version}" conf="hbase2->master">
+      <artifact name="hbase-client" type="jar"/>
+      <artifact name="hbase-client" type="test-jar" ext="jar" m:classifier="tests"/>
+      <exclude org="org.slf4j" module="slf4j-api"/>
+      <exclude org="org.slf4j" module="slf4j-log4j12" />
+      <exclude org="stax" module="stax-api" />
+      <exclude org="javax.xml.bind" module="jaxb-api" />
+      <exclude org="tomcat" module="jasper-runtime"/>
+      <exclude org="tomcat" module="jasper-compiler"/>
+      <exclude org="com.google.protobuf" module="protobuf-java"/>
+      <exclude org="com.sun.jersey" module="jersey-core"/>
+      <exclude org="com.sun.jersey" module="jersey-server"/>
+      <exclude org="com.sun.jersey" module="jersey-json"/>
+      <exclude org="asm" module="asm"/>
+    </dependency>
+
+    <dependency org="org.apache.hbase" name="hbase-common" rev="${hbase2.version}" conf="hbase2->master">
+      <artifact name="hbase-common" type="jar"/>
+      <artifact name="hbase-common" type="test-jar" ext="jar" m:classifier="tests"/>
+      <exclude org="org.apache.hadoop" module="hadoop-core"/>
+      <exclude org="stax" module="stax-api" />
+      <exclude org="javax.xml.bind" module="jaxb-api" />
+      <exclude org="javax.ws.rs" module="jsr311-api" />
+      <exclude org="tomcat" module="jasper-runtime"/>
+      <exclude org="tomcat" module="jasper-compiler"/>
+      <exclude org="com.sun.jersey" module="jersey-core"/>
+      <exclude org="com.sun.jersey" module="jersey-server"/>
+      <exclude org="com.sun.jersey" module="jersey-json"/>
+      <exclude org="asm" module="asm"/>
+    </dependency>
+
+    <dependency org="org.apache.hbase" name="hbase-server" rev="${hbase2.version}" conf="hbase2->master">
+      <artifact name="hbase-server" type="jar"/>
+      <artifact name="hbase-server" type="test-jar" ext="jar" m:classifier="tests"/>
+      <exclude org="org.apache.hadoop" module="hadoop-core"/>
+      <exclude org="org.slf4j" module="slf4j-api"/>
+      <exclude org="org.slf4j" module="slf4j-log4j12" />
+      <exclude org="stax" module="stax-api" />
+      <exclude org="javax.xml.bind" module="jaxb-api" />
+      <exclude org="javax.ws.rs" module="jsr311-api" />
+      <exclude org="tomcat" module="jasper-runtime"/>
+      <exclude org="tomcat" module="jasper-compiler"/>
+      <exclude org="com.sun.jersey" module="jersey-core"/>
+      <exclude org="com.sun.jersey" module="jersey-server"/>
+      <exclude org="com.sun.jersey" module="jersey-json"/>
+      <exclude org="asm" module="asm"/>
+    </dependency>
+
+    <dependency org="org.apache.hbase" name="hbase-protocol" rev="${hbase2.version}" conf="hbase2->master">
+      <artifact name="hbase-protocol" type="jar"/>
+      <artifact name="hbase-protocol" type="test-jar" ext="jar" m:classifier="tests"/>
+      <exclude org="com.google.protobuf" module="protobuf-java"/>
+    </dependency>
+
+    <dependency org="org.apache.hbase" name="hbase-hadoop-compat" rev="${hbase2.version}" conf="hbase2->master">
+      <artifact name="hbase-hadoop-compat" type="jar"/>
+      <artifact name="hbase-hadoop-compat" type="test-jar" ext="jar" m:classifier="tests"/>
+    </dependency>
+
+    <dependency org="org.apache.hbase" name="hbase-hadoop2-compat" rev="${hbase2.version}" conf="hbase2->master">
+      <artifact name="hbase-hadoop2-compat" type="jar"/>
+      <artifact name="hbase-hadoop2-compat" type="test-jar" ext="jar" m:classifier="tests"/>
+      <exclude org="org.apache.hadoop" module="hadoop-core"/>
+      <exclude org="org.slf4j" module="slf4j-api"/>
+      <exclude org="stax" module="stax-api" />
+      <exclude org="javax.xml.bind" module="jaxb-api" />
+      <exclude org="tomcat" module="jasper-runtime"/>
+      <exclude org="tomcat" module="jasper-compiler"/>
+      <exclude org="com.sun.jersey" module="jersey-core"/>
+      <exclude org="com.sun.jersey" module="jersey-server"/>
+      <exclude org="com.sun.jersey" module="jersey-json"/>
+      <exclude org="asm" module="asm"/>
+    </dependency>
+
+    <dependency org="org.apache.hbase" name="hbase-protocol-shaded" rev="${hbase2.version}" conf="hbase2->master">
+      <artifact name="hbase-protocol-shaded" type="jar"/>
+      <artifact name="hbase-protocol-shaded" type="test-jar" ext="jar" m:classifier="tests"/>
+      <exclude org="com.google.protobuf" module="protobuf-java"/>
+    </dependency>
+
+    <dependency org="org.apache.hbase" name="hbase-procedure" rev="${hbase2.version}" conf="hbase2->master">
+      <artifact name="hbase-procedure" type="jar"/>
+      <artifact name="hbase-procedure" type="test-jar" ext="jar" m:classifier="tests"/>
+    </dependency>
+
+    <dependency org="org.apache.hbase" name="hbase-metrics-api" rev="${hbase2.version}" conf="hbase2->master">
+      <artifact name="hbase-metrics-api" type="jar"/>
+      <artifact name="hbase-metrics-api" type="test-jar" ext="jar" m:classifier="tests"/>
+    </dependency>
+
+    <dependency org="org.apache.hbase" name="hbase-metrics" rev="${hbase2.version}" conf="hbase2->master">
+      <artifact name="hbase-metrics" type="jar"/>
+      <artifact name="hbase-metrics" type="test-jar" ext="jar" m:classifier="tests"/>
+    </dependency>
+
+    <dependency org="org.apache.hbase" name="hbase-mapreduce" rev="${hbase2.version}" conf="hbase2->master">
+      <artifact name="hbase-mapreduce" type="jar"/>
+      <artifact name="hbase-mapreduce" type="test-jar" ext="jar" m:classifier="tests"/>
+      <exclude org="org.apache.hadoop" module="hadoop-core"/>
+      <exclude org="org.slf4j" module="slf4j-api"/>
+      <exclude org="org.slf4j" module="slf4j-log4j12" />
+      <exclude org="stax" module="stax-api" />
+      <exclude org="javax.xml.bind" module="jaxb-api" />
+      <exclude org="javax.ws.rs" module="jsr311-api" />
+      <exclude org="tomcat" module="jasper-runtime"/>
+      <exclude org="tomcat" module="jasper-compiler"/>
+      <exclude org="com.sun.jersey" module="jersey-core"/>
+      <exclude org="com.sun.jersey" module="jersey-server"/>
+      <exclude org="com.sun.jersey" module="jersey-json"/>
+      <exclude org="asm" module="asm"/>
+    </dependency>
+
+    <dependency org="org.apache.hbase" name="hbase-zookeeper" rev="${hbase2.version}" conf="hbase2->master">
+      <artifact name="hbase-zookeeper" type="jar"/>
+      <artifact name="hbase-zookeeper" type="test-jar" ext="jar" m:classifier="tests"/>
+      <exclude org="org.apache.hadoop" module="hadoop-core"/>
+      <exclude org="stax" module="stax-api" />
+      <exclude org="javax.xml.bind" module="jaxb-api" />
+      <exclude org="javax.ws.rs" module="jsr311-api" />
+      <exclude org="tomcat" module="jasper-runtime"/>
+      <exclude org="tomcat" module="jasper-compiler"/>
+      <exclude org="com.sun.jersey" module="jersey-core"/>
+      <exclude org="com.sun.jersey" module="jersey-server"/>
+      <exclude org="com.sun.jersey" module="jersey-json"/>
+      <exclude org="asm" module="asm"/>
+    </dependency>
+
+    <dependency org="org.apache.htrace" name="htrace-core4" rev="${htrace4.version}" conf="hbase2->master">
+      <artifact name="htrace-core4" type="jar"/>
+    </dependency>
+
+    <dependency org="org.apache.hbase" name="hbase-replication" rev="${hbase2.version}" conf="hbase2->master">
+        <artifact name="hbase-replication" type="jar"/>
+        <artifact name="hbase-replication" type="test-jar" ext="jar" m:classifier="tests"/>
+    </dependency>
+
+    <dependency org="org.apache.hbase" name="hbase-http" rev="${hbase2.version}" conf="hbase2->master">
+      <artifact name="hbase-http" type="jar"/>
+      <artifact name="hbase-http" type="test-jar" ext="jar" m:classifier="tests"/>
+    </dependency>
+
+    <dependency org="org.apache.hbase.thirdparty" name="hbase-shaded-miscellaneous" rev="2.1.0" conf="hbase2->master" />
+
+    <dependency org="org.apache.hbase.thirdparty" name="hbase-shaded-netty" rev="2.1.0" conf="hbase2->master" />
+
+    <dependency org="org.apache.hbase.thirdparty" name="hbase-shaded-protobuf" rev="2.1.0" conf="hbase2->master" />
+
+    <dependency org="org.eclipse.jetty" name="jetty-http" rev="9.3.20.v20170531" conf="hbase2->master"/>
+    <dependency org="org.eclipse.jetty" name="jetty-io" rev="9.3.20.v20170531" conf="hbase2->master"/>
+    <dependency org="org.eclipse.jetty" name="jetty-security" rev="9.3.20.v20170531" conf="hbase2->master"/>
+    <dependency org="org.eclipse.jetty" name="jetty-server" rev="9.3.20.v20170531" conf="hbase2->master"/>
+    <dependency org="org.eclipse.jetty" name="jetty-servlet" rev="9.3.20.v20170531" conf="hbase2->master"/>
+    <dependency org="org.eclipse.jetty" name="jetty-util" rev="9.3.20.v20170531" conf="hbase2->master"/>
+    <dependency org="org.eclipse.jetty" name="jetty-util-ajax" rev="9.3.20.v20170531" conf="hbase2->master"/>
+    <dependency org="org.eclipse.jetty" name="jetty-webapp" rev="9.3.20.v20170531" conf="hbase2->master"/>
+    <dependency org="org.eclipse.jetty" name="jetty-xml" rev="9.3.20.v20170531" conf="hbase2->master"/>
+    <dependency org="com.lmax" name="disruptor" rev="3.3.6" conf="hbase2->master"/>
+    <!-- End of HBase dependencies -->
 
     <dependency org="org.htrace" name="htrace-core" rev="3.0.4" conf="hadoop2->master"/>
     <dependency org="org.apache.htrace" name="htrace-core" rev="${htrace.version}" conf="hadoop2->master"/>
@@ -316,7 +478,6 @@
     <dependency org="org.cloudera.htrace" name="htrace-core" rev="2.00" conf="hbase1->master">
       <artifact name="htrace-core" type="jar"/>
     </dependency>
-    <dependency org="com.lmax" name="disruptor" rev="3.3.0" conf="hbase1->master"/>
 
     <!-- for TestHBaseStorage -->
     <dependency org="org.apache.hbase" name="hbase-procedure" rev="${hbase1.version}" conf="test->master"/>
@@ -420,6 +581,7 @@
 
     <dependency org="asm" name="asm" rev="${asm.version}" conf="compile->master"/>
     <dependency org="javax.servlet" name="javax.servlet-api" rev="3.0.1" conf="spark1->default;spark2->default"/>
+    <dependency org="javax.servlet" name="javax.servlet-api" rev="3.1.0" conf="test->master"/>
     <dependency org="org.scala-lang.modules" name="scala-xml_2.11" rev="${scala-xml.version}" conf="spark1->default;spark2->default"/>
 
     <!-- for Tez integration -->
diff --git a/ivy/libraries.properties b/ivy/libraries.properties
index 8624a69..ec71472 100644
--- a/ivy/libraries.properties
+++ b/ivy/libraries.properties
@@ -39,6 +39,7 @@
 hadoop-hdfs.version=2.7.3
 hadoop-mapreduce.version=2.7.3
 hbase1.version=1.2.4
+hbase2.version=2.0.0
 hsqldb.version=2.4.0
 hive.version=1.2.1
 httpcomponents.version=4.4
@@ -64,7 +65,7 @@
 stringtemplate.version=4.0.4
 log4j.version=1.2.16
 netty.version=3.6.6.Final
-netty-all.version=4.0.23.Final
+netty-all.version=4.1.1.Final
 rats-lib.version=0.5.1
 slf4j-api.version=1.6.1
 slf4j-log4j12.version=1.6.1
@@ -92,6 +93,7 @@
 leveldbjni.version=1.8
 curator.version=2.6.0
 htrace.version=3.1.0-incubating
+htrace4.version=4.0.1-incubating
 commons-lang3.version=3.6
 scala-xml.version=1.0.5
 glassfish.el.version=3.0.1-b08
\ No newline at end of file
diff --git a/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java b/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
index 7a63b8d..9804038 100644
--- a/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
+++ b/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
@@ -45,7 +45,10 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -64,6 +67,7 @@
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableSplit;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.token.TokenUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableComparable;
@@ -868,16 +872,22 @@
     private void addHBaseDelegationToken(Configuration hbaseConf, Job job) {
 
         if (!UDFContext.getUDFContext().isFrontend()) {
+            LOG.debug("skipping authentication checks because we're currently in a frontend UDF context");
             return;
         }
 
         if ("kerberos".equalsIgnoreCase(hbaseConf.get(HBASE_SECURITY_CONF_KEY))) {
+            LOG.info("hbase is configured to use Kerberos, attempting to fetch delegation token.");
             try {
-                UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
-                if (currentUser.hasKerberosCredentials()) {
-                    TokenUtil.obtainTokenForJob(hbaseConf,currentUser,job);
+                User currentUser = User.getCurrent();
+                UserGroupInformation currentUserGroupInformation = currentUser.getUGI();
+                if (currentUserGroupInformation.hasKerberosCredentials()) {
+                    try (Connection connection = ConnectionFactory.createConnection(hbaseConf, currentUser)) {
+                        TokenUtil.obtainTokenForJob(connection, currentUser, job);
+                        LOG.info("Token retrieval succeeded for user " + currentUser.getName());
+                    }
                 } else {
-                    LOG.info("Not fetching hbase delegation token as no Kerberos TGT is available");
+                    LOG.info("Not fetching hbase delegation token as no Kerberos TGT is available for user " + currentUser.getName());
                 }
             } catch (RuntimeException re) {
                 throw re;
@@ -885,6 +895,8 @@
                 throw new UndeclaredThrowableException(e,
                         "Unexpected error calling TokenUtil.obtainTokenForJob()");
             }
+        } else {
+            LOG.info("hbase is not configured to use kerberos, skipping delegation token");
         }
     }
 
@@ -996,7 +1008,7 @@
             }
 
             if (!columnInfo.isColumnMap()) {
-                put.add(columnInfo.getColumnFamily(), columnInfo.getColumnName(),
+                put.addColumn(columnInfo.getColumnFamily(), columnInfo.getColumnName(),
                         ts, objToBytes(t.get(i), (fieldSchemas == null) ?
                         DataType.findType(t.get(i)) : fieldSchemas[i].getType()));
             } else {
@@ -1009,7 +1021,7 @@
                         }
                         // TODO deal with the fact that maps can have types now. Currently we detect types at
                         // runtime in the case of storing to a cf, which is suboptimal.
-                        put.add(columnInfo.getColumnFamily(), Bytes.toBytes(colName.toString()), ts,
+                        put.addColumn(columnInfo.getColumnFamily(), Bytes.toBytes(colName.toString()), ts,
                                 objToBytes(cfMap.get(colName), DataType.findType(cfMap.get(colName))));
                     }
                 }
@@ -1039,7 +1051,7 @@
         delete.setTimestamp(timestamp);
 
         if(noWAL_) {
-            delete.setWriteToWAL(false);
+            delete.setDurability(Durability.SKIP_WAL);
         }
 
         return delete;
@@ -1058,7 +1070,7 @@
         Put put = new Put(objToBytes(key, type));
 
         if(noWAL_) {
-            put.setWriteToWAL(false);
+            put.setDurability(Durability.SKIP_WAL);
         }
 
         return put;
diff --git a/src/org/apache/pig/backend/hadoop/hbase/HBaseTableInputFormat.java b/src/org/apache/pig/backend/hadoop/hbase/HBaseTableInputFormat.java
index 662ea19..afbd823 100644
--- a/src/org/apache/pig/backend/hadoop/hbase/HBaseTableInputFormat.java
+++ b/src/org/apache/pig/backend/hadoop/hbase/HBaseTableInputFormat.java
@@ -101,7 +101,7 @@
         return splits;
     }
 
-    private boolean skipRegion(CompareOp op, byte[] key, byte[] option ) {
+    private boolean skipRegion(CompareOp op, byte[] key, byte[] option ) throws IOException {
 
         if (key.length == 0 || option == null) 
             return false;
diff --git a/test/org/apache/pig/test/TestHBaseStorage.java b/test/org/apache/pig/test/TestHBaseStorage.java
index b76774f..8ac45b1 100644
--- a/test/org/apache/pig/test/TestHBaseStorage.java
+++ b/test/org/apache/pig/test/TestHBaseStorage.java
@@ -18,6 +18,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -29,14 +30,20 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -56,7 +63,10 @@
 
 import com.google.common.collect.Lists;
 
+import static org.junit.Assert.assertTrue;
+
 public class TestHBaseStorage {
+    private static Connection connection;
 
     private static final Log LOG = LogFactory.getLog(TestHBaseStorage.class);
     private static HBaseTestingUtility util;
@@ -84,17 +94,25 @@
     @BeforeClass
     public static void setUp() throws Exception {
         // This is needed by Pig
+        Configuration hadoopConf = new Configuration();
+        hadoopConf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, Paths.get(Util.getTestDirectory(TestLoad.class)).toAbsolutePath().toString());
+
         conf = HBaseConfiguration.create(new Configuration());
+        // Setting this property is required due to a bug in HBase 2.0
+        // will be fixed in 2.0.1, see HBASE-20544. It doesn't have any effect on HBase 1.x
+        conf.set("hbase.localcluster.assign.random.ports", "true");
 
         util = new HBaseTestingUtility(conf);
         util.startMiniZKCluster();
         util.startMiniHBaseCluster(1, 1);
+        connection = ConnectionFactory.createConnection(conf);
     }
 
     @AfterClass
     public static void oneTimeTearDown() throws Exception {
         // In HBase 0.90.1 and above we can use util.shutdownMiniHBaseCluster()
         // here instead.
+        connection.close();
         MiniHBaseCluster hbc = util.getHBaseCluster();
         if (hbc != null) {
             hbc.shutdown();
@@ -113,17 +131,17 @@
     public void tearDown() throws Exception {
         try {
             deleteAllRows(TESTTABLE_1);
-        } catch (IOException e) {}
+        } catch (Exception e) {}
         try {
             deleteAllRows(TESTTABLE_2);
-        } catch (IOException e) {}
+        } catch (Exception e) {}
         pig.shutdown();
     }
 
     // DVR: I've found that it is faster to delete all rows in small tables
     // than to drop them.
     private void deleteAllRows(String tableName) throws Exception {
-        HTable table = new HTable(conf, tableName);
+        Table table = connection.getTable(TableName.valueOf(tableName));
         ResultScanner scanner = table.getScanner(new Scan());
         List<Delete> deletes = Lists.newArrayList();
         for (Result row : scanner) {
@@ -194,7 +212,7 @@
     public void testLoadWithSpecifiedTimestampAndRanges() throws IOException {
         long beforeTimeStamp = System.currentTimeMillis() - 10;
 
-        HTable table = prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+        Table table = prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
 
         long afterTimeStamp = System.currentTimeMillis() + 10;
 
@@ -216,9 +234,9 @@
 
         Assert.assertEquals("Timestamp is set after rows added", 0, queryWithTimestamp(null, null, afterTimeStamp));
 
-        long specifiedTimestamp = table.get(new Get(Bytes.toBytes("00"))).getColumnLatest(COLUMNFAMILY, Bytes.toBytes("col_a")).getTimestamp();
+        long specifiedTimestamp = table.get(new Get(Bytes.toBytes("00"))).getColumnLatestCell(COLUMNFAMILY, Bytes.toBytes("col_a")).getTimestamp();
 
-        Assert.assertTrue("Timestamp is set equals to row 01", queryWithTimestamp(null, null, specifiedTimestamp) > 0);
+        assertTrue("Timestamp is set equals to row 01", queryWithTimestamp(null, null, specifiedTimestamp) > 0);
 
 
         LOG.info("LoadFromHBase done");
@@ -312,7 +330,7 @@
     }
 
     /**
-     * Test Load from hbase with map parameters and multiple column prefixs
+     * Test Load from hbase with map parameters and multiple column prefixes
      *
      */
     @Test
@@ -1015,7 +1033,7 @@
                 "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
                 + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
                 + TESTCOLUMN_C + "','-caster HBaseBinaryConverter')");
-        HTable table = new HTable(conf, TESTTABLE_2);
+        Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
         ResultScanner scanner = table.getScanner(new Scan());
         Iterator<Result> iter = scanner.iterator();
         int i = 0;
@@ -1057,7 +1075,7 @@
                 + TESTCOLUMN_A + " " + TESTCOLUMN_B +
                 "','-caster HBaseBinaryConverter')");
 
-        HTable table = new HTable(conf, TESTTABLE_2);
+        Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
         ResultScanner scanner = table.getScanner(new Scan());
         Iterator<Result> iter = scanner.iterator();
         int i = 0;
@@ -1094,7 +1112,7 @@
                 + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
                 + TESTCOLUMN_C + "','-caster HBaseBinaryConverter -includeTimestamp true')");
 
-        HTable table = new HTable(conf, TESTTABLE_2);
+        Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
         ResultScanner scanner = table.getScanner(new Scan());
         Iterator<Result> iter = scanner.iterator();
         int i = 0;
@@ -1141,7 +1159,7 @@
                 + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
                 + TESTCOLUMN_C + "','-caster HBaseBinaryConverter -includeTimestamp true')");
 
-        HTable table = new HTable(conf, TESTTABLE_2);
+        Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
         ResultScanner scanner = table.getScanner(new Scan());
         Iterator<Result> iter = scanner.iterator();
         int i = 0;
@@ -1188,7 +1206,7 @@
                 + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
                 + TESTCOLUMN_C + "','-caster HBaseBinaryConverter -includeTimestamp true')");
 
-        HTable table = new HTable(conf, TESTTABLE_2);
+        Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
         ResultScanner scanner = table.getScanner(new Scan());
         Iterator<Result> iter = scanner.iterator();
         int i = 0;
@@ -1233,7 +1251,7 @@
                 + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
                 + TESTCOLUMN_C + "','-caster HBaseBinaryConverter -includeTombstone true')");
 
-        HTable table = new HTable(conf, TESTTABLE_1);
+        Table table = connection.getTable(TableName.valueOf(TESTTABLE_1));
         ResultScanner scanner = table.getScanner(new Scan());
         Iterator<Result> iter = scanner.iterator();
         int count = 0;
@@ -1276,7 +1294,7 @@
                 + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
                 + TESTCOLUMN_C + "')");
 
-        HTable table = new HTable(conf, TESTTABLE_2);
+        Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
         ResultScanner scanner = table.getScanner(new Scan());
         Iterator<Result> iter = scanner.iterator();
         int i = 0;
@@ -1321,8 +1339,8 @@
             Assert.assertEquals(put.getClass().getMethod("getDurability").invoke(put), skipWal);
             Assert.assertEquals(delete.getClass().getMethod("getDurability").invoke(delete), skipWal);
         } else {
-            Assert.assertFalse(put.getWriteToWAL());
-            Assert.assertFalse(delete.getWriteToWAL());
+            Assert.assertEquals(Durability.SKIP_WAL, put.getDurability());
+            Assert.assertEquals(Durability.SKIP_WAL, delete.getDurability());
         }
     }
 
@@ -1350,8 +1368,8 @@
             Assert.assertNotEquals(put.getClass().getMethod("getDurability").invoke(put), skipWal);
             Assert.assertNotEquals(delete.getClass().getMethod("getDurability").invoke(delete), skipWal);
         } else {
-            Assert.assertTrue(put.getWriteToWAL());
-            Assert.assertTrue(delete.getWriteToWAL());
+            Assert.assertEquals(Durability.SKIP_WAL, put.getDurability());
+            Assert.assertEquals(Durability.SKIP_WAL, delete.getDurability());
         }
     }
 
@@ -1371,7 +1389,7 @@
                 "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
                 + TESTCOLUMN_A + " " + TESTCOLUMN_B + "')");
 
-        HTable table = new HTable(conf, TESTTABLE_2);
+        Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
         ResultScanner scanner = table.getScanner(new Scan());
         Iterator<Result> iter = scanner.iterator();
         int i = 0;
@@ -1406,7 +1424,7 @@
                 "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
                 + TESTCOLUMN_A + " " + TESTCOLUMN_B + "')");
 
-        HTable table = new HTable(conf, TESTTABLE_2);
+        Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
         ResultScanner scanner = table.getScanner(new Scan());
         Iterator<Result> iter = scanner.iterator();
         int i = 0;
@@ -1465,7 +1483,7 @@
     // See PIG-4151
     public void testStoreEmptyMap() throws IOException {
         String tableName = "emptyMapTest";
-        HTable table;
+        Table table;
         try {
             deleteAllRows(tableName);
         } catch (Exception e) {
@@ -1475,10 +1493,10 @@
         cfs[0] = Bytes.toBytes("info");
         cfs[1] = Bytes.toBytes("friends");
         try {
-            table = util.createTable(Bytes.toBytesBinary(tableName),
+            table = util.createTable(TableName.valueOf(tableName),
                     cfs);
         } catch (Exception e) {
-            table = new HTable(conf, Bytes.toBytesBinary(tableName));
+            table = connection.getTable(TableName.valueOf(tableName));
         }
 
         File inputFile = Util.createInputFile("test", "tmp", new String[] {"row1;Homer;Morrison;[1#Silvia,2#Stacy]",
@@ -1518,7 +1536,7 @@
                 + "') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);");
     }
 
-    private HTable prepareTable(String tableName, boolean initData,
+    private Table prepareTable(String tableName, boolean initData,
             DataFormat format) throws IOException {
         return prepareTable(tableName, initData, format, TableType.ONE_CF);
     }
@@ -1526,30 +1544,30 @@
      * Prepare a table in hbase for testing.
      *
      */
-    private HTable prepareTable(String tableName, boolean initData,
+    private Table prepareTable(String tableName, boolean initData,
             DataFormat format, TableType type) throws IOException {
         // define the table schema
-        HTable table = null;
+        Table table = null;
         try {
             if (lastTableType == type) {
                 deleteAllRows(tableName);
             } else {
-                util.deleteTable(tableName);
+                util.deleteTable(TableName.valueOf(tableName));
             }
         } catch (Exception e) {
             // It's ok, table might not exist.
         }
         try {
             if (type == TableType.TWO_CF) {
-                table = util.createTable(Bytes.toBytesBinary(tableName),
+                table = util.createTable(TableName.valueOf(tableName),
                         new byte[][]{COLUMNFAMILY, COLUMNFAMILY2});
             } else {
-                table = util.createTable(Bytes.toBytesBinary(tableName),
+                table = util.createTable(TableName.valueOf(tableName),
                         COLUMNFAMILY);
             }
             lastTableType = type;
         } catch (Exception e) {
-            table = new HTable(conf, Bytes.toBytesBinary(tableName));
+            table = connection.getTable(TableName.valueOf(tableName));
         }
 
         if (initData) {
@@ -1560,23 +1578,23 @@
                     Put put = new Put(Bytes.toBytes("00".substring(v.length())
                             + v));
                     // sc: int type
-                    put.add(COLUMNFAMILY, Bytes.toBytes("sc"),
+                    put.addColumn(COLUMNFAMILY, Bytes.toBytes("sc"),
                             Bytes.toBytes(i));
                     // col_a: int type
-                    put.add(COLUMNFAMILY, Bytes.toBytes("col_a"),
+                    put.addColumn(COLUMNFAMILY, Bytes.toBytes("col_a"),
                             Bytes.toBytes(i));
                     // col_b: double type
-                    put.add(COLUMNFAMILY, Bytes.toBytes("col_b"),
+                    put.addColumn(COLUMNFAMILY, Bytes.toBytes("col_b"),
                             Bytes.toBytes(i + 0.0));
                     // col_c: string type
-                    put.add(COLUMNFAMILY, Bytes.toBytes("col_c"),
+                    put.addColumn(COLUMNFAMILY, Bytes.toBytes("col_c"),
                             Bytes.toBytes("Text_" + i));
                     // prefixed_col_d: string type
-                    put.add(COLUMNFAMILY, Bytes.toBytes("prefixed_col_d"),
+                    put.addColumn(COLUMNFAMILY, Bytes.toBytes("prefixed_col_d"),
                             Bytes.toBytes("PrefixedText_" + i));
                     // another cf
                     if (type == TableType.TWO_CF) {
-                        put.add(COLUMNFAMILY2, Bytes.toBytes("col_x"),
+                        put.addColumn(COLUMNFAMILY2, Bytes.toBytes("col_x"),
                                 Bytes.toBytes(i));
                     }
                     table.put(put);
@@ -1585,29 +1603,30 @@
                     Put put = new Put(
                             ("00".substring(v.length()) + v).getBytes());
                     // sc: int type
-                    put.add(COLUMNFAMILY, Bytes.toBytes("sc"),
+                    put.addColumn(COLUMNFAMILY, Bytes.toBytes("sc"),
                             (i + "").getBytes()); // int
                     // col_a: int type
-                    put.add(COLUMNFAMILY, Bytes.toBytes("col_a"),
+                    put.addColumn(COLUMNFAMILY, Bytes.toBytes("col_a"),
                             (i + "").getBytes()); // int
                     // col_b: double type
-                    put.add(COLUMNFAMILY, Bytes.toBytes("col_b"),
+                    put.addColumn(COLUMNFAMILY, Bytes.toBytes("col_b"),
                             ((i + 0.0) + "").getBytes());
                     // col_c: string type
-                    put.add(COLUMNFAMILY, Bytes.toBytes("col_c"),
+                    put.addColumn(COLUMNFAMILY, Bytes.toBytes("col_c"),
                             ("Text_" + i).getBytes());
                     // prefixed_col_d: string type
-                    put.add(COLUMNFAMILY, Bytes.toBytes("prefixed_col_d"),
+                    put.addColumn(COLUMNFAMILY, Bytes.toBytes("prefixed_col_d"),
                             ("PrefixedText_" + i).getBytes());
                     // another cf
                     if (type == TableType.TWO_CF) {
-                        put.add(COLUMNFAMILY2, Bytes.toBytes("col_x"),
+                        put.addColumn(COLUMNFAMILY2, Bytes.toBytes("col_x"),
                                 (i + "").getBytes());
                     }
                     table.put(put);
                 }
             }
-            table.flushCommits();
+            BufferedMutator bm = connection.getBufferedMutator(table.getName());
+            bm.flush();
         }
         return table;
     }
@@ -1632,7 +1651,7 @@
      */
     private static long getColTimestamp(Result result, String colName) {
         byte[][] colArray = Bytes.toByteArrays(colName.split(":"));
-        return result.getColumnLatest(colArray[0], colArray[1]).getTimestamp();
+        return result.getColumnLatestCell(colArray[0], colArray[1]).getTimestamp();
     }
 
 }
diff --git a/test/org/apache/pig/test/TestJobSubmission.java b/test/org/apache/pig/test/TestJobSubmission.java
index 53aeaf9..1b754f2 100644
--- a/test/org/apache/pig/test/TestJobSubmission.java
+++ b/test/org/apache/pig/test/TestJobSubmission.java
@@ -26,6 +26,7 @@
 import java.util.Iterator;
 import java.util.Random;
 
+import org.apache.hadoop.hbase.TableName;
 import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
@@ -192,6 +193,9 @@
                 Util.isMapredExecType(cluster.getExecType()));
         // use the estimation
         Configuration conf = HBaseConfiguration.create(new Configuration());
+        // Setting this property is required due to a bug in HBase 2.0
+        // will be fixed in 2.0.1, see HBASE-20544. It doesn't have any effect on HBase 1.x
+        conf.set("hbase.localcluster.assign.random.ports", "true");
         HBaseTestingUtility util = new HBaseTestingUtility(conf);
         int clientPort = util.startMiniZKCluster().getClientPort();
         util.startMiniHBaseCluster(1, 1);
@@ -233,7 +237,7 @@
         Util.assertParallelValues(-1, 2, -1, 2, job.getJobConf());
 
         final byte[] COLUMNFAMILY = Bytes.toBytes("pig");
-        util.createTable(Bytes.toBytesBinary("test_table"), COLUMNFAMILY);
+        util.createTable(TableName.valueOf("test_table"), COLUMNFAMILY);
 
         // the estimation won't take effect when it apply to non-dfs or the files doesn't exist, such as hbase
         query = "a = load 'hbase://test_table' using org.apache.pig.backend.hadoop.hbase.HBaseStorage('c:f1 c:f2');" +
@@ -253,7 +257,7 @@
 
         Util.assertParallelValues(-1, -1, 1, 1, job.getJobConf());
 
-        util.deleteTable(Bytes.toBytesBinary("test_table"));
+        util.deleteTable(TableName.valueOf("test_table"));
         // In HBase 0.90.1 and above we can use util.shutdownMiniHBaseCluster()
         // here instead.
         MiniHBaseCluster hbc = util.getHBaseCluster();