CRUNCH-679: Improvements for usage of DistCp (#20)

* CRUNCH-679: Improvements for usage of DistCp

* CRUNCH-679: Fix NPE bug by preserving IOUtils.cleanup logic

* CRUNCH-679: CrunchRenameCopyListing's constructor needs to be public

* CRUNCH-679: Unset rename configuration after loading into copy listing

* CRUNCH-679: Reduce default max distcp map tasks from 1000 to 100

* CRUNCH-679: Update log message formatting
diff --git a/crunch-core/src/it/java/org/apache/crunch/ExternalFilesystemIT.java b/crunch-core/src/it/java/org/apache/crunch/ExternalFilesystemIT.java
index 0ca396c..a4efc8b 100644
--- a/crunch-core/src/it/java/org/apache/crunch/ExternalFilesystemIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/ExternalFilesystemIT.java
@@ -101,7 +101,7 @@
         pipeline.run();
 
         // assert the output was written correctly
-        try (FSDataInputStream inputStream = dfsCluster2.open(new Path("hdfs://cluster2/output/out0-m-00000"))) {
+        try (FSDataInputStream inputStream = dfsCluster2.open(new Path("hdfs://cluster2/output/part-m-00000"))) {
             String readValue = IOUtils.toString(inputStream).trim();
             Assert.assertEquals(testString, readValue);
         }
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
index f8b1e76..bc15169 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
@@ -51,6 +51,8 @@
 
   public static final String FILE_TARGET_MAX_DISTCP_TASKS = "crunch.file.target.max.distcp.tasks";
 
+  public static final String FILE_TARGET_MAX_DISTCP_TASK_BANDWIDTH_MB = "crunch.file.target.max.distcp.task.bandwidth.mb";
+
   // Not instantiated
   private RuntimeParameters() {
   }
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index fc3d2a8..ce47bcc 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -51,13 +51,16 @@
 import org.apache.crunch.io.SourceTargetHelper;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PType;
+import org.apache.crunch.util.CrunchRenameCopyListing;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.tools.CopyListing;
 import org.apache.hadoop.tools.DistCp;
+import org.apache.hadoop.tools.DistCpConstants;
 import org.apache.hadoop.tools.DistCpOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -194,14 +197,17 @@
     Path srcPattern = getSourcePattern(workingPath, index);
     boolean sameFs = isCompatible(srcFs, path);
     boolean useDistributedCopy = conf.getBoolean(RuntimeParameters.FILE_TARGET_USE_DISTCP, true);
-    int maxDistributedCopyTasks = conf.getInt(RuntimeParameters.FILE_TARGET_MAX_DISTCP_TASKS, 1000);
+    int maxDistributedCopyTasks = conf.getInt(RuntimeParameters.FILE_TARGET_MAX_DISTCP_TASKS, 100);
+    int maxDistributedCopyTaskBandwidthMB = conf.getInt(RuntimeParameters.FILE_TARGET_MAX_DISTCP_TASK_BANDWIDTH_MB,
+        DistCpConstants.DEFAULT_BANDWIDTH_MB);
     int maxThreads = conf.getInt(RuntimeParameters.FILE_TARGET_MAX_THREADS, 1);
 
     if (!sameFs) {
       if (useDistributedCopy) {
         LOG.info("Source and destination are in different file systems, performing distributed copy from {} to {}", srcPattern,
             path);
-        handleOutputsDistributedCopy(dstFsConf, srcPattern, srcFs, dstFs, maxDistributedCopyTasks);
+        handleOutputsDistributedCopy(conf, srcPattern, srcFs, dstFs, maxDistributedCopyTasks,
+            maxDistributedCopyTaskBandwidthMB);
       } else {
         LOG.info("Source and destination are in different file systems, performing asynch copies from {} to {}", srcPattern, path);
         handleOutputsAsynchronously(conf, srcPattern, srcFs, dstFs, sameFs, maxThreads);
@@ -210,18 +216,17 @@
       LOG.info("Source and destination are in the same file system, performing asynch renames from {} to {}", srcPattern, path);
       handleOutputsAsynchronously(conf, srcPattern, srcFs, dstFs, sameFs, maxThreads);
     }
-
   }
 
   private void handleOutputsAsynchronously(Configuration conf, Path srcPattern, FileSystem srcFs, FileSystem dstFs,
           boolean sameFs, int maxThreads) throws IOException {
+    Configuration dstFsConf = getEffectiveBundleConfig(conf);
     Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(srcPattern), srcPattern);
     List<ListenableFuture<Boolean>> renameFutures = Lists.newArrayList();
     ListeningExecutorService executorService =
         MoreExecutors.listeningDecorator(
             Executors.newFixedThreadPool(
                 maxThreads));
-    Configuration dstFsConf = getEffectiveBundleConfig(conf);
     for (Path s : srcs) {
       Path d = getDestFile(dstFsConf, s, path, s.getName().contains("-m-"));
       renameFutures.add(
@@ -255,26 +260,12 @@
   }
 
   private void handleOutputsDistributedCopy(Configuration conf, Path srcPattern, FileSystem srcFs, FileSystem dstFs,
-          int maxDistributedCopyTasks) throws IOException {
+          int maxTasks, int maxBandwidthMB) throws IOException {
+    Configuration dstFsConf = getEffectiveBundleConfig(conf);
     Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(srcPattern), srcPattern);
     if (srcs.length > 0) {
-      LOG.info("Distributed copying {} files using at most {} tasks", srcs.length, maxDistributedCopyTasks);
-      // Once https://issues.apache.org/jira/browse/HADOOP-15281 is available, we can use the direct write
-      // distcp optimization if the target path is in S3
-      DistCpOptions options = new DistCpOptions(Arrays.asList(srcs), path);
-      options.setMaxMaps(maxDistributedCopyTasks);
-      options.setOverwrite(true);
-      options.setBlocking(true);
-
-      Configuration distCpConf = new Configuration(conf);
-      // Remove unnecessary and problematic properties from the DistCp configuration. This is necessary since
-      // files referenced by these properties may have already been deleted when the DistCp is being started.
-      distCpConf.unset("mapreduce.job.cache.files");
-      distCpConf.unset("mapreduce.job.classpath.files");
-      distCpConf.unset("tmpjars");
-
       try {
-        DistCp distCp = new DistCp(distCpConf, options);
+        DistCp distCp = createDistCp(srcs, maxTasks, maxBandwidthMB, dstFsConf);
         if (!distCp.execute().isSuccessful()) {
           throw new CrunchRuntimeException("Distributed copy failed from " + srcPattern + " to " + path);
         }
@@ -329,7 +320,38 @@
     }
     return new Path(dir, outputFilename);
   }
-  
+
+  protected DistCp createDistCp(Path[] srcs, int maxTasks, int maxBandwidthMB, Configuration conf) throws Exception {
+    LOG.info("Distributed copying {} files using at most {} tasks and bandwidth limit of {} MB/s per task",
+        new Object[]{srcs.length, maxTasks, maxBandwidthMB});
+
+    Configuration distCpConf = new Configuration(conf);
+
+    // Remove unnecessary and problematic properties from the DistCp configuration. This is necessary since
+    // files referenced by these properties may have already been deleted when the DistCp is being started.
+    distCpConf.unset("mapreduce.job.cache.files");
+    distCpConf.unset("mapreduce.job.classpath.files");
+    distCpConf.unset("tmpjars");
+
+    // Setup renaming for part files
+    List<String> renames = Lists.newArrayList();
+    for (Path s : srcs) {
+      Path d = getDestFile(conf, s, path, s.getName().contains("-m-"));
+      renames.add(s.getName() + ":" + d.getName());
+    }
+    distCpConf.setStrings(CrunchRenameCopyListing.DISTCP_PATH_RENAMES, renames.toArray(new String[renames.size()]));
+    distCpConf.setClass(DistCpConstants.CONF_LABEL_COPY_LISTING_CLASS, CrunchRenameCopyListing.class, CopyListing.class);
+
+    // Once https://issues.apache.org/jira/browse/HADOOP-15281 is available, we can use the direct write
+    // distcp optimization if the target path is in S3
+    DistCpOptions options = new DistCpOptions(Arrays.asList(srcs), path);
+    options.setMaxMaps(maxTasks);
+    options.setMapBandwidth(maxBandwidthMB);
+    options.setBlocking(true);
+
+    return new DistCp(distCpConf, options);
+  }
+
   /**
    * Extract the partition number from a raw reducer output filename.
    *
diff --git a/crunch-core/src/main/java/org/apache/crunch/util/CrunchRenameCopyListing.java b/crunch-core/src/main/java/org/apache/crunch/util/CrunchRenameCopyListing.java
new file mode 100644
index 0000000..b930beb
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/util/CrunchRenameCopyListing.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the
+ * Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package org.apache.crunch.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
+import org.apache.hadoop.tools.SimpleCopyListing;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Stack;
+
+/**
+ * A custom {@link CopyListing} implementation capable of dynamically renaming
+ * the target paths according to a {@link #DISTCP_PATH_RENAMES configured set of values}.
+ * <p>
+ * Once https://issues.apache.org/jira/browse/HADOOP-16147 is available, this
+ * class can be significantly simplified.
+ * </p>
+ */
+public class CrunchRenameCopyListing extends SimpleCopyListing {
+  /**
+   * Comma-separated list of original-file:renamed-file path rename pairs.
+   */
+  public static final String DISTCP_PATH_RENAMES = "crunch.distcp.path.renames";
+
+  private static final Logger LOG = LoggerFactory.getLogger(CrunchRenameCopyListing.class);
+  private final Map<String, String> pathRenames;
+
+  private long totalPaths = 0;
+  private long totalBytesToCopy = 0;
+
+  /**
+   * Constructor, to initialize configuration.
+   *
+   * @param configuration The input configuration, with which the source/target FileSystems may be accessed.
+   * @param credentials - Credentials object on which the FS delegation tokens are cached. If null
+   * delegation token caching is skipped
+   */
+  public CrunchRenameCopyListing(Configuration configuration, Credentials credentials) {
+    super(configuration, credentials);
+
+    pathRenames = new HashMap<>();
+
+    String[] pathRenameConf = configuration.getStrings(DISTCP_PATH_RENAMES);
+    if (pathRenameConf == null) {
+      throw new IllegalArgumentException("Missing required configuration: " + DISTCP_PATH_RENAMES);
+    }
+    for (String pathRename : pathRenameConf) {
+      String[] pathRenameParts = pathRename.split(":");
+      if (pathRenameParts.length != 2) {
+        throw new IllegalArgumentException("Invalid path rename format: " + pathRename);
+      }
+      if (pathRenames.put(pathRenameParts[0], pathRenameParts[1]) != null) {
+        throw new IllegalArgumentException("Invalid duplicate path rename: " + pathRenameParts[0]);
+      }
+    }
+    LOG.info("Loaded {} path rename entries", pathRenames.size());
+
+    // Clear out the rename configuration property, as it is no longer needed
+    configuration.unset(DISTCP_PATH_RENAMES);
+  }
+
+  @Override
+  public void doBuildListing(SequenceFile.Writer fileListWriter, DistCpOptions options) throws IOException {
+    try {
+      for (Path path : options.getSourcePaths()) {
+        FileSystem sourceFS = path.getFileSystem(getConf());
+        final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
+        final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
+        final boolean preserveRawXAttrs = options.shouldPreserveRawXattrs();
+        path = makeQualified(path);
+
+        FileStatus rootStatus = sourceFS.getFileStatus(path);
+        Path sourcePathRoot = computeSourceRootPath(rootStatus, options);
+
+        FileStatus[] sourceFiles = sourceFS.listStatus(path);
+        boolean explore = (sourceFiles != null && sourceFiles.length > 0);
+        if (!explore || rootStatus.isDirectory()) {
+          CopyListingFileStatus rootCopyListingStatus = DistCpUtils.toCopyListingFileStatus(sourceFS, rootStatus, preserveAcls,
+              preserveXAttrs, preserveRawXAttrs);
+          writeToFileListingRoot(fileListWriter, rootCopyListingStatus, sourcePathRoot, options);
+        }
+        if (explore) {
+          for (FileStatus sourceStatus : sourceFiles) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Recording source-path: {} for copy.", sourceStatus.getPath());
+            }
+            CopyListingFileStatus sourceCopyListingStatus = DistCpUtils.toCopyListingFileStatus(sourceFS, sourceStatus,
+                preserveAcls && sourceStatus.isDirectory(), preserveXAttrs && sourceStatus.isDirectory(),
+                preserveRawXAttrs && sourceStatus.isDirectory());
+            writeToFileListing(fileListWriter, sourceCopyListingStatus, sourcePathRoot, options);
+
+            if (isDirectoryAndNotEmpty(sourceFS, sourceStatus)) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Traversing non-empty source dir: {}", sourceStatus.getPath());
+              }
+              traverseNonEmptyDirectory(fileListWriter, sourceStatus, sourcePathRoot, options);
+            }
+          }
+        }
+      }
+      fileListWriter.close();
+      fileListWriter = null;
+    } finally {
+      if (fileListWriter != null) {
+        try {
+          fileListWriter.close();
+        } catch(IOException e) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Exception in closing {}", fileListWriter, e);
+          }
+        }
+      }
+    }
+  }
+
+  private Path computeSourceRootPath(FileStatus sourceStatus, DistCpOptions options) throws IOException {
+    Path target = options.getTargetPath();
+    FileSystem targetFS = target.getFileSystem(getConf());
+    final boolean targetPathExists = options.getTargetPathExists();
+
+    boolean solitaryFile = options.getSourcePaths().size() == 1 && !sourceStatus.isDirectory();
+
+    if (solitaryFile) {
+      if (targetFS.isFile(target) || !targetPathExists) {
+        return sourceStatus.getPath();
+      } else {
+        return sourceStatus.getPath().getParent();
+      }
+    } else {
+      boolean specialHandling =
+          (options.getSourcePaths().size() == 1 && !targetPathExists) || options.shouldSyncFolder() || options.shouldOverwrite();
+
+      return specialHandling && sourceStatus.isDirectory() ? sourceStatus.getPath() : sourceStatus.getPath().getParent();
+    }
+  }
+
+  private Path makeQualified(Path path) throws IOException {
+    final FileSystem fs = path.getFileSystem(getConf());
+    return path.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+  }
+
+  private static boolean isDirectoryAndNotEmpty(FileSystem fileSystem, FileStatus fileStatus) throws IOException {
+    return fileStatus.isDirectory() && getChildren(fileSystem, fileStatus).length > 0;
+  }
+
+  private static FileStatus[] getChildren(FileSystem fileSystem, FileStatus parent) throws IOException {
+    return fileSystem.listStatus(parent.getPath());
+  }
+
+  private void traverseNonEmptyDirectory(SequenceFile.Writer fileListWriter, FileStatus sourceStatus, Path sourcePathRoot,
+      DistCpOptions options) throws IOException {
+    FileSystem sourceFS = sourcePathRoot.getFileSystem(getConf());
+    final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
+    final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
+    final boolean preserveRawXattrs = options.shouldPreserveRawXattrs();
+    Stack<FileStatus> pathStack = new Stack<>();
+    pathStack.push(sourceStatus);
+
+    while (!pathStack.isEmpty()) {
+      for (FileStatus child : getChildren(sourceFS, pathStack.pop())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Recording source-path: {} for copy.", sourceStatus.getPath());
+        }
+        CopyListingFileStatus childCopyListingStatus = DistCpUtils.toCopyListingFileStatus(sourceFS, child,
+            preserveAcls && child.isDirectory(), preserveXAttrs && child.isDirectory(), preserveRawXattrs && child.isDirectory());
+        writeToFileListing(fileListWriter, childCopyListingStatus, sourcePathRoot, options);
+        if (isDirectoryAndNotEmpty(sourceFS, child)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Traversing non-empty source dir: {}", sourceStatus.getPath());
+          }
+          pathStack.push(child);
+        }
+      }
+    }
+  }
+
+  private void writeToFileListingRoot(SequenceFile.Writer fileListWriter, CopyListingFileStatus fileStatus, Path sourcePathRoot,
+      DistCpOptions options) throws IOException {
+    boolean syncOrOverwrite = options.shouldSyncFolder() || options.shouldOverwrite();
+    if (fileStatus.getPath().equals(sourcePathRoot) && fileStatus.isDirectory() && syncOrOverwrite) {
+      // Skip the root-paths when syncOrOverwrite
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skip {}", fileStatus.getPath());
+      }
+      return;
+    }
+    writeToFileListing(fileListWriter, fileStatus, sourcePathRoot, options);
+  }
+
+  private void writeToFileListing(SequenceFile.Writer fileListWriter, CopyListingFileStatus fileStatus, Path sourcePathRoot,
+      DistCpOptions options) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("REL PATH: {}, FULL PATH: {}",
+          DistCpUtils.getRelativePath(sourcePathRoot, fileStatus.getPath()), fileStatus.getPath());
+    }
+
+    if (!shouldCopy(fileStatus.getPath(), options)) {
+      return;
+    }
+
+    fileListWriter.append(getFileListingKey(sourcePathRoot, fileStatus),
+        getFileListingValue(fileStatus));
+    fileListWriter.sync();
+
+    if (!fileStatus.isDirectory()) {
+      totalBytesToCopy += fileStatus.getLen();
+    }
+    totalPaths++;
+  }
+
+  /**
+   * Returns the key for an entry in the copy listing sequence file
+   * @param sourcePathRoot the root source path for determining the relative target path
+   * @param fileStatus the copy listing file status
+   * @return the key for the sequence file entry
+   */
+  protected Text getFileListingKey(Path sourcePathRoot, CopyListingFileStatus fileStatus) {
+    Path fileStatusPath = fileStatus.getPath();
+    String pathName = fileStatusPath.getName();
+    String renamedPathName = pathRenames.get(pathName);
+
+    if (renamedPathName != null && !pathName.equals(renamedPathName)) {
+      LOG.info("Applying dynamic rename of {} to {}", pathName, renamedPathName);
+      fileStatusPath = new Path(fileStatusPath.getParent(), renamedPathName);
+    }
+    return new Text(DistCpUtils.getRelativePath(sourcePathRoot, fileStatusPath));
+  }
+
+  /**
+   * Returns the value for an entry in the copy listing sequence file
+   * @param fileStatus the copy listing file status
+   * @return the value for the sequence file entry
+   */
+  protected CopyListingFileStatus getFileListingValue(CopyListingFileStatus fileStatus) {
+    return fileStatus;
+  }
+
+  @Override
+  protected long getBytesToCopy() {
+    return totalBytesToCopy;
+  }
+
+  @Override
+  protected long getNumberOfPaths() {
+    return totalPaths;
+  }
+}
\ No newline at end of file
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
index 420c9dd..b1ce5ba 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
@@ -17,16 +17,12 @@
  */
 package org.apache.crunch.io.hbase;
 
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.impl.mr.run.RuntimeParameters;
 import org.apache.crunch.io.SequentialFileNamingScheme;
 import org.apache.crunch.io.impl.FileTargetImpl;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -34,18 +30,9 @@
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.tools.DistCp;
-import org.apache.hadoop.tools.DistCpOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Arrays;
 
 public class HFileTarget extends FileTargetImpl {
 
-  private static final Logger LOG = LoggerFactory.getLogger(HFileTarget.class);
-
   public HFileTarget(String path) {
     this(new Path(path));
   }
@@ -91,64 +78,6 @@
   }
 
   @Override
-  public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException {
-    FileSystem srcFs = workingPath.getFileSystem(conf);
-    Path src = getSourcePattern(workingPath, index);
-    Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src);
-    FileSystem dstFs = path.getFileSystem(conf);
-    if (!dstFs.exists(path)) {
-      dstFs.mkdirs(path);
-    }
-    boolean sameFs = isCompatible(srcFs, path);
-
-    if (!sameFs) {
-      if (srcs.length > 0) {
-        int maxDistributedCopyTasks = conf.getInt(RuntimeParameters.FILE_TARGET_MAX_DISTCP_TASKS, 1000);
-        LOG.info(
-                "Source and destination are in different file systems, performing distcp of {} files from [{}] to [{}] "
-                        + "using at most {} tasks",
-                new Object[] { srcs.length, src, path, maxDistributedCopyTasks });
-        // Once https://issues.apache.org/jira/browse/HADOOP-15281 is available, we can use the direct write
-        // distcp optimization if the target path is in S3
-        DistCpOptions options = new DistCpOptions(Arrays.asList(srcs), path);
-        options.setMaxMaps(maxDistributedCopyTasks);
-        options.setOverwrite(true);
-        options.setBlocking(true);
-
-        Configuration distCpConf = new Configuration(conf);
-        // Remove unnecessary and problematic properties from the DistCp configuration. This is necessary since
-        // files referenced by these properties may have already been deleted when the DistCp is being started.
-        distCpConf.unset("mapreduce.job.cache.files");
-        distCpConf.unset("mapreduce.job.classpath.files");
-        distCpConf.unset("tmpjars");
-
-        try {
-          DistCp distCp = new DistCp(distCpConf, options);
-          if (!distCp.execute().isSuccessful()) {
-            throw new CrunchRuntimeException("Unable to move files through distcp from " + src + " to " + path);
-          }
-          LOG.info("Distributed copy completed for {} files", srcs.length);
-        } catch (Exception e) {
-          throw new CrunchRuntimeException("Unable to move files through distcp from " + src + " to " + path, e);
-        }
-      } else {
-        LOG.info("No files found at [{}], not attempting to copy HFiles", src);
-      }
-    } else {
-      LOG.info(
-              "Source and destination are in the same file system, performing rename of {} files from [{}] to [{}]",
-              new Object[] { srcs.length, src, path });
-
-      for (Path s : srcs) {
-        Path d = getDestFile(conf, s, path, s.getName().contains("-m-"));
-        srcFs.rename(s, d);
-      }
-    }
-    dstFs.create(getSuccessIndicator(), true).close();
-    LOG.info("Created success indicator file");
-  }
-
-  @Override
   public String toString() {
     return "HFile(" + path + ")";
   }