CRUNCH-679: Improve DistCp usage (#20)
* CRUNCH-679: Improve DistCp usage
* CRUNCH-679: Fix NPE bug by preserving IOUtils.cleanup logic
* CRUNCH-679: Make CrunchRenameCopyListing's constructor public
* CRUNCH-679: Unset rename configuration after loading into copy listing
* CRUNCH-679: Reduce default max distcp map tasks from 1000 to 100
* CRUNCH-679: Update log message formatting
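
For reference, a minimal sketch of how a pipeline can tune the new DistCp knobs
(the property names are defined in RuntimeParameters below; MyPipelineApp and the
chosen values are placeholders):

    import org.apache.crunch.Pipeline;
    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    // Cap the number of DistCp map tasks; this change drops the default from 1000 to 100.
    conf.setInt("crunch.file.target.max.distcp.tasks", 200);
    // Cap per-task copy bandwidth in MB/s; defaults to DistCpConstants.DEFAULT_BANDWIDTH_MB.
    conf.setInt("crunch.file.target.max.distcp.task.bandwidth.mb", 50);
    Pipeline pipeline = new MRPipeline(MyPipelineApp.class, conf);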
diff --git a/crunch-core/src/it/java/org/apache/crunch/ExternalFilesystemIT.java b/crunch-core/src/it/java/org/apache/crunch/ExternalFilesystemIT.java
index 0ca396c..a4efc8b 100644
--- a/crunch-core/src/it/java/org/apache/crunch/ExternalFilesystemIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/ExternalFilesystemIT.java
@@ -101,7 +101,7 @@
pipeline.run();
// assert the output was written correctly
- try (FSDataInputStream inputStream = dfsCluster2.open(new Path("hdfs://cluster2/output/out0-m-00000"))) {
+ try (FSDataInputStream inputStream = dfsCluster2.open(new Path("hdfs://cluster2/output/part-m-00000"))) {
String readValue = IOUtils.toString(inputStream).trim();
Assert.assertEquals(testString, readValue);
}
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
index f8b1e76..bc15169 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
@@ -51,6 +51,8 @@
public static final String FILE_TARGET_MAX_DISTCP_TASKS = "crunch.file.target.max.distcp.tasks";
+ public static final String FILE_TARGET_MAX_DISTCP_TASK_BANDWIDTH_MB = "crunch.file.target.max.distcp.task.bandwidth.mb";
+
// Not instantiated
private RuntimeParameters() {
}
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index fc3d2a8..ce47bcc 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -51,13 +51,16 @@
import org.apache.crunch.io.SourceTargetHelper;
import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PType;
+import org.apache.crunch.util.CrunchRenameCopyListing;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.tools.CopyListing;
import org.apache.hadoop.tools.DistCp;
+import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -194,14 +197,17 @@
Path srcPattern = getSourcePattern(workingPath, index);
boolean sameFs = isCompatible(srcFs, path);
boolean useDistributedCopy = conf.getBoolean(RuntimeParameters.FILE_TARGET_USE_DISTCP, true);
- int maxDistributedCopyTasks = conf.getInt(RuntimeParameters.FILE_TARGET_MAX_DISTCP_TASKS, 1000);
+ int maxDistributedCopyTasks = conf.getInt(RuntimeParameters.FILE_TARGET_MAX_DISTCP_TASKS, 100);
+ int maxDistributedCopyTaskBandwidthMB = conf.getInt(RuntimeParameters.FILE_TARGET_MAX_DISTCP_TASK_BANDWIDTH_MB,
+ DistCpConstants.DEFAULT_BANDWIDTH_MB);
int maxThreads = conf.getInt(RuntimeParameters.FILE_TARGET_MAX_THREADS, 1);
if (!sameFs) {
if (useDistributedCopy) {
LOG.info("Source and destination are in different file systems, performing distributed copy from {} to {}", srcPattern,
path);
- handleOutputsDistributedCopy(dstFsConf, srcPattern, srcFs, dstFs, maxDistributedCopyTasks);
+ handleOutputsDistributedCopy(conf, srcPattern, srcFs, dstFs, maxDistributedCopyTasks,
+ maxDistributedCopyTaskBandwidthMB);
} else {
LOG.info("Source and destination are in different file systems, performing asynch copies from {} to {}", srcPattern, path);
handleOutputsAsynchronously(conf, srcPattern, srcFs, dstFs, sameFs, maxThreads);
@@ -210,18 +216,17 @@
LOG.info("Source and destination are in the same file system, performing asynch renames from {} to {}", srcPattern, path);
handleOutputsAsynchronously(conf, srcPattern, srcFs, dstFs, sameFs, maxThreads);
}
-
}
private void handleOutputsAsynchronously(Configuration conf, Path srcPattern, FileSystem srcFs, FileSystem dstFs,
boolean sameFs, int maxThreads) throws IOException {
+ Configuration dstFsConf = getEffectiveBundleConfig(conf);
Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(srcPattern), srcPattern);
List<ListenableFuture<Boolean>> renameFutures = Lists.newArrayList();
ListeningExecutorService executorService =
MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(
maxThreads));
- Configuration dstFsConf = getEffectiveBundleConfig(conf);
for (Path s : srcs) {
Path d = getDestFile(dstFsConf, s, path, s.getName().contains("-m-"));
renameFutures.add(
@@ -255,26 +260,12 @@
}
private void handleOutputsDistributedCopy(Configuration conf, Path srcPattern, FileSystem srcFs, FileSystem dstFs,
- int maxDistributedCopyTasks) throws IOException {
+ int maxTasks, int maxBandwidthMB) throws IOException {
+ Configuration dstFsConf = getEffectiveBundleConfig(conf);
Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(srcPattern), srcPattern);
if (srcs.length > 0) {
- LOG.info("Distributed copying {} files using at most {} tasks", srcs.length, maxDistributedCopyTasks);
- // Once https://issues.apache.org/jira/browse/HADOOP-15281 is available, we can use the direct write
- // distcp optimization if the target path is in S3
- DistCpOptions options = new DistCpOptions(Arrays.asList(srcs), path);
- options.setMaxMaps(maxDistributedCopyTasks);
- options.setOverwrite(true);
- options.setBlocking(true);
-
- Configuration distCpConf = new Configuration(conf);
- // Remove unnecessary and problematic properties from the DistCp configuration. This is necessary since
- // files referenced by these properties may have already been deleted when the DistCp is being started.
- distCpConf.unset("mapreduce.job.cache.files");
- distCpConf.unset("mapreduce.job.classpath.files");
- distCpConf.unset("tmpjars");
-
try {
- DistCp distCp = new DistCp(distCpConf, options);
+ DistCp distCp = createDistCp(srcs, maxTasks, maxBandwidthMB, dstFsConf);
if (!distCp.execute().isSuccessful()) {
throw new CrunchRuntimeException("Distributed copy failed from " + srcPattern + " to " + path);
}
@@ -329,7 +320,38 @@
}
return new Path(dir, outputFilename);
}
-
+
+ protected DistCp createDistCp(Path[] srcs, int maxTasks, int maxBandwidthMB, Configuration conf) throws Exception {
+ LOG.info("Distributed copying {} files using at most {} tasks and bandwidth limit of {} MB/s per task",
+ new Object[]{srcs.length, maxTasks, maxBandwidthMB});
+
+ Configuration distCpConf = new Configuration(conf);
+
+ // Remove unnecessary and problematic properties from the DistCp configuration. This is necessary since
+ // files referenced by these properties may already have been deleted by the time the DistCp job starts.
+ distCpConf.unset("mapreduce.job.cache.files");
+ distCpConf.unset("mapreduce.job.classpath.files");
+ distCpConf.unset("tmpjars");
+
+ // Setup renaming for part files
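+ // e.g. a working file named "out0-m-00000" is written as "part-m-00000" at the destination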
+ List<String> renames = Lists.newArrayList();
+ for (Path s : srcs) {
+ Path d = getDestFile(conf, s, path, s.getName().contains("-m-"));
+ renames.add(s.getName() + ":" + d.getName());
+ }
+ distCpConf.setStrings(CrunchRenameCopyListing.DISTCP_PATH_RENAMES, renames.toArray(new String[renames.size()]));
+ distCpConf.setClass(DistCpConstants.CONF_LABEL_COPY_LISTING_CLASS, CrunchRenameCopyListing.class, CopyListing.class);
+
+ // Once https://issues.apache.org/jira/browse/HADOOP-15281 is available, we can use the direct write
+ // distcp optimization if the target path is in S3
+ DistCpOptions options = new DistCpOptions(Arrays.asList(srcs), path);
+ options.setMaxMaps(maxTasks);
+ options.setMapBandwidth(maxBandwidthMB);
+ options.setBlocking(true);
+
+ return new DistCp(distCpConf, options);
+ }
+
/**
* Extract the partition number from a raw reducer output filename.
*
diff --git a/crunch-core/src/main/java/org/apache/crunch/util/CrunchRenameCopyListing.java b/crunch-core/src/main/java/org/apache/crunch/util/CrunchRenameCopyListing.java
new file mode 100644
index 0000000..b930beb
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/util/CrunchRenameCopyListing.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the
+ * Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package org.apache.crunch.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
+import org.apache.hadoop.tools.SimpleCopyListing;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Stack;
+
+/**
+ * A custom {@link CopyListing} implementation capable of dynamically renaming
+ * the target paths according to a {@link #DISTCP_PATH_RENAMES configured set of values}.
+ * <p>
+ * Once https://issues.apache.org/jira/browse/HADOOP-16147 is available, this
+ * class can be significantly simplified.
+ * </p>
+ */
+public class CrunchRenameCopyListing extends SimpleCopyListing {
+ /**
+ * Comma-separated list of original-file:renamed-file path rename pairs.
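+ * For example: {@code out0-m-00000:part-m-00000,out0-m-00001:part-m-00001}.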
+ */
+ public static final String DISTCP_PATH_RENAMES = "crunch.distcp.path.renames";
+
+ private static final Logger LOG = LoggerFactory.getLogger(CrunchRenameCopyListing.class);
+ private final Map<String, String> pathRenames;
+
+ private long totalPaths = 0;
+ private long totalBytesToCopy = 0;
+
+ /**
+ * Constructor, to initialize configuration.
+ *
+ * @param configuration The input configuration, with which the source/target FileSystems may be accessed.
+ * @param credentials Credentials object on which the FS delegation tokens are cached. If null,
+ * delegation token caching is skipped.
+ */
+ public CrunchRenameCopyListing(Configuration configuration, Credentials credentials) {
+ super(configuration, credentials);
+
+ pathRenames = new HashMap<>();
+
+ String[] pathRenameConf = configuration.getStrings(DISTCP_PATH_RENAMES);
+ if (pathRenameConf == null) {
+ throw new IllegalArgumentException("Missing required configuration: " + DISTCP_PATH_RENAMES);
+ }
+ for (String pathRename : pathRenameConf) {
+ String[] pathRenameParts = pathRename.split(":");
+ if (pathRenameParts.length != 2) {
+ throw new IllegalArgumentException("Invalid path rename format: " + pathRename);
+ }
+ if (pathRenames.put(pathRenameParts[0], pathRenameParts[1]) != null) {
+ throw new IllegalArgumentException("Invalid duplicate path rename: " + pathRenameParts[0]);
+ }
+ }
+ LOG.info("Loaded {} path rename entries", pathRenames.size());
+
+ // Clear out the rename configuration property, as it is no longer needed
+ configuration.unset(DISTCP_PATH_RENAMES);
+ }
+
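+ // Note: doBuildListing below closely follows Hadoop's SimpleCopyListing#doBuildListing;
+ // the Crunch-specific change is routing entries through getFileListingKey and
+ // getFileListingValue so that the configured renames are applied.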
+ @Override
+ public void doBuildListing(SequenceFile.Writer fileListWriter, DistCpOptions options) throws IOException {
+ try {
+ for (Path path : options.getSourcePaths()) {
+ FileSystem sourceFS = path.getFileSystem(getConf());
+ final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
+ final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
+ final boolean preserveRawXAttrs = options.shouldPreserveRawXattrs();
+ path = makeQualified(path);
+
+ FileStatus rootStatus = sourceFS.getFileStatus(path);
+ Path sourcePathRoot = computeSourceRootPath(rootStatus, options);
+
+ FileStatus[] sourceFiles = sourceFS.listStatus(path);
+ boolean explore = (sourceFiles != null && sourceFiles.length > 0);
+ if (!explore || rootStatus.isDirectory()) {
+ CopyListingFileStatus rootCopyListingStatus = DistCpUtils.toCopyListingFileStatus(sourceFS, rootStatus, preserveAcls,
+ preserveXAttrs, preserveRawXAttrs);
+ writeToFileListingRoot(fileListWriter, rootCopyListingStatus, sourcePathRoot, options);
+ }
+ if (explore) {
+ for (FileStatus sourceStatus : sourceFiles) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Recording source-path: {} for copy.", sourceStatus.getPath());
+ }
+ CopyListingFileStatus sourceCopyListingStatus = DistCpUtils.toCopyListingFileStatus(sourceFS, sourceStatus,
+ preserveAcls && sourceStatus.isDirectory(), preserveXAttrs && sourceStatus.isDirectory(),
+ preserveRawXAttrs && sourceStatus.isDirectory());
+ writeToFileListing(fileListWriter, sourceCopyListingStatus, sourcePathRoot, options);
+
+ if (isDirectoryAndNotEmpty(sourceFS, sourceStatus)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Traversing non-empty source dir: {}", sourceStatus.getPath());
+ }
+ traverseNonEmptyDirectory(fileListWriter, sourceStatus, sourcePathRoot, options);
+ }
+ }
+ }
+ }
+ fileListWriter.close();
+ fileListWriter = null;
+ } finally {
+ if (fileListWriter != null) {
+ try {
+ fileListWriter.close();
+ } catch (IOException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Exception in closing {}", fileListWriter, e);
+ }
+ }
+ }
+ }
+ }
+
+ private Path computeSourceRootPath(FileStatus sourceStatus, DistCpOptions options) throws IOException {
+ Path target = options.getTargetPath();
+ FileSystem targetFS = target.getFileSystem(getConf());
+ final boolean targetPathExists = options.getTargetPathExists();
+
+ boolean solitaryFile = options.getSourcePaths().size() == 1 && !sourceStatus.isDirectory();
+
+ if (solitaryFile) {
+ if (targetFS.isFile(target) || !targetPathExists) {
+ return sourceStatus.getPath();
+ } else {
+ return sourceStatus.getPath().getParent();
+ }
+ } else {
+ boolean specialHandling =
+ (options.getSourcePaths().size() == 1 && !targetPathExists) || options.shouldSyncFolder() || options.shouldOverwrite();
+
+ return specialHandling && sourceStatus.isDirectory() ? sourceStatus.getPath() : sourceStatus.getPath().getParent();
+ }
+ }
+
+ private Path makeQualified(Path path) throws IOException {
+ final FileSystem fs = path.getFileSystem(getConf());
+ return path.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+ }
+
+ private static boolean isDirectoryAndNotEmpty(FileSystem fileSystem, FileStatus fileStatus) throws IOException {
+ return fileStatus.isDirectory() && getChildren(fileSystem, fileStatus).length > 0;
+ }
+
+ private static FileStatus[] getChildren(FileSystem fileSystem, FileStatus parent) throws IOException {
+ return fileSystem.listStatus(parent.getPath());
+ }
+
+ private void traverseNonEmptyDirectory(SequenceFile.Writer fileListWriter, FileStatus sourceStatus, Path sourcePathRoot,
+ DistCpOptions options) throws IOException {
+ FileSystem sourceFS = sourcePathRoot.getFileSystem(getConf());
+ final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
+ final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
+ final boolean preserveRawXattrs = options.shouldPreserveRawXattrs();
+ Stack<FileStatus> pathStack = new Stack<>();
+ pathStack.push(sourceStatus);
+
+ while (!pathStack.isEmpty()) {
+ for (FileStatus child : getChildren(sourceFS, pathStack.pop())) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Recording source-path: {} for copy.", sourceStatus.getPath());
+ }
+ CopyListingFileStatus childCopyListingStatus = DistCpUtils.toCopyListingFileStatus(sourceFS, child,
+ preserveAcls && child.isDirectory(), preserveXAttrs && child.isDirectory(), preserveRawXattrs && child.isDirectory());
+ writeToFileListing(fileListWriter, childCopyListingStatus, sourcePathRoot, options);
+ if (isDirectoryAndNotEmpty(sourceFS, child)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Traversing non-empty source dir: {}", sourceStatus.getPath());
+ }
+ pathStack.push(child);
+ }
+ }
+ }
+ }
+
+ private void writeToFileListingRoot(SequenceFile.Writer fileListWriter, CopyListingFileStatus fileStatus, Path sourcePathRoot,
+ DistCpOptions options) throws IOException {
+ boolean syncOrOverwrite = options.shouldSyncFolder() || options.shouldOverwrite();
+ if (fileStatus.getPath().equals(sourcePathRoot) && fileStatus.isDirectory() && syncOrOverwrite) {
+ // Skip the root-paths when syncOrOverwrite
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skip {}", fileStatus.getPath());
+ }
+ return;
+ }
+ writeToFileListing(fileListWriter, fileStatus, sourcePathRoot, options);
+ }
+
+ private void writeToFileListing(SequenceFile.Writer fileListWriter, CopyListingFileStatus fileStatus, Path sourcePathRoot,
+ DistCpOptions options) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("REL PATH: {}, FULL PATH: {}",
+ DistCpUtils.getRelativePath(sourcePathRoot, fileStatus.getPath()), fileStatus.getPath());
+ }
+
+ if (!shouldCopy(fileStatus.getPath(), options)) {
+ return;
+ }
+
+ fileListWriter.append(getFileListingKey(sourcePathRoot, fileStatus),
+ getFileListingValue(fileStatus));
+ fileListWriter.sync();
+
+ if (!fileStatus.isDirectory()) {
+ totalBytesToCopy += fileStatus.getLen();
+ }
+ totalPaths++;
+ }
+
+ /**
+ * Returns the key for an entry in the copy listing sequence file
+ * @param sourcePathRoot the root source path for determining the relative target path
+ * @param fileStatus the copy listing file status
+ * @return the key for the sequence file entry
+ */
+ protected Text getFileListingKey(Path sourcePathRoot, CopyListingFileStatus fileStatus) {
+ Path fileStatusPath = fileStatus.getPath();
+ String pathName = fileStatusPath.getName();
+ String renamedPathName = pathRenames.get(pathName);
+
+ if (renamedPathName != null && !pathName.equals(renamedPathName)) {
+ LOG.info("Applying dynamic rename of {} to {}", pathName, renamedPathName);
+ fileStatusPath = new Path(fileStatusPath.getParent(), renamedPathName);
+ }
+ return new Text(DistCpUtils.getRelativePath(sourcePathRoot, fileStatusPath));
+ }
+
+ /**
+ * Returns the value for an entry in the copy listing sequence file
+ * @param fileStatus the copy listing file status
+ * @return the value for the sequence file entry
+ */
+ protected CopyListingFileStatus getFileListingValue(CopyListingFileStatus fileStatus) {
+ return fileStatus;
+ }
+
+ @Override
+ protected long getBytesToCopy() {
+ return totalBytesToCopy;
+ }
+
+ @Override
+ protected long getNumberOfPaths() {
+ return totalPaths;
+ }
+}
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
index 420c9dd..b1ce5ba 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
@@ -17,16 +17,12 @@
*/
package org.apache.crunch.io.hbase;
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.impl.mr.run.RuntimeParameters;
import org.apache.crunch.io.SequentialFileNamingScheme;
import org.apache.crunch.io.impl.FileTargetImpl;
import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -34,18 +30,9 @@
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.tools.DistCp;
-import org.apache.hadoop.tools.DistCpOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Arrays;
public class HFileTarget extends FileTargetImpl {
- private static final Logger LOG = LoggerFactory.getLogger(HFileTarget.class);
-
public HFileTarget(String path) {
this(new Path(path));
}
@@ -91,64 +78,6 @@
}
@Override
- public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException {
- FileSystem srcFs = workingPath.getFileSystem(conf);
- Path src = getSourcePattern(workingPath, index);
- Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src);
- FileSystem dstFs = path.getFileSystem(conf);
- if (!dstFs.exists(path)) {
- dstFs.mkdirs(path);
- }
- boolean sameFs = isCompatible(srcFs, path);
-
- if (!sameFs) {
- if (srcs.length > 0) {
- int maxDistributedCopyTasks = conf.getInt(RuntimeParameters.FILE_TARGET_MAX_DISTCP_TASKS, 1000);
- LOG.info(
- "Source and destination are in different file systems, performing distcp of {} files from [{}] to [{}] "
- + "using at most {} tasks",
- new Object[] { srcs.length, src, path, maxDistributedCopyTasks });
- // Once https://issues.apache.org/jira/browse/HADOOP-15281 is available, we can use the direct write
- // distcp optimization if the target path is in S3
- DistCpOptions options = new DistCpOptions(Arrays.asList(srcs), path);
- options.setMaxMaps(maxDistributedCopyTasks);
- options.setOverwrite(true);
- options.setBlocking(true);
-
- Configuration distCpConf = new Configuration(conf);
- // Remove unnecessary and problematic properties from the DistCp configuration. This is necessary since
- // files referenced by these properties may have already been deleted when the DistCp is being started.
- distCpConf.unset("mapreduce.job.cache.files");
- distCpConf.unset("mapreduce.job.classpath.files");
- distCpConf.unset("tmpjars");
-
- try {
- DistCp distCp = new DistCp(distCpConf, options);
- if (!distCp.execute().isSuccessful()) {
- throw new CrunchRuntimeException("Unable to move files through distcp from " + src + " to " + path);
- }
- LOG.info("Distributed copy completed for {} files", srcs.length);
- } catch (Exception e) {
- throw new CrunchRuntimeException("Unable to move files through distcp from " + src + " to " + path, e);
- }
- } else {
- LOG.info("No files found at [{}], not attempting to copy HFiles", src);
- }
- } else {
- LOG.info(
- "Source and destination are in the same file system, performing rename of {} files from [{}] to [{}]",
- new Object[] { srcs.length, src, path });
-
- for (Path s : srcs) {
- Path d = getDestFile(conf, s, path, s.getName().contains("-m-"));
- srcFs.rename(s, d);
- }
- }
- dstFs.create(getSuccessIndicator(), true).close();
- LOG.info("Created success indicator file");
- }
-
- @Override
public String toString() {
return "HFile(" + path + ")";
}