CRUNCH-681: Update HFileUtils to accept a filesystem parameter for targets and sources
Signed-off-by: Josh Wills <jwills@apache.org>
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
index d85481d..458ab22 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
@@ -52,6 +52,7 @@
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -300,11 +301,30 @@
}
}
+ /**
+ * Scans HFiles.
+ *
+ * @param pipeline the pipeline
+ * @param path path to HFiles
+ * @return {@code Result}s
+ */
public static PCollection<Result> scanHFiles(Pipeline pipeline, Path path) {
return scanHFiles(pipeline, path, new Scan());
}
/**
+ * Scans HFiles located on the given source filesystem.
+ *
+ * @param pipeline the pipeline
+ * @param path path to HFiles
+ * @param fs filesystem where HFiles are located
+ * @return {@code Result}s
+ */
+ public static PCollection<Result> scanHFiles(Pipeline pipeline, Path path, FileSystem fs) {
+ return scanHFiles(pipeline, path, new Scan(), fs);
+ }
+
+ /**
* Scans HFiles with filter conditions.
*
* @param pipeline the pipeline
@@ -314,27 +334,74 @@
* @see #combineIntoRow(org.apache.crunch.PCollection, org.apache.hadoop.hbase.client.Scan)
*/
public static PCollection<Result> scanHFiles(Pipeline pipeline, Path path, Scan scan) {
- return scanHFiles(pipeline, ImmutableList.of(path), scan);
+ return scanHFiles(pipeline, ImmutableList.of(path), scan);
}
+ /**
+ * Scans HFiles with filter conditions, reading from the given source filesystem.
+ *
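+ * <p>A minimal usage sketch (the filesystem URI and path here are illustrative only):
+ * <pre>{@code
+ * FileSystem fs = FileSystem.get(URI.create("hdfs://archive-nn:8020"), conf);
+ * PCollection<Result> results =
+ *     HFileUtils.scanHFiles(pipeline, new Path("/data/hfiles"), new Scan(), fs);
+ * }</pre>
+ *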
+ * @param pipeline the pipeline
+ * @param path path to HFiles
+ * @param scan filtering conditions
+ * @param fs filesystem where HFiles are located
+ * @return {@code Result}s
+ * @see #combineIntoRow(org.apache.crunch.PCollection, org.apache.hadoop.hbase.client.Scan)
+ */
+ public static PCollection<Result> scanHFiles(Pipeline pipeline, Path path, Scan scan, FileSystem fs) {
+ return scanHFiles(pipeline, ImmutableList.of(path), scan, fs);
+ }
+
+ /**
+ * Scans HFiles with filter conditions.
+ *
+ * @param pipeline the pipeline
+ * @param paths paths to HFiles
+ * @param scan filtering conditions
+ * @return {@code Result}s
+ * @see #combineIntoRow(org.apache.crunch.PCollection, org.apache.hadoop.hbase.client.Scan)
+ */
public static PCollection<Result> scanHFiles(Pipeline pipeline, List<Path> paths, Scan scan) {
- PCollection<KeyValue> in = pipeline.read(new HFileSource(paths, scan));
- return combineIntoRow(in, scan);
+ return scanHFiles(pipeline, paths, scan, null);
}
+ /**
+ * Scans HFiles with filter conditions, reading from the given source filesystem.
+ *
+ * @param pipeline the pipeline
+ * @param paths paths to HFiles
+ * @param scan filtering conditions
+ * @param fs filesystem where HFiles are located; may be {@code null} to use the default filesystem
+ * @return {@code Result}s
+ * @see #combineIntoRow(org.apache.crunch.PCollection, org.apache.hadoop.hbase.client.Scan)
+ */
+ public static PCollection<Result> scanHFiles(Pipeline pipeline, List<Path> paths, Scan scan, FileSystem fs) {
+ PCollection<KeyValue> in = pipeline.read(new HFileSource(paths, scan).fileSystem(fs));
+ return combineIntoRow(in, scan);
+ }
+
+ /**
+ * Converts a bunch of {@link Cell}s into {@link Result}.
+ *
+ * All {@code Cell}s belonging to the same row are combined. Deletes are dropped and only
+ * the latest version is kept.
+ *
+ * @param cells the input {@code Cell}s
+ * @return {@code Result}s
+ */
public static <C extends Cell> PCollection<Result> combineIntoRow(PCollection<C> cells) {
return combineIntoRow(cells, new Scan());
}
/**
- * Converts a bunch of {@link KeyValue}s into {@link Result}.
+ * Converts a bunch of {@link Cell}s into {@link Result}.
*
- * All {@code KeyValue}s belong to the same row are combined. Users may provide some filter
- * conditions (specified by {@code scan}). Deletes are dropped and only a specified number
- * of versions are kept.
+ * All {@code Cell}s belonging to the same row are combined. Users may provide some filter
+ * conditions (specified by {@code scan}). Deletes are dropped and only the number
+ * of versions specified by {@code scan.getMaxVersions()} are kept.
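+ *
+ * <p>For example, to keep up to three versions within a bounded row range
+ * (the row keys here are illustrative only):
+ * <pre>{@code
+ * Scan scan = new Scan();
+ * scan.setStartRow(Bytes.toBytes("row-000"));
+ * scan.setStopRow(Bytes.toBytes("row-999"));
+ * scan.setMaxVersions(3);
+ * PCollection<Result> rows = HFileUtils.combineIntoRow(cells, scan);
+ * }</pre>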
*
- * @param cells the input {@code KeyValue}s
- * @param scan filter conditions, currently we support start row, stop row and family map
+ * @param cells the input {@code Cell}s
+ * @param scan filter conditions, currently we support start row, stop row, family map,
+ * time range, and max versions
* @return {@code Result}s
*/
public static <C extends Cell> PCollection<Result> combineIntoRow(PCollection<C> cells, Scan scan) {
@@ -384,6 +451,24 @@
writeToHFilesForIncrementalLoad(cells, connection, tableName, outputPath, false);
}
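+ /**
+ * Writes out HFiles for incremental load from the provided {@code cells}, writing them
+ * to the given filesystem and without limiting output to affected regions.
+ *
+ * @param cells the cells to write
+ * @param connection the HBase connection
+ * @param tableName the name of the table the HFiles will be loaded into
+ * @param outputPath the path where the HFiles are written
+ * @param fs filesystem where the HFiles are written; may be {@code null} to use the default
+ * @throws IOException if an error occurs while communicating with HBase
+ */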
+ public static <C extends Cell> void writeToHFilesForIncrementalLoad(
+ PCollection<C> cells,
+ Connection connection,
+ TableName tableName,
+ Path outputPath,
+ FileSystem fs) throws IOException {
+ writeToHFilesForIncrementalLoad(cells, connection, tableName, outputPath, false, fs);
+ }
+
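+ /**
+ * Writes out HFiles for incremental load from the provided {@code cells} on the default
+ * filesystem. See the filesystem-aware overload for details on {@code limitToAffectedRegions}.
+ *
+ * @param cells the cells to write
+ * @param connection the HBase connection
+ * @param tableName the name of the table the HFiles will be loaded into
+ * @param outputPath the path where the HFiles are written
+ * @param limitToAffectedRegions whether to identify the affected regions before writing
+ * @throws IOException if an error occurs while communicating with HBase
+ */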
+ public static <C extends Cell> void writeToHFilesForIncrementalLoad(
+ PCollection<C> cells,
+ Connection connection,
+ TableName tableName,
+ Path outputPath,
+ boolean limitToAffectedRegions) throws IOException {
+ writeToHFilesForIncrementalLoad(cells, connection, tableName, outputPath, limitToAffectedRegions, null);
+ }
+
/**
* Writes out HFiles from the provided <code>cells</code> and <code>table</code>. <code>limitToAffectedRegions</code>
* is used to indicate that the regions the <code>cells</code> will be loaded into should be identified prior to writing
@@ -398,7 +483,8 @@
Connection connection,
TableName tableName,
Path outputPath,
- boolean limitToAffectedRegions) throws IOException {
+ boolean limitToAffectedRegions,
+ FileSystem fs) throws IOException {
Table table = connection.getTable(tableName);
RegionLocator regionLocator = connection.getRegionLocator(tableName);
HColumnDescriptor[] families = table.getTableDescriptor().getColumnFamilies();
@@ -416,11 +502,11 @@
for (HColumnDescriptor f : families) {
byte[] family = f.getName();
- HFileTarget hfileTarget = new HFileTarget(new Path(outputPath, Bytes.toString(family)), f);
- hfileTarget.outputConf(RegionLocationTable.REGION_LOCATION_TABLE_PATH, regionLocationFilePath.toString());
partitioned
.filter(new FilterByFamilyFn<C>(family))
- .write(hfileTarget);
+ .write(new HFileTarget(new Path(outputPath, Bytes.toString(family)), f)
+ .outputConf(RegionLocationTable.REGION_LOCATION_TABLE_PATH, regionLocationFilePath.toString())
+ .fileSystem(fs));
}
}
@@ -432,6 +518,24 @@
writePutsToHFilesForIncrementalLoad(puts, connection, tableName, outputPath, false);
}
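+ /**
+ * Writes out HFiles for incremental load from the provided {@code puts}, writing them
+ * to the given filesystem and without limiting output to affected regions.
+ *
+ * <p>A minimal usage sketch (the filesystem URI, table name, and path are illustrative only):
+ * <pre>{@code
+ * FileSystem fs = FileSystem.get(URI.create("hdfs://archive-nn:8020"), conf);
+ * HFileUtils.writePutsToHFilesForIncrementalLoad(
+ *     puts, connection, TableName.valueOf("my_table"), new Path("/tmp/hfiles"), fs);
+ * }</pre>
+ *
+ * @param puts the puts to write
+ * @param connection the HBase connection
+ * @param tableName the name of the table the HFiles will be loaded into
+ * @param outputPath the path where the HFiles are written
+ * @param fs filesystem where the HFiles are written; may be {@code null} to use the default
+ * @throws IOException if an error occurs while communicating with HBase
+ */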
+ public static void writePutsToHFilesForIncrementalLoad(
+ PCollection<Put> puts,
+ Connection connection,
+ TableName tableName,
+ Path outputPath,
+ FileSystem fs) throws IOException {
+ writePutsToHFilesForIncrementalLoad(puts, connection, tableName, outputPath, false, fs);
+ }
+
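+ /**
+ * Writes out HFiles for incremental load from the provided {@code puts} on the default
+ * filesystem. See the filesystem-aware overload for details on {@code limitToAffectedRegions}.
+ *
+ * @param puts the puts to write
+ * @param connection the HBase connection
+ * @param tableName the name of the table the HFiles will be loaded into
+ * @param outputPath the path where the HFiles are written
+ * @param limitToAffectedRegions whether to identify the affected regions before writing
+ * @throws IOException if an error occurs while communicating with HBase
+ */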
+ public static void writePutsToHFilesForIncrementalLoad(
+ PCollection<Put> puts,
+ Connection connection,
+ TableName tableName,
+ Path outputPath,
+ boolean limitToAffectedRegions) throws IOException {
+ writePutsToHFilesForIncrementalLoad(puts, connection, tableName, outputPath, limitToAffectedRegions, null);
+ }
+
/**
* Writes out HFiles from the provided <code>puts</code> and <code>table</code>. <code>limitToAffectedRegions</code>
* is used to indicate that the regions the <code>puts</code> will be loaded into should be identified prior to writing
@@ -446,7 +550,8 @@
Connection connection,
TableName tableName,
Path outputPath,
- boolean limitToAffectedRegions) throws IOException {
+ boolean limitToAffectedRegions,
+ FileSystem fs) throws IOException {
PCollection<Cell> cells = puts.parallelDo("ConvertPutToCells", new DoFn<Put, Cell>() {
@Override
public void process(Put input, Emitter<Cell> emitter) {
@@ -455,7 +560,7 @@
}
}
}, HBaseTypes.cells());
- writeToHFilesForIncrementalLoad(cells, connection, tableName, outputPath, limitToAffectedRegions);
+ writeToHFilesForIncrementalLoad(cells, connection, tableName, outputPath, limitToAffectedRegions, fs);
}
public static <C extends Cell> PCollection<C> sortAndPartition(PCollection<C> cells, RegionLocator regionLocator) throws IOException {