PIG-5191: Pig HBase 2.0.0 support (nkollar via szita, reviewed by rohini)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1839568 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 07d0106..c56859a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -26,6 +26,8 @@
IMPROVEMENTS
+PIG-5191: Pig HBase 2.0.0 support (nkollar via szita, reviewed by rohini)
+
PIG-5344: Update Apache HTTPD LogParser to latest version (nielsbasjes via szita)
PIG-4092: Predicate pushdown for Parquet (nkollar via rohini)
diff --git a/build.xml b/build.xml
index 8bcbe5e..34697ee 100644
--- a/build.xml
+++ b/build.xml
@@ -1713,7 +1713,7 @@
<ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" log="${loglevel}"
pattern="${build.ivy.lib.dir}/${ivy.artifact.retrieve.pattern}" conf="compile"/>
<ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" log="${loglevel}"
- pattern="${ivy.lib.dir.spark}/[artifact]-[revision](-[classifier]).[ext]" conf="spark${sparkversion}"/>
+ pattern="${ivy.lib.dir.spark}/[artifact]-[revision](-[classifier]).[ext]" conf="spark${sparkversion},hbase${hbaseversion}"/>
<ivy:cachepath pathid="compile.classpath" conf="compile"/>
</target>
diff --git a/ivy.xml b/ivy.xml
index b5ad72d..0902b18 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -40,6 +40,7 @@
<conf name="buildJar" extends="compile,test" visibility="private"/>
<conf name="hadoop2" visibility="private"/>
<conf name="hbase1" visibility="private"/>
+ <conf name="hbase2" visibility="private"/>
<conf name="spark1" visibility="private" />
<conf name="spark2" visibility="private" />
</configurations>
@@ -308,6 +309,167 @@
<exclude org="com.sun.jersey" module="jersey-json"/>
<exclude org="asm" module="asm"/>
</dependency>
+ <dependency org="com.lmax" name="disruptor" rev="3.3.0" conf="hbase1->master"/>
+
+ <!-- HBase 2.x dependencies -->
+ <dependency org="org.apache.hbase" name="hbase-client" rev="${hbase2.version}" conf="hbase2->master">
+ <artifact name="hbase-client" type="jar"/>
+ <artifact name="hbase-client" type="test-jar" ext="jar" m:classifier="tests"/>
+ <exclude org="org.slf4j" module="slf4j-api"/>
+ <exclude org="org.slf4j" module="slf4j-log4j12" />
+ <exclude org="stax" module="stax-api" />
+ <exclude org="javax.xml.bind" module="jaxb-api" />
+ <exclude org="tomcat" module="jasper-runtime"/>
+ <exclude org="tomcat" module="jasper-compiler"/>
+ <exclude org="com.google.protobuf" module="protobuf-java"/>
+ <exclude org="com.sun.jersey" module="jersey-core"/>
+ <exclude org="com.sun.jersey" module="jersey-server"/>
+ <exclude org="com.sun.jersey" module="jersey-json"/>
+ <exclude org="asm" module="asm"/>
+ </dependency>
+
+ <dependency org="org.apache.hbase" name="hbase-common" rev="${hbase2.version}" conf="hbase2->master">
+ <artifact name="hbase-common" type="jar"/>
+ <artifact name="hbase-common" type="test-jar" ext="jar" m:classifier="tests"/>
+ <exclude org="org.apache.hadoop" module="hadoop-core"/>
+ <exclude org="stax" module="stax-api" />
+ <exclude org="javax.xml.bind" module="jaxb-api" />
+ <exclude org="javax.ws.rs" module="jsr311-api" />
+ <exclude org="tomcat" module="jasper-runtime"/>
+ <exclude org="tomcat" module="jasper-compiler"/>
+ <exclude org="com.sun.jersey" module="jersey-core"/>
+ <exclude org="com.sun.jersey" module="jersey-server"/>
+ <exclude org="com.sun.jersey" module="jersey-json"/>
+ <exclude org="asm" module="asm"/>
+ </dependency>
+
+ <dependency org="org.apache.hbase" name="hbase-server" rev="${hbase2.version}" conf="hbase2->master">
+ <artifact name="hbase-server" type="jar"/>
+ <artifact name="hbase-server" type="test-jar" ext="jar" m:classifier="tests"/>
+ <exclude org="org.apache.hadoop" module="hadoop-core"/>
+ <exclude org="org.slf4j" module="slf4j-api"/>
+ <exclude org="org.slf4j" module="slf4j-log4j12" />
+ <exclude org="stax" module="stax-api" />
+ <exclude org="javax.xml.bind" module="jaxb-api" />
+ <exclude org="javax.ws.rs" module="jsr311-api" />
+ <exclude org="tomcat" module="jasper-runtime"/>
+ <exclude org="tomcat" module="jasper-compiler"/>
+ <exclude org="com.sun.jersey" module="jersey-core"/>
+ <exclude org="com.sun.jersey" module="jersey-server"/>
+ <exclude org="com.sun.jersey" module="jersey-json"/>
+ <exclude org="asm" module="asm"/>
+ </dependency>
+
+ <dependency org="org.apache.hbase" name="hbase-protocol" rev="${hbase2.version}" conf="hbase2->master">
+ <artifact name="hbase-protocol" type="jar"/>
+ <artifact name="hbase-protocol" type="test-jar" ext="jar" m:classifier="tests"/>
+ <exclude org="com.google.protobuf" module="protobuf-java"/>
+ </dependency>
+
+ <dependency org="org.apache.hbase" name="hbase-hadoop-compat" rev="${hbase2.version}" conf="hbase2->master">
+ <artifact name="hbase-hadoop-compat" type="jar"/>
+ <artifact name="hbase-hadoop-compat" type="test-jar" ext="jar" m:classifier="tests"/>
+ </dependency>
+
+ <dependency org="org.apache.hbase" name="hbase-hadoop2-compat" rev="${hbase2.version}" conf="hbase2->master">
+ <artifact name="hbase-hadoop2-compat" type="jar"/>
+ <artifact name="hbase-hadoop2-compat" type="test-jar" ext="jar" m:classifier="tests"/>
+ <exclude org="org.apache.hadoop" module="hadoop-core"/>
+ <exclude org="org.slf4j" module="slf4j-api"/>
+ <exclude org="stax" module="stax-api" />
+ <exclude org="javax.xml.bind" module="jaxb-api" />
+ <exclude org="tomcat" module="jasper-runtime"/>
+ <exclude org="tomcat" module="jasper-compiler"/>
+ <exclude org="com.sun.jersey" module="jersey-core"/>
+ <exclude org="com.sun.jersey" module="jersey-server"/>
+ <exclude org="com.sun.jersey" module="jersey-json"/>
+ <exclude org="asm" module="asm"/>
+ </dependency>
+
+ <dependency org="org.apache.hbase" name="hbase-protocol-shaded" rev="${hbase2.version}" conf="hbase2->master">
+ <artifact name="hbase-protocol-shaded" type="jar"/>
+ <artifact name="hbase-protocol-shaded" type="test-jar" ext="jar" m:classifier="tests"/>
+ <exclude org="com.google.protobuf" module="protobuf-java"/>
+ </dependency>
+
+ <dependency org="org.apache.hbase" name="hbase-procedure" rev="${hbase2.version}" conf="hbase2->master">
+ <artifact name="hbase-procedure" type="jar"/>
+ <artifact name="hbase-procedure" type="test-jar" ext="jar" m:classifier="tests"/>
+ </dependency>
+
+ <dependency org="org.apache.hbase" name="hbase-metrics-api" rev="${hbase2.version}" conf="hbase2->master">
+ <artifact name="hbase-metrics-api" type="jar"/>
+ <artifact name="hbase-metrics-api" type="test-jar" ext="jar" m:classifier="tests"/>
+ </dependency>
+
+ <dependency org="org.apache.hbase" name="hbase-metrics" rev="${hbase2.version}" conf="hbase2->master">
+ <artifact name="hbase-metrics" type="jar"/>
+ <artifact name="hbase-metrics" type="test-jar" ext="jar" m:classifier="tests"/>
+ </dependency>
+
+ <dependency org="org.apache.hbase" name="hbase-mapreduce" rev="${hbase2.version}" conf="hbase2->master">
+ <artifact name="hbase-mapreduce" type="jar"/>
+ <artifact name="hbase-mapreduce" type="test-jar" ext="jar" m:classifier="tests"/>
+ <exclude org="org.apache.hadoop" module="hadoop-core"/>
+ <exclude org="org.slf4j" module="slf4j-api"/>
+ <exclude org="org.slf4j" module="slf4j-log4j12" />
+ <exclude org="stax" module="stax-api" />
+ <exclude org="javax.xml.bind" module="jaxb-api" />
+ <exclude org="javax.ws.rs" module="jsr311-api" />
+ <exclude org="tomcat" module="jasper-runtime"/>
+ <exclude org="tomcat" module="jasper-compiler"/>
+ <exclude org="com.sun.jersey" module="jersey-core"/>
+ <exclude org="com.sun.jersey" module="jersey-server"/>
+ <exclude org="com.sun.jersey" module="jersey-json"/>
+ <exclude org="asm" module="asm"/>
+ </dependency>
+
+ <dependency org="org.apache.hbase" name="hbase-zookeeper" rev="${hbase2.version}" conf="hbase2->master">
+ <artifact name="hbase-zookeeper" type="jar"/>
+ <artifact name="hbase-zookeeper" type="test-jar" ext="jar" m:classifier="tests"/>
+ <exclude org="org.apache.hadoop" module="hadoop-core"/>
+ <exclude org="stax" module="stax-api" />
+ <exclude org="javax.xml.bind" module="jaxb-api" />
+ <exclude org="javax.ws.rs" module="jsr311-api" />
+ <exclude org="tomcat" module="jasper-runtime"/>
+ <exclude org="tomcat" module="jasper-compiler"/>
+ <exclude org="com.sun.jersey" module="jersey-core"/>
+ <exclude org="com.sun.jersey" module="jersey-server"/>
+ <exclude org="com.sun.jersey" module="jersey-json"/>
+ <exclude org="asm" module="asm"/>
+ </dependency>
+
+ <dependency org="org.apache.htrace" name="htrace-core4" rev="${htrace4.version}" conf="hbase2->master">
+ <artifact name="htrace-core4" type="jar"/>
+ </dependency>
+
+ <dependency org="org.apache.hbase" name="hbase-replication" rev="${hbase2.version}" conf="hbase2->master">
+ <artifact name="hbase-replication" type="jar"/>
+ <artifact name="hbase-replication" type="test-jar" ext="jar" m:classifier="tests"/>
+ </dependency>
+
+ <dependency org="org.apache.hbase" name="hbase-http" rev="${hbase2.version}" conf="hbase2->master">
+ <artifact name="hbase-http" type="jar"/>
+ <artifact name="hbase-http" type="test-jar" ext="jar" m:classifier="tests"/>
+ </dependency>
+
+ <dependency org="org.apache.hbase.thirdparty" name="hbase-shaded-miscellaneous" rev="2.1.0" conf="hbase2->master" />
+
+ <dependency org="org.apache.hbase.thirdparty" name="hbase-shaded-netty" rev="2.1.0" conf="hbase2->master" />
+
+ <dependency org="org.apache.hbase.thirdparty" name="hbase-shaded-protobuf" rev="2.1.0" conf="hbase2->master" />
+
+ <dependency org="org.eclipse.jetty" name="jetty-http" rev="9.3.20.v20170531" conf="hbase2->master"/>
+ <dependency org="org.eclipse.jetty" name="jetty-io" rev="9.3.20.v20170531" conf="hbase2->master"/>
+ <dependency org="org.eclipse.jetty" name="jetty-security" rev="9.3.20.v20170531" conf="hbase2->master"/>
+ <dependency org="org.eclipse.jetty" name="jetty-server" rev="9.3.20.v20170531" conf="hbase2->master"/>
+ <dependency org="org.eclipse.jetty" name="jetty-servlet" rev="9.3.20.v20170531" conf="hbase2->master"/>
+ <dependency org="org.eclipse.jetty" name="jetty-util" rev="9.3.20.v20170531" conf="hbase2->master"/>
+ <dependency org="org.eclipse.jetty" name="jetty-util-ajax" rev="9.3.20.v20170531" conf="hbase2->master"/>
+ <dependency org="org.eclipse.jetty" name="jetty-webapp" rev="9.3.20.v20170531" conf="hbase2->master"/>
+ <dependency org="org.eclipse.jetty" name="jetty-xml" rev="9.3.20.v20170531" conf="hbase2->master"/>
+ <dependency org="com.lmax" name="disruptor" rev="3.3.6" conf="hbase2->master"/>
+ <!-- End of HBase dependencies -->
<dependency org="org.htrace" name="htrace-core" rev="3.0.4" conf="hadoop2->master"/>
<dependency org="org.apache.htrace" name="htrace-core" rev="${htrace.version}" conf="hadoop2->master"/>
@@ -316,7 +478,6 @@
<dependency org="org.cloudera.htrace" name="htrace-core" rev="2.00" conf="hbase1->master">
<artifact name="htrace-core" type="jar"/>
</dependency>
- <dependency org="com.lmax" name="disruptor" rev="3.3.0" conf="hbase1->master"/>
<!-- for TestHBaseStorage -->
<dependency org="org.apache.hbase" name="hbase-procedure" rev="${hbase1.version}" conf="test->master"/>
@@ -420,6 +581,7 @@
<dependency org="asm" name="asm" rev="${asm.version}" conf="compile->master"/>
<dependency org="javax.servlet" name="javax.servlet-api" rev="3.0.1" conf="spark1->default;spark2->default"/>
+ <dependency org="javax.servlet" name="javax.servlet-api" rev="3.1.0" conf="test->master"/>
<dependency org="org.scala-lang.modules" name="scala-xml_2.11" rev="${scala-xml.version}" conf="spark1->default;spark2->default"/>
<!-- for Tez integration -->
diff --git a/ivy/libraries.properties b/ivy/libraries.properties
index 8624a69..ec71472 100644
--- a/ivy/libraries.properties
+++ b/ivy/libraries.properties
@@ -39,6 +39,7 @@
hadoop-hdfs.version=2.7.3
hadoop-mapreduce.version=2.7.3
hbase1.version=1.2.4
+hbase2.version=2.0.0
hsqldb.version=2.4.0
hive.version=1.2.1
httpcomponents.version=4.4
@@ -64,7 +65,7 @@
stringtemplate.version=4.0.4
log4j.version=1.2.16
netty.version=3.6.6.Final
-netty-all.version=4.0.23.Final
+netty-all.version=4.1.1.Final
rats-lib.version=0.5.1
slf4j-api.version=1.6.1
slf4j-log4j12.version=1.6.1
@@ -92,6 +93,7 @@
leveldbjni.version=1.8
curator.version=2.6.0
htrace.version=3.1.0-incubating
+htrace4.version=4.0.1-incubating
commons-lang3.version=3.6
scala-xml.version=1.0.5
glassfish.el.version=3.0.1-b08
\ No newline at end of file
diff --git a/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java b/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
index 7a63b8d..9804038 100644
--- a/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
+++ b/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
@@ -45,7 +45,10 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
@@ -64,6 +67,7 @@
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableComparable;
@@ -868,16 +872,22 @@
private void addHBaseDelegationToken(Configuration hbaseConf, Job job) {
if (!UDFContext.getUDFContext().isFrontend()) {
+ LOG.debug("skipping delegation token fetch: not running in a frontend UDF context");
return;
}
if ("kerberos".equalsIgnoreCase(hbaseConf.get(HBASE_SECURITY_CONF_KEY))) {
+ LOG.info("hbase is configured to use Kerberos, attempting to fetch delegation token.");
try {
- UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
- if (currentUser.hasKerberosCredentials()) {
- TokenUtil.obtainTokenForJob(hbaseConf,currentUser,job);
+ User currentUser = User.getCurrent();
+ UserGroupInformation currentUserGroupInformation = currentUser.getUGI();
+ if (currentUserGroupInformation.hasKerberosCredentials()) {
+ try (Connection connection = ConnectionFactory.createConnection(hbaseConf, currentUser)) {
+ TokenUtil.obtainTokenForJob(connection, currentUser, job);
+ LOG.info("Token retrieval succeeded for user " + currentUser.getName());
+ }
} else {
- LOG.info("Not fetching hbase delegation token as no Kerberos TGT is available");
+ LOG.info("Not fetching hbase delegation token as no Kerberos TGT is available for user " + currentUser.getName());
}
} catch (RuntimeException re) {
throw re;
@@ -885,6 +895,8 @@
throw new UndeclaredThrowableException(e,
"Unexpected error calling TokenUtil.obtainTokenForJob()");
}
+ } else {
+ LOG.info("hbase is not configured to use kerberos, skipping delegation token");
}
}
@@ -996,7 +1008,7 @@
}
if (!columnInfo.isColumnMap()) {
- put.add(columnInfo.getColumnFamily(), columnInfo.getColumnName(),
+ put.addColumn(columnInfo.getColumnFamily(), columnInfo.getColumnName(),
ts, objToBytes(t.get(i), (fieldSchemas == null) ?
DataType.findType(t.get(i)) : fieldSchemas[i].getType()));
} else {
@@ -1009,7 +1021,7 @@
}
// TODO deal with the fact that maps can have types now. Currently we detect types at
// runtime in the case of storing to a cf, which is suboptimal.
- put.add(columnInfo.getColumnFamily(), Bytes.toBytes(colName.toString()), ts,
+ put.addColumn(columnInfo.getColumnFamily(), Bytes.toBytes(colName.toString()), ts,
objToBytes(cfMap.get(colName), DataType.findType(cfMap.get(colName))));
}
}
@@ -1039,7 +1051,7 @@
delete.setTimestamp(timestamp);
if(noWAL_) {
- delete.setWriteToWAL(false);
+ delete.setDurability(Durability.SKIP_WAL);
}
return delete;
@@ -1058,7 +1070,7 @@
Put put = new Put(objToBytes(key, type));
if(noWAL_) {
- put.setWriteToWAL(false);
+ put.setDurability(Durability.SKIP_WAL);
}
return put;
diff --git a/src/org/apache/pig/backend/hadoop/hbase/HBaseTableInputFormat.java b/src/org/apache/pig/backend/hadoop/hbase/HBaseTableInputFormat.java
index 662ea19..afbd823 100644
--- a/src/org/apache/pig/backend/hadoop/hbase/HBaseTableInputFormat.java
+++ b/src/org/apache/pig/backend/hadoop/hbase/HBaseTableInputFormat.java
@@ -101,7 +101,7 @@
return splits;
}
- private boolean skipRegion(CompareOp op, byte[] key, byte[] option ) {
+ private boolean skipRegion(CompareOp op, byte[] key, byte[] option ) throws IOException {
if (key.length == 0 || option == null)
return false;
diff --git a/test/org/apache/pig/test/TestHBaseStorage.java b/test/org/apache/pig/test/TestHBaseStorage.java
index b76774f..8ac45b1 100644
--- a/test/org/apache/pig/test/TestHBaseStorage.java
+++ b/test/org/apache/pig/test/TestHBaseStorage.java
@@ -18,6 +18,7 @@
import java.io.File;
import java.io.IOException;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -29,14 +30,20 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
@@ -56,7 +63,10 @@
import com.google.common.collect.Lists;
+import static org.junit.Assert.assertTrue;
+
public class TestHBaseStorage {
+ private static Connection connection;
private static final Log LOG = LogFactory.getLog(TestHBaseStorage.class);
private static HBaseTestingUtility util;
@@ -84,17 +94,25 @@
@BeforeClass
public static void setUp() throws Exception {
// This is needed by Pig
+ Configuration hadoopConf = new Configuration();
+ hadoopConf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, Paths.get(Util.getTestDirectory(TestLoad.class)).toAbsolutePath().toString());
+
conf = HBaseConfiguration.create(new Configuration());
+ // Setting this property is required due to a bug in HBase 2.0
+ // which will be fixed in 2.0.1, see HBASE-20544. It doesn't have any effect on HBase 1.x
+ conf.set("hbase.localcluster.assign.random.ports", "true");
util = new HBaseTestingUtility(conf);
util.startMiniZKCluster();
util.startMiniHBaseCluster(1, 1);
+ connection = ConnectionFactory.createConnection(conf);
}
@AfterClass
public static void oneTimeTearDown() throws Exception {
// In HBase 0.90.1 and above we can use util.shutdownMiniHBaseCluster()
// here instead.
+ connection.close();
MiniHBaseCluster hbc = util.getHBaseCluster();
if (hbc != null) {
hbc.shutdown();
@@ -113,17 +131,17 @@
public void tearDown() throws Exception {
try {
deleteAllRows(TESTTABLE_1);
- } catch (IOException e) {}
+ } catch (Exception e) {}
try {
deleteAllRows(TESTTABLE_2);
- } catch (IOException e) {}
+ } catch (Exception e) {}
pig.shutdown();
}
// DVR: I've found that it is faster to delete all rows in small tables
// than to drop them.
private void deleteAllRows(String tableName) throws Exception {
- HTable table = new HTable(conf, tableName);
+ Table table = connection.getTable(TableName.valueOf(tableName));
ResultScanner scanner = table.getScanner(new Scan());
List<Delete> deletes = Lists.newArrayList();
for (Result row : scanner) {
@@ -194,7 +212,7 @@
public void testLoadWithSpecifiedTimestampAndRanges() throws IOException {
long beforeTimeStamp = System.currentTimeMillis() - 10;
- HTable table = prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+ Table table = prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
long afterTimeStamp = System.currentTimeMillis() + 10;
@@ -216,9 +234,9 @@
Assert.assertEquals("Timestamp is set after rows added", 0, queryWithTimestamp(null, null, afterTimeStamp));
- long specifiedTimestamp = table.get(new Get(Bytes.toBytes("00"))).getColumnLatest(COLUMNFAMILY, Bytes.toBytes("col_a")).getTimestamp();
+ long specifiedTimestamp = table.get(new Get(Bytes.toBytes("00"))).getColumnLatestCell(COLUMNFAMILY, Bytes.toBytes("col_a")).getTimestamp();
- Assert.assertTrue("Timestamp is set equals to row 01", queryWithTimestamp(null, null, specifiedTimestamp) > 0);
+ assertTrue("Timestamp is set equals to row 01", queryWithTimestamp(null, null, specifiedTimestamp) > 0);
LOG.info("LoadFromHBase done");
@@ -312,7 +330,7 @@
}
/**
- * Test Load from hbase with map parameters and multiple column prefixs
+ * Test Load from hbase with map parameters and multiple column prefixes
*
*/
@Test
@@ -1015,7 +1033,7 @@
"org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
+ TESTCOLUMN_C + "','-caster HBaseBinaryConverter')");
- HTable table = new HTable(conf, TESTTABLE_2);
+ Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
ResultScanner scanner = table.getScanner(new Scan());
Iterator<Result> iter = scanner.iterator();
int i = 0;
@@ -1057,7 +1075,7 @@
+ TESTCOLUMN_A + " " + TESTCOLUMN_B +
"','-caster HBaseBinaryConverter')");
- HTable table = new HTable(conf, TESTTABLE_2);
+ Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
ResultScanner scanner = table.getScanner(new Scan());
Iterator<Result> iter = scanner.iterator();
int i = 0;
@@ -1094,7 +1112,7 @@
+ TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
+ TESTCOLUMN_C + "','-caster HBaseBinaryConverter -includeTimestamp true')");
- HTable table = new HTable(conf, TESTTABLE_2);
+ Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
ResultScanner scanner = table.getScanner(new Scan());
Iterator<Result> iter = scanner.iterator();
int i = 0;
@@ -1141,7 +1159,7 @@
+ TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
+ TESTCOLUMN_C + "','-caster HBaseBinaryConverter -includeTimestamp true')");
- HTable table = new HTable(conf, TESTTABLE_2);
+ Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
ResultScanner scanner = table.getScanner(new Scan());
Iterator<Result> iter = scanner.iterator();
int i = 0;
@@ -1188,7 +1206,7 @@
+ TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
+ TESTCOLUMN_C + "','-caster HBaseBinaryConverter -includeTimestamp true')");
- HTable table = new HTable(conf, TESTTABLE_2);
+ Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
ResultScanner scanner = table.getScanner(new Scan());
Iterator<Result> iter = scanner.iterator();
int i = 0;
@@ -1233,7 +1251,7 @@
+ TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
+ TESTCOLUMN_C + "','-caster HBaseBinaryConverter -includeTombstone true')");
- HTable table = new HTable(conf, TESTTABLE_1);
+ Table table = connection.getTable(TableName.valueOf(TESTTABLE_1));
ResultScanner scanner = table.getScanner(new Scan());
Iterator<Result> iter = scanner.iterator();
int count = 0;
@@ -1276,7 +1294,7 @@
+ TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
+ TESTCOLUMN_C + "')");
- HTable table = new HTable(conf, TESTTABLE_2);
+ Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
ResultScanner scanner = table.getScanner(new Scan());
Iterator<Result> iter = scanner.iterator();
int i = 0;
@@ -1321,8 +1339,8 @@
Assert.assertEquals(put.getClass().getMethod("getDurability").invoke(put), skipWal);
Assert.assertEquals(delete.getClass().getMethod("getDurability").invoke(delete), skipWal);
} else {
- Assert.assertFalse(put.getWriteToWAL());
- Assert.assertFalse(delete.getWriteToWAL());
+ Assert.assertEquals(Durability.SKIP_WAL, put.getDurability());
+ Assert.assertEquals(Durability.SKIP_WAL, delete.getDurability());
}
}
@@ -1350,8 +1368,8 @@
Assert.assertNotEquals(put.getClass().getMethod("getDurability").invoke(put), skipWal);
Assert.assertNotEquals(delete.getClass().getMethod("getDurability").invoke(delete), skipWal);
} else {
- Assert.assertTrue(put.getWriteToWAL());
- Assert.assertTrue(delete.getWriteToWAL());
+ Assert.assertNotEquals(Durability.SKIP_WAL, put.getDurability());
+ Assert.assertNotEquals(Durability.SKIP_WAL, delete.getDurability());
}
}
@@ -1371,7 +1389,7 @@
"org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ TESTCOLUMN_A + " " + TESTCOLUMN_B + "')");
- HTable table = new HTable(conf, TESTTABLE_2);
+ Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
ResultScanner scanner = table.getScanner(new Scan());
Iterator<Result> iter = scanner.iterator();
int i = 0;
@@ -1406,7 +1424,7 @@
"org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ TESTCOLUMN_A + " " + TESTCOLUMN_B + "')");
- HTable table = new HTable(conf, TESTTABLE_2);
+ Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
ResultScanner scanner = table.getScanner(new Scan());
Iterator<Result> iter = scanner.iterator();
int i = 0;
@@ -1465,7 +1483,7 @@
// See PIG-4151
public void testStoreEmptyMap() throws IOException {
String tableName = "emptyMapTest";
- HTable table;
+ Table table;
try {
deleteAllRows(tableName);
} catch (Exception e) {
@@ -1475,10 +1493,10 @@
cfs[0] = Bytes.toBytes("info");
cfs[1] = Bytes.toBytes("friends");
try {
- table = util.createTable(Bytes.toBytesBinary(tableName),
+ table = util.createTable(TableName.valueOf(tableName),
cfs);
} catch (Exception e) {
- table = new HTable(conf, Bytes.toBytesBinary(tableName));
+ table = connection.getTable(TableName.valueOf(tableName));
}
File inputFile = Util.createInputFile("test", "tmp", new String[] {"row1;Homer;Morrison;[1#Silvia,2#Stacy]",
@@ -1518,7 +1536,7 @@
+ "') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);");
}
- private HTable prepareTable(String tableName, boolean initData,
+ private Table prepareTable(String tableName, boolean initData,
DataFormat format) throws IOException {
return prepareTable(tableName, initData, format, TableType.ONE_CF);
}
@@ -1526,30 +1544,30 @@
* Prepare a table in hbase for testing.
*
*/
- private HTable prepareTable(String tableName, boolean initData,
+ private Table prepareTable(String tableName, boolean initData,
DataFormat format, TableType type) throws IOException {
// define the table schema
- HTable table = null;
+ Table table = null;
try {
if (lastTableType == type) {
deleteAllRows(tableName);
} else {
- util.deleteTable(tableName);
+ util.deleteTable(TableName.valueOf(tableName));
}
} catch (Exception e) {
// It's ok, table might not exist.
}
try {
if (type == TableType.TWO_CF) {
- table = util.createTable(Bytes.toBytesBinary(tableName),
+ table = util.createTable(TableName.valueOf(tableName),
new byte[][]{COLUMNFAMILY, COLUMNFAMILY2});
} else {
- table = util.createTable(Bytes.toBytesBinary(tableName),
+ table = util.createTable(TableName.valueOf(tableName),
COLUMNFAMILY);
}
lastTableType = type;
} catch (Exception e) {
- table = new HTable(conf, Bytes.toBytesBinary(tableName));
+ table = connection.getTable(TableName.valueOf(tableName));
}
if (initData) {
@@ -1560,23 +1578,23 @@
Put put = new Put(Bytes.toBytes("00".substring(v.length())
+ v));
// sc: int type
- put.add(COLUMNFAMILY, Bytes.toBytes("sc"),
+ put.addColumn(COLUMNFAMILY, Bytes.toBytes("sc"),
Bytes.toBytes(i));
// col_a: int type
- put.add(COLUMNFAMILY, Bytes.toBytes("col_a"),
+ put.addColumn(COLUMNFAMILY, Bytes.toBytes("col_a"),
Bytes.toBytes(i));
// col_b: double type
- put.add(COLUMNFAMILY, Bytes.toBytes("col_b"),
+ put.addColumn(COLUMNFAMILY, Bytes.toBytes("col_b"),
Bytes.toBytes(i + 0.0));
// col_c: string type
- put.add(COLUMNFAMILY, Bytes.toBytes("col_c"),
+ put.addColumn(COLUMNFAMILY, Bytes.toBytes("col_c"),
Bytes.toBytes("Text_" + i));
// prefixed_col_d: string type
- put.add(COLUMNFAMILY, Bytes.toBytes("prefixed_col_d"),
+ put.addColumn(COLUMNFAMILY, Bytes.toBytes("prefixed_col_d"),
Bytes.toBytes("PrefixedText_" + i));
// another cf
if (type == TableType.TWO_CF) {
- put.add(COLUMNFAMILY2, Bytes.toBytes("col_x"),
+ put.addColumn(COLUMNFAMILY2, Bytes.toBytes("col_x"),
Bytes.toBytes(i));
}
table.put(put);
@@ -1585,29 +1603,30 @@
Put put = new Put(
("00".substring(v.length()) + v).getBytes());
// sc: int type
- put.add(COLUMNFAMILY, Bytes.toBytes("sc"),
+ put.addColumn(COLUMNFAMILY, Bytes.toBytes("sc"),
(i + "").getBytes()); // int
// col_a: int type
- put.add(COLUMNFAMILY, Bytes.toBytes("col_a"),
+ put.addColumn(COLUMNFAMILY, Bytes.toBytes("col_a"),
(i + "").getBytes()); // int
// col_b: double type
- put.add(COLUMNFAMILY, Bytes.toBytes("col_b"),
+ put.addColumn(COLUMNFAMILY, Bytes.toBytes("col_b"),
((i + 0.0) + "").getBytes());
// col_c: string type
- put.add(COLUMNFAMILY, Bytes.toBytes("col_c"),
+ put.addColumn(COLUMNFAMILY, Bytes.toBytes("col_c"),
("Text_" + i).getBytes());
// prefixed_col_d: string type
- put.add(COLUMNFAMILY, Bytes.toBytes("prefixed_col_d"),
+ put.addColumn(COLUMNFAMILY, Bytes.toBytes("prefixed_col_d"),
("PrefixedText_" + i).getBytes());
// another cf
if (type == TableType.TWO_CF) {
- put.add(COLUMNFAMILY2, Bytes.toBytes("col_x"),
+ put.addColumn(COLUMNFAMILY2, Bytes.toBytes("col_x"),
(i + "").getBytes());
}
table.put(put);
}
}
- table.flushCommits();
+ try (BufferedMutator bm = connection.getBufferedMutator(table.getName())) {
+ bm.flush();
+ }
}
return table;
}
@@ -1632,7 +1651,7 @@
*/
private static long getColTimestamp(Result result, String colName) {
byte[][] colArray = Bytes.toByteArrays(colName.split(":"));
- return result.getColumnLatest(colArray[0], colArray[1]).getTimestamp();
+ return result.getColumnLatestCell(colArray[0], colArray[1]).getTimestamp();
}
}
diff --git a/test/org/apache/pig/test/TestJobSubmission.java b/test/org/apache/pig/test/TestJobSubmission.java
index 53aeaf9..1b754f2 100644
--- a/test/org/apache/pig/test/TestJobSubmission.java
+++ b/test/org/apache/pig/test/TestJobSubmission.java
@@ -26,6 +26,7 @@
import java.util.Iterator;
import java.util.Random;
+import org.apache.hadoop.hbase.TableName;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
@@ -192,6 +193,9 @@
Util.isMapredExecType(cluster.getExecType()));
// use the estimation
Configuration conf = HBaseConfiguration.create(new Configuration());
+ // Setting this property is required due to a bug in HBase 2.0
+ // which will be fixed in 2.0.1, see HBASE-20544. It doesn't have any effect on HBase 1.x
+ conf.set("hbase.localcluster.assign.random.ports", "true");
HBaseTestingUtility util = new HBaseTestingUtility(conf);
int clientPort = util.startMiniZKCluster().getClientPort();
util.startMiniHBaseCluster(1, 1);
@@ -233,7 +237,7 @@
Util.assertParallelValues(-1, 2, -1, 2, job.getJobConf());
final byte[] COLUMNFAMILY = Bytes.toBytes("pig");
- util.createTable(Bytes.toBytesBinary("test_table"), COLUMNFAMILY);
+ util.createTable(TableName.valueOf("test_table"), COLUMNFAMILY);
// the estimation won't take effect when it apply to non-dfs or the files doesn't exist, such as hbase
query = "a = load 'hbase://test_table' using org.apache.pig.backend.hadoop.hbase.HBaseStorage('c:f1 c:f2');" +
@@ -253,7 +257,7 @@
Util.assertParallelValues(-1, -1, 1, 1, job.getJobConf());
- util.deleteTable(Bytes.toBytesBinary("test_table"));
+ util.deleteTable(TableName.valueOf("test_table"));
// In HBase 0.90.1 and above we can use util.shutdownMiniHBaseCluster()
// here instead.
MiniHBaseCluster hbc = util.getHBaseCluster();