// blob: 7ba26ff481333750e6dd98df157c6094bed28b89 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.mapred;
import org.apache.hadoop.tools.util.RetriableCommand;
import org.apache.hadoop.tools.util.ThrottledInputStream;
import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.tools.DistCpOptions.*;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.*;
import java.util.EnumSet;
/**
* This class extends RetriableCommand to implement the copy of files,
* with retries on failure.
*/
public class RetriableFileCopyCommand extends RetriableCommand {

  private static final Log LOG = LogFactory.getLog(RetriableFileCopyCommand.class);

  /** Size, in bytes, of the copy buffer and of the buffer requested for the target stream. */
  private static final int BUFFER_SIZE = 8 * 1024;

  /**
   * Constructor, taking a description of the action.
   * @param description Verbose description of the copy operation.
   */
  public RetriableFileCopyCommand(String description) {
    super(description);
  }

  /**
   * Implementation of RetriableCommand::doExecute().
   * This is the actual copy-implementation.
   * @param arguments Argument-list to the command. Expected, in order:
   *                  the source FileStatus, the target Path, the
   *                  Mapper.Context and the EnumSet of FileAttributes.
   * @return Number of bytes copied.
   * @throws Exception: CopyReadException, if there are read-failures. All other
   * failures are IOExceptions.
   */
  @SuppressWarnings("unchecked")
  @Override
  protected Object doExecute(Object... arguments) throws Exception {
    // These checks only run when assertions are enabled (-ea).
    assert arguments.length == 4 : "Unexpected argument list.";
    FileStatus source = (FileStatus)arguments[0];
    assert !source.isDirectory() : "Unexpected file-status. Expected file.";
    Path target = (Path)arguments[1];
    Mapper.Context context = (Mapper.Context)arguments[2];
    EnumSet<FileAttribute> fileAttributes
            = (EnumSet<FileAttribute>)arguments[3];
    return doCopy(source, target, context, fileAttributes);
  }

  /**
   * Copies the source file to a temporary file, checks the copy, and then
   * renames the temporary file onto the target.
   * @param sourceFileStatus Status of the file to copy.
   * @param target Final destination path.
   * @param context Mapper context; supplies the configuration, the
   *                task-attempt ID and receives status updates.
   * @param fileAttributes Attributes to carry over from the source.
   * @return Number of bytes read from the source.
   * @throws IOException On any copy, verification or promotion failure.
   */
  private long doCopy(FileStatus sourceFileStatus, Path target,
                      Mapper.Context context,
                      EnumSet<FileAttribute> fileAttributes)
          throws IOException {

    Path tmpTargetPath = getTmpFile(target, context);
    final Configuration configuration = context.getConfiguration();
    FileSystem targetFS = target.getFileSystem(configuration);

    try {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Copying " + sourceFileStatus.getPath() + " to " + target);
        LOG.debug("Tmp-file path: " + tmpTargetPath);
      }
      FileSystem sourceFS = sourceFileStatus.getPath().getFileSystem(
              configuration);
      long bytesRead = copyToTmpFile(tmpTargetPath, targetFS, sourceFileStatus,
                                     context, fileAttributes);

      compareFileLengths(sourceFileStatus, tmpTargetPath, configuration, bytesRead);
      compareCheckSums(sourceFS, sourceFileStatus.getPath(), targetFS, tmpTargetPath);
      promoteTmpToTarget(tmpTargetPath, target, targetFS);
      return bytesRead;
    } finally {
      // After a successful promote the tmp-file has been renamed away, so this
      // only removes leftovers from a failed attempt.
      if (targetFS.exists(tmpTargetPath)) {
        targetFS.delete(tmpTargetPath, false);
      }
    }
  }

  /**
   * Creates (overwriting) the temporary file on the target file-system and
   * streams the source file into it.
   * @return Number of bytes read from the source.
   * @throws IOException If the tmp-file cannot be created or the copy fails.
   */
  private long copyToTmpFile(Path tmpTargetPath, FileSystem targetFS,
                             FileStatus sourceFileStatus, Mapper.Context context,
                             EnumSet<FileAttribute> fileAttributes)
          throws IOException {
    OutputStream outStream = new BufferedOutputStream(targetFS.create(
            tmpTargetPath, true, BUFFER_SIZE,
            getReplicationFactor(fileAttributes, sourceFileStatus, targetFS),
            getBlockSize(fileAttributes, sourceFileStatus, targetFS), context));
    return copyBytes(sourceFileStatus, outStream, BUFFER_SIZE, true, context);
  }

  /**
   * Re-reads the current length of the source file and checks that it equals
   * the number of bytes that were read during the copy.
   * @param target Only used in the error message.
   * @throws IOException If the two lengths differ.
   */
  private void compareFileLengths(FileStatus sourceFileStatus, Path target,
                                  Configuration configuration, long bytesRead)
          throws IOException {
    final Path sourcePath = sourceFileStatus.getPath();
    FileSystem fs = sourcePath.getFileSystem(configuration);
    if (fs.getFileStatus(sourcePath).getLen() != bytesRead) {
      throw new IOException("Mismatch in length of source:" + sourcePath
                + " and target:" + target);
    }
  }

  /**
   * Fails the copy when DistCpUtils.checksumsAreEqual() reports a mismatch
   * between source and target.
   * @throws IOException If the check-sums are reported as unequal.
   */
  private void compareCheckSums(FileSystem sourceFS, Path source,
                                FileSystem targetFS, Path target)
          throws IOException {
    if (!DistCpUtils.checksumsAreEqual(sourceFS, source, targetFS, target)) {
      throw new IOException("Check-sum mismatch between "
                              + source + " and " + target);
    }
  }

  //If target file exists and unable to delete target - fail
  //If target doesn't exist and unable to create parent folder - fail
  //If target is successfully deleted and parent exists, if rename fails - fail
  private void promoteTmpToTarget(Path tmpTarget, Path target, FileSystem fs)
          throws IOException {
    // The three steps rely on short-circuit evaluation: each one is attempted
    // only if the previous step did not already fail.
    if ((fs.exists(target) && !fs.delete(target, false))
        || (!fs.exists(target.getParent()) && !fs.mkdirs(target.getParent()))
        || !fs.rename(tmpTarget, target)) {
      throw new IOException("Failed to promote tmp-file:" + tmpTarget
                              + " to: " + target);
    }
  }

  /**
   * Builds the path of the temporary file for this task-attempt. The file is
   * placed in the target work path, or in that path's parent when the target
   * itself is the work path.
   */
  private Path getTmpFile(Path target, Mapper.Context context) {
    Path targetWorkPath = new Path(context.getConfiguration().
        get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));

    Path root = target.equals(targetWorkPath)? targetWorkPath.getParent() : targetWorkPath;
    Path tmpFile = new Path(root, ".distcp.tmp." + context.getTaskAttemptID().toString());
    LOG.info("Creating temp file: " + tmpFile);
    return tmpFile;
  }

  /**
   * Streams the source file into outStream, reporting progress through the
   * context after every buffer written.
   * @param sourceFileStatus Status of the file to read.
   * @param outStream Destination stream.
   * @param bufferSize Size of the intermediate copy buffer, in bytes.
   * @param mustCloseStream If true, both streams are closed before returning,
   *                        whether or not the copy succeeded.
   * @param context Mapper context, used for configuration and status.
   * @return Total number of bytes read from the source.
   * @throws IOException CopyReadException if opening or reading the source
   * fails; a plain IOException if writing to (or closing) the output fails.
   */
  private long copyBytes(FileStatus sourceFileStatus, OutputStream outStream,
                         int bufferSize, boolean mustCloseStream,
                         Mapper.Context context) throws IOException {
    Path source = sourceFileStatus.getPath();
    byte[] buf = new byte[bufferSize];
    ThrottledInputStream inStream = null;
    // Stream still to be closed by the fallback cleanup below.
    OutputStream pendingOut = outStream;
    long totalBytesRead = 0;

    try {
      inStream = getInputStream(source, context.getConfiguration());
      int bytesRead = readBytes(inStream, buf);
      while (bytesRead >= 0) {
        totalBytesRead += bytesRead;
        outStream.write(buf, 0, bytesRead);
        updateContextStatus(totalBytesRead, context, sourceFileStatus);
        // Every read goes through readBytes() so that a failure in the middle
        // of the file is reported as a CopyReadException too.
        bytesRead = readBytes(inStream, buf);
      }
      if (mustCloseStream) {
        // Close explicitly on the success path: a failure while flushing the
        // buffered tail must reach the caller instead of being swallowed by
        // the quiet cleanup in the finally-block.
        outStream.close();
        pendingOut = null;
      }
    } finally {
      if (mustCloseStream) {
        IOUtils.cleanup(LOG, pendingOut, inStream);
      }
    }

    return totalBytesRead;
  }

  /**
   * Publishes a progress line of the form
   * "&lt;percent&gt;% &lt;description&gt; [&lt;copied&gt;/&lt;total&gt;]" as the task status.
   */
  private void updateContextStatus(long totalBytesRead, Mapper.Context context,
                                   FileStatus sourceFileStatus) {
    StringBuilder message = new StringBuilder(DistCpUtils.getFormatter()
                .format(totalBytesRead * 100.0f / sourceFileStatus.getLen()));
    message.append("% ")
            .append(description).append(" [")
            .append(DistCpUtils.getStringDescriptionFor(totalBytesRead))
            .append('/')
        .append(DistCpUtils.getStringDescriptionFor(sourceFileStatus.getLen()))
            .append(']');
    context.setStatus(message.toString());
  }

  /**
   * Reads from inStream into buf, converting any IOException into a
   * CopyReadException.
   * @return The value returned by InputStream.read(byte[]).
   * @throws IOException Always a CopyReadException wrapping the read failure.
   */
  private static int readBytes(InputStream inStream, byte[] buf)
          throws IOException {
    try {
      return inStream.read(buf);
    }
    catch (IOException e) {
      throw new CopyReadException(e);
    }
  }

  /**
   * Opens the source path wrapped in a buffered ThrottledInputStream. The
   * throttle is given the configured bandwidth (in MB) multiplied by
   * 1024 * 1024.
   * @throws IOException Always a CopyReadException wrapping the open failure.
   */
  private static ThrottledInputStream getInputStream(Path path, Configuration conf)
          throws IOException {
    try {
      FileSystem fs = path.getFileSystem(conf);
      // Held as a long so the conversion to bytes below cannot overflow int.
      long bandwidthMB = conf.getInt(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
              DistCpConstants.DEFAULT_BANDWIDTH_MB);
      return new ThrottledInputStream(new BufferedInputStream(fs.open(path)),
              bandwidthMB * 1024 * 1024);
    }
    catch (IOException e) {
      throw new CopyReadException(e);
    }
  }

  /**
   * @return The source file's replication when REPLICATION is to be
   * preserved, otherwise the target file-system's default replication.
   */
  private static short getReplicationFactor(
          EnumSet<FileAttribute> fileAttributes,
          FileStatus sourceFile, FileSystem targetFS) {
    return fileAttributes.contains(FileAttribute.REPLICATION)?
            sourceFile.getReplication() : targetFS.getDefaultReplication();
  }

  /**
   * @return The source file's block-size when BLOCKSIZE is to be preserved,
   * otherwise the target file-system's default block-size.
   */
  private static long getBlockSize(
          EnumSet<FileAttribute> fileAttributes,
          FileStatus sourceFile, FileSystem targetFS) {
    return fileAttributes.contains(FileAttribute.BLOCKSIZE)?
            sourceFile.getBlockSize() : targetFS.getDefaultBlockSize();
  }

  /**
   * Special subclass of IOException. This is used to distinguish read-operation
   * failures from other kinds of IOExceptions.
   * The failure to read from source is dealt with specially, in the CopyMapper.
   * Such failures may be skipped if the DistCpOptions indicate so.
   * Write failures are intolerable, and amount to CopyMapper failure.
   */
  public static class CopyReadException extends IOException {
    public CopyReadException(Throwable rootCause) {
      super(rootCause);
    }
  }
}