blob: 842d97adef1bb3a572d059d32b5dd7624f6f5d9f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.mapred;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.EnumSet;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptionSwitch;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
import org.apache.hadoop.tools.mapred.RetriableFileCopyCommand.CopyReadException;
import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.util.StringUtils;
/**
* Mapper class that executes the DistCp copy operation.
* Implements the o.a.h.mapreduce.Mapper interface.
*/
public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text> {
/**
* Hadoop counters for the DistCp CopyMapper.
* (These have been kept identical to the old DistCp,
* for backward compatibility.)
*/
public static enum Counter {
COPY, // Number of files received by the mapper for copy.
DIR_COPY, // Number of directories received by the mapper for copy.
SKIP, // Number of files skipped.
FAIL, // Number of files that failed to be copied.
BYTESCOPIED, // Number of bytes actually copied by the copy-mapper, total.
BYTESEXPECTED,// Number of bytes expected to be copied.
BYTESFAILED, // Number of bytes that failed to be copied.
BYTESSKIPPED, // Number of bytes that were skipped from copy.
}
/**
* Indicate the action for each file
*/
static enum FileAction {
SKIP, // Skip copying the file since it's already in the target FS
APPEND, // Only need to append new data to the file in the target FS
OVERWRITE, // Overwrite the whole file
}
private static Log LOG = LogFactory.getLog(CopyMapper.class);
private Configuration conf;
private boolean syncFolders = false;
private boolean ignoreFailures = false;
private boolean skipCrc = false;
private boolean overWrite = false;
private boolean append = false;
private boolean verboseLog = false;
private EnumSet<FileAttribute> preserve = EnumSet.noneOf(FileAttribute.class);
private FileSystem targetFS = null;
private Path targetWorkPath = null;
/**
* Implementation of the Mapper::setup() method. This extracts the DistCp-
* options specified in the Job's configuration, to set up the Job.
* @param context Mapper's context.
* @throws IOException On IO failure.
* @throws InterruptedException If the job is interrupted.
*/
@Override
public void setup(Context context) throws IOException, InterruptedException {
conf = context.getConfiguration();
syncFolders = conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false);
ignoreFailures = conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false);
skipCrc = conf.getBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(), false);
overWrite = conf.getBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(), false);
append = conf.getBoolean(DistCpOptionSwitch.APPEND.getConfigLabel(), false);
verboseLog = conf.getBoolean(
DistCpOptionSwitch.VERBOSE_LOG.getConfigLabel(), false);
preserve = DistCpUtils.unpackAttributes(conf.get(DistCpOptionSwitch.
PRESERVE_STATUS.getConfigLabel()));
targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
Path targetFinalPath = new Path(conf.get(
DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
targetFS = targetFinalPath.getFileSystem(conf);
try {
overWrite = overWrite || targetFS.getFileStatus(targetFinalPath).isFile();
} catch (FileNotFoundException ignored) {
}
if (conf.get(DistCpConstants.CONF_LABEL_SSL_CONF) != null) {
initializeSSLConf(context);
}
}
/**
* Initialize SSL Config if same is set in conf
*
* @throws IOException - If any
*/
private void initializeSSLConf(Context context) throws IOException {
LOG.info("Initializing SSL configuration");
String workDir = conf.get(JobContext.JOB_LOCAL_DIR) + "/work";
Path[] cacheFiles = context.getLocalCacheFiles();
Configuration sslConfig = new Configuration(false);
String sslConfFileName = conf.get(DistCpConstants.CONF_LABEL_SSL_CONF);
Path sslClient = findCacheFile(cacheFiles, sslConfFileName);
if (sslClient == null) {
LOG.warn("SSL Client config file not found. Was looking for " + sslConfFileName +
" in " + Arrays.toString(cacheFiles));
return;
}
sslConfig.addResource(sslClient);
String trustStoreFile = conf.get("ssl.client.truststore.location");
Path trustStorePath = findCacheFile(cacheFiles, trustStoreFile);
sslConfig.set("ssl.client.truststore.location", trustStorePath.toString());
String keyStoreFile = conf.get("ssl.client.keystore.location");
Path keyStorePath = findCacheFile(cacheFiles, keyStoreFile);
sslConfig.set("ssl.client.keystore.location", keyStorePath.toString());
try {
OutputStream out = new FileOutputStream(workDir + "/" + sslConfFileName);
try {
sslConfig.writeXml(out);
} finally {
out.close();
}
conf.set(DistCpConstants.CONF_LABEL_SSL_KEYSTORE, sslConfFileName);
} catch (IOException e) {
LOG.warn("Unable to write out the ssl configuration. " +
"Will fall back to default ssl-client.xml in class path, if there is one", e);
}
}
/**
* Find entry from distributed cache
*
* @param cacheFiles - All localized cache files
* @param fileName - fileName to search
* @return Path of the filename if found, else null
*/
private Path findCacheFile(Path[] cacheFiles, String fileName) {
if (cacheFiles != null && cacheFiles.length > 0) {
for (Path file : cacheFiles) {
if (file.getName().equals(fileName)) {
return file;
}
}
}
return null;
}
/**
* Implementation of the Mapper::map(). Does the copy.
* @param relPath The target path.
* @param sourceFileStatus The source path.
* @throws IOException
* @throws InterruptedException
*/
@Override
public void map(Text relPath, CopyListingFileStatus sourceFileStatus,
Context context) throws IOException, InterruptedException {
Path sourcePath = sourceFileStatus.getPath();
if (LOG.isDebugEnabled())
LOG.debug("DistCpMapper::map(): Received " + sourcePath + ", " + relPath);
Path target = new Path(targetWorkPath.makeQualified(targetFS.getUri(),
targetFS.getWorkingDirectory()) + relPath.toString());
EnumSet<DistCpOptions.FileAttribute> fileAttributes
= getFileAttributeSettings(context);
final boolean preserveRawXattrs = context.getConfiguration().getBoolean(
DistCpConstants.CONF_LABEL_PRESERVE_RAWXATTRS, false);
final String description = "Copying " + sourcePath + " to " + target;
context.setStatus(description);
LOG.info(description);
try {
CopyListingFileStatus sourceCurrStatus;
FileSystem sourceFS;
try {
sourceFS = sourcePath.getFileSystem(conf);
final boolean preserveXAttrs =
fileAttributes.contains(FileAttribute.XATTR);
sourceCurrStatus = DistCpUtils.toCopyListingFileStatusHelper(sourceFS,
sourceFS.getFileStatus(sourcePath),
fileAttributes.contains(FileAttribute.ACL),
preserveXAttrs, preserveRawXattrs,
sourceFileStatus.getChunkOffset(),
sourceFileStatus.getChunkLength());
} catch (FileNotFoundException e) {
throw new IOException(new RetriableFileCopyCommand.CopyReadException(e));
}
FileStatus targetStatus = null;
try {
targetStatus = targetFS.getFileStatus(target);
} catch (FileNotFoundException ignore) {
if (LOG.isDebugEnabled())
LOG.debug("Path could not be found: " + target, ignore);
}
if (targetStatus != null &&
(targetStatus.isDirectory() != sourceCurrStatus.isDirectory())) {
throw new IOException("Can't replace " + target + ". Target is " +
getFileType(targetStatus) + ", Source is " + getFileType(sourceCurrStatus));
}
if (sourceCurrStatus.isDirectory()) {
createTargetDirsWithRetry(description, target, context);
return;
}
FileAction action = checkUpdate(sourceFS, sourceCurrStatus, target,
targetStatus);
Path tmpTarget = target;
if (action == FileAction.SKIP) {
LOG.info("Skipping copy of " + sourceCurrStatus.getPath()
+ " to " + target);
updateSkipCounters(context, sourceCurrStatus);
context.write(null, new Text("SKIP: " + sourceCurrStatus.getPath()));
if (verboseLog) {
context.write(null,
new Text("FILE_SKIPPED: source=" + sourceFileStatus.getPath()
+ ", size=" + sourceFileStatus.getLen() + " --> "
+ "target=" + target + ", size=" + (targetStatus == null ?
0 : targetStatus.getLen())));
}
} else {
if (sourceCurrStatus.isSplit()) {
tmpTarget = DistCpUtils.getSplitChunkPath(target, sourceCurrStatus);
}
if (LOG.isDebugEnabled()) {
LOG.debug("copying " + sourceCurrStatus + " " + tmpTarget);
}
copyFileWithRetry(description, sourceCurrStatus, tmpTarget,
targetStatus, context, action, fileAttributes);
}
DistCpUtils.preserve(target.getFileSystem(conf), tmpTarget,
sourceCurrStatus, fileAttributes, preserveRawXattrs);
} catch (IOException exception) {
handleFailures(exception, sourceFileStatus, target, context);
}
}
private String getFileType(CopyListingFileStatus fileStatus) {
if (null == fileStatus) {
return "N/A";
}
return fileStatus.isDirectory() ? "dir" : "file";
}
private String getFileType(FileStatus fileStatus) {
if (null == fileStatus) {
return "N/A";
}
return fileStatus.isDirectory() ? "dir" : "file";
}
private static EnumSet<DistCpOptions.FileAttribute>
getFileAttributeSettings(Mapper.Context context) {
String attributeString = context.getConfiguration().get(
DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel());
return DistCpUtils.unpackAttributes(attributeString);
}
private void copyFileWithRetry(String description,
CopyListingFileStatus sourceFileStatus, Path target,
FileStatus targrtFileStatus, Context context, FileAction action,
EnumSet<DistCpOptions.FileAttribute> fileAttributes)
throws IOException, InterruptedException {
long bytesCopied;
try {
bytesCopied = (Long) new RetriableFileCopyCommand(skipCrc, description,
action).execute(sourceFileStatus, target, context, fileAttributes);
} catch (Exception e) {
context.setStatus("Copy Failure: " + sourceFileStatus.getPath());
throw new IOException("File copy failed: " + sourceFileStatus.getPath() +
" --> " + target, e);
}
incrementCounter(context, Counter.BYTESEXPECTED, sourceFileStatus.getLen());
incrementCounter(context, Counter.BYTESCOPIED, bytesCopied);
incrementCounter(context, Counter.COPY, 1);
if (verboseLog) {
context.write(null,
new Text("FILE_COPIED: source=" + sourceFileStatus.getPath() + ","
+ " size=" + sourceFileStatus.getLen() + " --> " + "target="
+ target + ", size=" + (targrtFileStatus == null ?
0 : targrtFileStatus.getLen())));
}
}
private void createTargetDirsWithRetry(String description,
Path target, Context context) throws IOException {
try {
new RetriableDirectoryCreateCommand(description).execute(target, context);
} catch (Exception e) {
throw new IOException("mkdir failed for " + target, e);
}
incrementCounter(context, Counter.DIR_COPY, 1);
}
private static void updateSkipCounters(Context context,
CopyListingFileStatus sourceFile) {
incrementCounter(context, Counter.SKIP, 1);
incrementCounter(context, Counter.BYTESSKIPPED, sourceFile.getLen());
}
private void handleFailures(IOException exception,
CopyListingFileStatus sourceFileStatus, Path target, Context context)
throws IOException, InterruptedException {
LOG.error("Failure in copying " + sourceFileStatus.getPath() +
(sourceFileStatus.isSplit()? ","
+ " offset=" + sourceFileStatus.getChunkOffset()
+ " chunkLength=" + sourceFileStatus.getChunkLength()
: "") +
" to " + target, exception);
if (ignoreFailures &&
ExceptionUtils.indexOfType(exception, CopyReadException.class) != -1) {
incrementCounter(context, Counter.FAIL, 1);
incrementCounter(context, Counter.BYTESFAILED, sourceFileStatus.getLen());
context.write(null, new Text("FAIL: " + sourceFileStatus.getPath() + " - " +
StringUtils.stringifyException(exception)));
}
else
throw exception;
}
private static void incrementCounter(Context context, Counter counter,
long value) {
context.getCounter(counter).increment(value);
}
private FileAction checkUpdate(FileSystem sourceFS,
CopyListingFileStatus source, Path target, FileStatus targetFileStatus)
throws IOException {
if (targetFileStatus != null && !overWrite) {
if (canSkip(sourceFS, source, targetFileStatus)) {
return FileAction.SKIP;
} else if (append) {
long targetLen = targetFileStatus.getLen();
if (targetLen < source.getLen()) {
FileChecksum sourceChecksum = sourceFS.getFileChecksum(
source.getPath(), targetLen);
if (sourceChecksum != null
&& sourceChecksum.equals(targetFS.getFileChecksum(target))) {
// We require that the checksum is not null. Thus currently only
// DistributedFileSystem is supported
return FileAction.APPEND;
}
}
}
}
return FileAction.OVERWRITE;
}
private boolean canSkip(FileSystem sourceFS, CopyListingFileStatus source,
FileStatus target) throws IOException {
if (!syncFolders) {
return true;
}
boolean sameLength = target.getLen() == source.getLen();
boolean sameBlockSize = source.getBlockSize() == target.getBlockSize()
|| !preserve.contains(FileAttribute.BLOCKSIZE);
if (sameLength && sameBlockSize) {
return skipCrc ||
DistCpUtils.checksumsAreEqual(sourceFS, source.getPath(), null,
targetFS, target.getPath());
} else {
return false;
}
}
}