CRUNCH-550: Removed deprecations in crunch-hbase also added
support for TableName.
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
index 28ead90..4a06c0f 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
@@ -42,6 +42,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
@@ -178,7 +179,7 @@
HBaseSourceTarget source = null;
if(clazz == null){
- source = new HBaseSourceTarget(inputTableName, scan, scan2);
+ source = new HBaseSourceTarget(TableName.valueOf(inputTableName), scan, scan2);
}else{
source = new HBaseSourceTarget(inputTableName, clazz, new Scan[]{scan, scan2});
}
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java
index 18d5a95..16f6694 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java
@@ -21,6 +21,7 @@
import org.apache.crunch.TableSource;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -35,6 +36,14 @@
}
public static TableSource<ImmutableBytesWritable, Result> table(String table, Scan scan) {
+ return table(TableName.valueOf(table), scan);
+ }
+
+ public static TableSource<ImmutableBytesWritable, Result> table(TableName table) {
+ return table(table, new Scan());
+ }
+
+ public static TableSource<ImmutableBytesWritable, Result> table(TableName table, Scan scan) {
return new HBaseSourceTarget(table, scan);
}
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
index 4a721f3..4ac6c8e 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
@@ -23,9 +23,13 @@
import org.apache.crunch.SourceTarget;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.util.StringUtils;
@@ -36,11 +40,13 @@
public class HBaseData implements ReadableData<Pair<ImmutableBytesWritable, Result>> {
private final String table;
+ private transient TableName tableName;
private final String scansAsString;
private transient SourceTarget parent;
public HBaseData(String table, String scansAsString, SourceTarget<?> parent) {
this.table = table;
+ this.tableName = TableName.valueOf(table);
this.scansAsString = scansAsString;
this.parent = parent;
}
@@ -63,7 +69,8 @@
public Iterable<Pair<ImmutableBytesWritable, Result>> read(
TaskInputOutputContext<?, ?, ?, ?> ctxt) throws IOException {
Configuration hconf = HBaseConfiguration.create(ctxt.getConfiguration());
- HTable htable = new HTable(hconf, table);
+ Connection connection = ConnectionFactory.createConnection(hconf);
+ Table htable = connection.getTable(getTableName());
String[] scanStrings = StringUtils.getStrings(scansAsString);
int length = scanStrings == null ? 0 : scanStrings.length;
@@ -72,6 +79,13 @@
scans[i] = HBaseSourceTarget.convertStringToScan(scanStrings[i]);
}
- return new HTableIterable(htable, scans);
+ return new HTableIterable(connection, htable, scans);
+ }
+
+ private TableName getTableName(){
+ if(tableName == null){
+ tableName = TableName.valueOf(table);
+ }
+ return tableName;
}
}
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
index c98436d..ede7603 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
@@ -36,9 +36,12 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat;
import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormatBase;
@@ -66,6 +69,7 @@
protected Scan[] scans;
protected String scansAsString;
+
private FormatBundle<? extends MultiTableInputFormatBase> inputBundle;
public HBaseSourceTarget(String table, Scan scan) {
@@ -75,25 +79,37 @@
public HBaseSourceTarget(String table, Scan scan, Scan... additionalScans) {
this(table, ObjectArrays.concat(scan, additionalScans));
}
+
+ public HBaseSourceTarget(TableName table, Scan scan, Scan... additionalScans) {
+ this(table, ObjectArrays.concat(scan, additionalScans));
+ }
public HBaseSourceTarget(String table, Scan[] scans) {
this(table, MultiTableInputFormat.class, scans);
}
+ public HBaseSourceTarget(TableName table, Scan[] scans) {
+ this(table, MultiTableInputFormat.class, scans);
+ }
+
public HBaseSourceTarget(String table, Class<? extends MultiTableInputFormatBase> clazz, Scan[] scans) {
- super(table);
+ this(TableName.valueOf(table), clazz, scans);
+ }
+
+ public HBaseSourceTarget(TableName tableName, Class<? extends MultiTableInputFormatBase> clazz, Scan[] scans) {
+ super(tableName);
this.scans = scans;
try {
- byte[] tableName = Bytes.toBytes(table);
+ byte[] tableNameAsBytes = Bytes.toBytes(table);
//Copy scans and enforce that they are for the table specified
Scan[] tableScans = new Scan[scans.length];
String[] scanStrings = new String[scans.length];
for(int i = 0; i < scans.length; i++){
tableScans[i] = new Scan(scans[i]);
//enforce Scan is for same table
- tableScans[i].setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableName);
+ tableScans[i].setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableNameAsBytes);
//Convert the Scan into a String
scanStrings[i] = convertScanToString(tableScans[i]);
}
@@ -190,8 +206,9 @@
@Override
public Iterable<Pair<ImmutableBytesWritable, Result>> read(Configuration conf) throws IOException {
Configuration hconf = HBaseConfiguration.create(conf);
- HTable htable = new HTable(hconf, table);
- return new HTableIterable(htable, scans);
+ Connection connection = ConnectionFactory.createConnection(hconf);
+ Table htable = connection.getTable(getTableName());
+ return new HTableIterable(connection, htable, scans);
}
@Override
@@ -205,5 +222,4 @@
outputConf(key, value);
return this;
}
-
}
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
index 7c67577..f287d5e 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -50,10 +51,18 @@
private static final Logger LOG = LoggerFactory.getLogger(HBaseTarget.class);
protected String table;
+
+ private transient TableName tableName;
+
private Map<String, String> extraConf = Maps.newHashMap();
public HBaseTarget(String table) {
- this.table = table;
+ this(TableName.valueOf(table));
+ }
+
+ public HBaseTarget(TableName tableName){
+ this.tableName = tableName;
+ this.table = tableName.getNameAsString();
}
@Override
@@ -153,4 +162,11 @@
ptype.getTypeClass());
}
}
+
+ protected TableName getTableName(){
+ if(tableName == null){
+ tableName = TableName.valueOf(table);
+ }
+ return tableName;
+ }
}
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
index 7381ddd..26821bf 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
@@ -25,7 +25,6 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -185,7 +184,7 @@
// to depend on it.
private static boolean seekAtOrAfter(HFileScanner s, KeyValue k)
throws IOException {
- int result = s.seekTo(k.getBuffer(), k.getKeyOffset(), k.getKeyLength());
+ int result = s.seekTo(k);
if(result < 0) {
// Passed KV is smaller than first KV in file, work from start of file
return s.seekTo();
@@ -206,7 +205,7 @@
// Explode out directories that match the original FileInputFormat filters since HFiles are written to directories where the
// directory name is the column name
for (FileStatus status : super.listStatus(job)) {
- if (status.isDir()) {
+ if (status.isDirectory()) {
FileSystem fs = status.getPath().getFileSystem(job.getConfiguration());
for (FileStatus match : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) {
result.add(match);
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
index bd3cc8f..2240b9c 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
@@ -137,7 +137,7 @@
}
long sum = 0;
for (FileStatus status : statuses) {
- if (status.isDir()) {
+ if (status.isDirectory()) {
sum += SourceTargetHelper.getPathSize(fs, status.getPath());
} else {
sum += status.getLen();
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java
index a3dfc7d..c772515 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java
@@ -20,9 +20,11 @@
package org.apache.crunch.io.hbase;
import org.apache.crunch.Pair;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import java.io.IOException;
@@ -30,16 +32,18 @@
import java.util.Iterator;
class HTableIterable implements Iterable<Pair<ImmutableBytesWritable, Result>> {
- private final HTable table;
+ private final Table table;
private final Scan[] scans;
+ private final Connection connection;
- public HTableIterable(HTable table, Scan... scans) {
+ public HTableIterable(Connection connection, Table table, Scan... scans) {
this.table = table;
+ this.connection = connection;
this.scans = scans;
}
@Override
public Iterator<Pair<ImmutableBytesWritable, Result>> iterator() {
- return new HTableIterator(table, Arrays.asList(scans));
+ return new HTableIterator(connection, table, Arrays.asList(scans));
}
}
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java
index 3db5897..ebef5d3 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java
@@ -20,10 +20,11 @@
package org.apache.crunch.io.hbase;
import org.apache.crunch.Pair;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,13 +36,15 @@
class HTableIterator implements Iterator<Pair<ImmutableBytesWritable, Result>> {
private static final Logger LOG = LoggerFactory.getLogger(HTableIterator.class);
- private final HTable table;
+ private final Table table;
+ private final Connection connection;
private final Iterator<Scan> scans;
private ResultScanner scanner;
private Iterator<Result> iter;
- public HTableIterator(HTable table, List<Scan> scans) {
+ public HTableIterator(Connection connection, Table table, List<Scan> scans) {
this.table = table;
+ this.connection = connection;
this.scans = scans.iterator();
try{
this.scanner = table.getScanner(this.scans.next());
@@ -70,6 +73,11 @@
} catch (IOException e) {
LOG.error("Exception closing HTable: {}", table.getName(), e);
}
+ try {
+ connection.close();
+ } catch (IOException e) {
+ LOG.error("Exception closing HTable: {}", table.getName(), e);
+ }
}
}
return hasNext;
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java
index 2c53ae1..78267cf 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java
@@ -19,6 +19,7 @@
import org.apache.crunch.Target;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
/**
* Static factory methods for creating HBase {@link Target} types.
@@ -26,6 +27,10 @@
public class ToHBase {
public static Target table(String table) {
+ return table(TableName.valueOf(table));
+ }
+
+ public static Target table(TableName table) {
return new HBaseTarget(table);
}