CRUNCH-681: Update HFileUtils to accept a FileSystem parameter for targets and sources
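
The new overloads let callers read HFiles from, and write HFiles to, an
explicitly supplied FileSystem rather than the filesystem that would
otherwise be resolved from the Configuration. A minimal usage sketch
(the URIs, paths, table name, and the pipeline/cells/connection
variables below are illustrative placeholders, not part of this change):

    // Scan HFiles from an explicit source filesystem (sketch only).
    FileSystem srcFs = FileSystem.get(URI.create("hdfs://src-cluster"), pipeline.getConfiguration());
    PCollection<Result> results =
        HFileUtils.scanHFiles(pipeline, new Path("/data/hfiles"), new Scan(), srcFs);

    // Write HFiles for incremental load to an explicit target filesystem (sketch only).
    FileSystem dstFs = FileSystem.get(URI.create("hdfs://dst-cluster"), pipeline.getConfiguration());
    HFileUtils.writeToHFilesForIncrementalLoad(
        cells, connection, TableName.valueOf("my_table"), new Path("/tmp/hfiles_out"), dstFs);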

Signed-off-by: Josh Wills <jwills@apache.org>
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
index d85481d..458ab22 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
@@ -52,6 +52,7 @@
 import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
@@ -300,11 +301,30 @@
     }
   }
 
+  /**
+   * Scans HFiles.
+   *
+   * @param pipeline the pipeline
+   * @param path path to HFiles
+   * @return {@code Result}s
+   */
   public static PCollection<Result> scanHFiles(Pipeline pipeline, Path path) {
     return scanHFiles(pipeline, path, new Scan());
   }
 
   /**
+   * Scans HFiles from the given source filesystem.
+   *
+   * @param pipeline the pipeline
+   * @param path path to HFiles
+   * @param fs filesystem where HFiles are located
+   * @return {@code Result}s
+   */
+  public static PCollection<Result> scanHFiles(Pipeline pipeline, Path path, FileSystem fs) {
+    return scanHFiles(pipeline, path, new Scan(), fs);
+  }
+
+  /**
    * Scans HFiles with filter conditions.
    *
    * @param pipeline the pipeline
@@ -314,27 +334,74 @@
    * @see #combineIntoRow(org.apache.crunch.PCollection, org.apache.hadoop.hbase.client.Scan)
    */
   public static PCollection<Result> scanHFiles(Pipeline pipeline, Path path, Scan scan) {
-      return scanHFiles(pipeline, ImmutableList.of(path), scan);
+    return scanHFiles(pipeline, ImmutableList.of(path), scan);
   }
 
+  /**
+   * Scans HFiles with filter conditions, reading from the given source filesystem.
+   *
+   * @param pipeline the pipeline
+   * @param path path to HFiles
+   * @param scan filtering conditions
+   * @param fs filesystem where HFiles are located
+   * @return {@code Result}s
+   * @see #combineIntoRow(org.apache.crunch.PCollection, org.apache.hadoop.hbase.client.Scan)
+   */
+  public static PCollection<Result> scanHFiles(Pipeline pipeline, Path path, Scan scan, FileSystem fs) {
+    return scanHFiles(pipeline, ImmutableList.of(path), scan, fs);
+  }
+
+  /**
+   * Scans HFiles with filter conditions.
+   *
+   * @param pipeline the pipeline
+   * @param paths paths to HFiles
+   * @param scan filtering conditions
+   * @return {@code Result}s
+   * @see #combineIntoRow(org.apache.crunch.PCollection, org.apache.hadoop.hbase.client.Scan)
+   */
   public static PCollection<Result> scanHFiles(Pipeline pipeline, List<Path> paths, Scan scan) {
-      PCollection<KeyValue> in = pipeline.read(new HFileSource(paths, scan));
-      return combineIntoRow(in, scan);
+    return scanHFiles(pipeline, paths, scan, null);
   }
 
+  /**
+   * Scans HFiles with filter conditions, reading from the given source filesystem.
+   *
+   * @param pipeline the pipeline
+   * @param paths paths to HFiles
+   * @param scan filtering conditions
+   * @param fs filesystem where HFiles are located
+   * @return {@code Result}s
+   * @see #combineIntoRow(org.apache.crunch.PCollection, org.apache.hadoop.hbase.client.Scan)
+   */
+  public static PCollection<Result> scanHFiles(Pipeline pipeline, List<Path> paths, Scan scan, FileSystem fs) {
+    PCollection<KeyValue> in = pipeline.read(new HFileSource(paths, scan).fileSystem(fs));
+    return combineIntoRow(in, scan);
+  }
+
+  /**
+   * Converts a bunch of {@link Cell}s into {@link Result}.
+   *
+   * All {@code Cell}s belonging to the same row are combined. Deletes are dropped and only
+   * the latest version is kept.
+   *
+   * @param cells the input {@code Cell}s
+   * @return {@code Result}s
+   */
   public static <C extends Cell> PCollection<Result> combineIntoRow(PCollection<C> cells) {
     return combineIntoRow(cells, new Scan());
   }
 
   /**
-   * Converts a bunch of {@link KeyValue}s into {@link Result}.
+   * Converts a bunch of {@link Cell}s into {@link Result}.
    *
-   * All {@code KeyValue}s belong to the same row are combined. Users may provide some filter
-   * conditions (specified by {@code scan}). Deletes are dropped and only a specified number
-   * of versions are kept.
+   * All {@code Cell}s belonging to the same row are combined. Users may provide filter
+   * conditions (specified by {@code scan}). Deletes are dropped and at most
+   * {@code scan.getMaxVersions()} versions are kept.
    *
-   * @param cells the input {@code KeyValue}s
-   * @param scan filter conditions, currently we support start row, stop row and family map
+   * @param cells the input {@code Cell}s
+   * @param scan filter conditions, currently we support start row, stop row, family map,
+   *             time range, and max versions
    * @return {@code Result}s
    */
   public static <C extends Cell> PCollection<Result> combineIntoRow(PCollection<C> cells, Scan scan) {
@@ -384,6 +451,24 @@
     writeToHFilesForIncrementalLoad(cells, connection, tableName, outputPath, false);
   }
 
+  public static <C extends Cell> void writeToHFilesForIncrementalLoad(
+      PCollection<C> cells,
+      Connection connection,
+      TableName tableName,
+      Path outputPath,
+      FileSystem fs) throws IOException {
+    writeToHFilesForIncrementalLoad(cells, connection, tableName, outputPath, false, fs);
+  }
+
+  public static <C extends Cell> void writeToHFilesForIncrementalLoad(
+      PCollection<C> cells,
+      Connection connection,
+      TableName tableName,
+      Path outputPath,
+      boolean limitToAffectedRegions) throws IOException {
+    writeToHFilesForIncrementalLoad(cells, connection, tableName, outputPath, limitToAffectedRegions, null);
+  }
+
   /**
    * Writes out HFiles from the provided <code>cells</code> and <code>table</code>. <code>limitToAffectedRegions</code>
    * is used to indicate that the regions the <code>cells</code> will be loaded into should be identified prior to writing
@@ -398,7 +483,8 @@
       Connection connection,
       TableName tableName,
       Path outputPath,
-      boolean limitToAffectedRegions) throws IOException {
+      boolean limitToAffectedRegions,
+      FileSystem fs) throws IOException {
     Table table = connection.getTable(tableName);
     RegionLocator regionLocator = connection.getRegionLocator(tableName);
     HColumnDescriptor[] families = table.getTableDescriptor().getColumnFamilies();
@@ -416,11 +502,11 @@
 
     for (HColumnDescriptor f : families) {
       byte[] family = f.getName();
-      HFileTarget hfileTarget = new HFileTarget(new Path(outputPath, Bytes.toString(family)), f);
-      hfileTarget.outputConf(RegionLocationTable.REGION_LOCATION_TABLE_PATH, regionLocationFilePath.toString());
       partitioned
           .filter(new FilterByFamilyFn<C>(family))
-          .write(hfileTarget);
+          .write(new HFileTarget(new Path(outputPath, Bytes.toString(family)), f)
+              .outputConf(RegionLocationTable.REGION_LOCATION_TABLE_PATH, regionLocationFilePath.toString())
+              .fileSystem(fs));
     }
   }
 
@@ -432,6 +518,24 @@
     writePutsToHFilesForIncrementalLoad(puts, connection, tableName, outputPath, false);
   }
 
+  public static void writePutsToHFilesForIncrementalLoad(
+      PCollection<Put> puts,
+      Connection connection,
+      TableName tableName,
+      Path outputPath,
+      FileSystem fs) throws IOException {
+    writePutsToHFilesForIncrementalLoad(puts, connection, tableName, outputPath, false, fs);
+  }
+
+  public static void writePutsToHFilesForIncrementalLoad(
+      PCollection<Put> puts,
+      Connection connection,
+      TableName tableName,
+      Path outputPath,
+      boolean limitToAffectedRegions) throws IOException {
+    writePutsToHFilesForIncrementalLoad(puts, connection, tableName, outputPath, limitToAffectedRegions, null);
+  }
+
   /**
    * Writes out HFiles from the provided <code>puts</code> and <code>table</code>. <code>limitToAffectedRegions</code>
    * is used to indicate that the regions the <code>puts</code> will be loaded into should be identified prior to writing
@@ -446,7 +550,8 @@
       Connection connection,
       TableName tableName,
       Path outputPath,
-      boolean limitToAffectedRegions) throws IOException {
+      boolean limitToAffectedRegions,
+      FileSystem fs) throws IOException {
     PCollection<Cell> cells = puts.parallelDo("ConvertPutToCells", new DoFn<Put, Cell>() {
       @Override
       public void process(Put input, Emitter<Cell> emitter) {
@@ -455,7 +560,7 @@
         }
       }
     }, HBaseTypes.cells());
-    writeToHFilesForIncrementalLoad(cells, connection, tableName, outputPath, limitToAffectedRegions);
+    writeToHFilesForIncrementalLoad(cells, connection, tableName, outputPath, limitToAffectedRegions, fs);
   }
 
   public static <C extends Cell> PCollection<C> sortAndPartition(PCollection<C> cells, RegionLocator regionLocator) throws IOException {