/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit.staging;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathExistsException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.commit.impl.CommitContext;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.functional.TaskPool;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.Invoker.*;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
import static org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR.*;
import static org.apache.hadoop.util.functional.RemoteIterators.cleanupRemoteIterator;
import static org.apache.hadoop.util.functional.RemoteIterators.toList;
/**
* Committer based on the contributed work of the
* <a href="https://github.com/rdblue/s3committer">Netflix multipart committers</a>.
* <ol>
* <li>
* The working directory of each task is actually under a temporary
* path in the local filesystem; tasks write directly into it.
* </li>
* <li>
* Task Commit: list all files under the task working dir, upload
* each of them but do not commit the final operation.
* Persist the information for each pending commit into the cluster filesystem
* for enumeration and commit by the job committer.
* </li>
* <li>Task Abort: recursive delete of task working dir.</li>
* <li>Job Commit: list all pending PUTs to commit; commit them.</li>
* <li>
* Job Abort: list all pending PUTs to commit; abort them.
* Delete all task attempt directories.
* </li>
* </ol>
*
* This is the base class of the Partitioned and Directory committers.
* It does not do any conflict resolution, and is made non-abstract
* primarily for test purposes. It is not expected to be used in production.
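* <p>
* A minimal sketch of selecting this committer for a job, assuming the
* standard committer factory wiring; the option names come from
* {@link CommitConstants}:
* <pre>
* Configuration conf = job.getConfiguration();
* conf.set("fs.s3a.committer.name", "staging");
* conf.setBoolean("fs.s3a.committer.staging.unique-filenames", true);
* </pre>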
*/
public class StagingCommitter extends AbstractS3ACommitter {
private static final Logger LOG = LoggerFactory.getLogger(
StagingCommitter.class);
/** Name: {@value}. */
public static final String NAME = "staging";
private final Path constructorOutputPath;
private final long uploadPartSize;
private final boolean uniqueFilenames;
private final FileOutputCommitter wrappedCommitter;
private ConflictResolution conflictResolution;
private String s3KeyPrefix;
/** The directory in the cluster FS for commits to go to. */
private Path commitsDirectory;
/**
* Committer for a single task attempt.
* @param outputPath final output path
* @param context task context
* @throws IOException on a failure
*/
public StagingCommitter(Path outputPath,
TaskAttemptContext context) throws IOException {
super(outputPath, context);
this.constructorOutputPath = requireNonNull(getOutputPath(), "output path");
Configuration conf = getConf();
this.uploadPartSize = conf.getLongBytes(
MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
this.uniqueFilenames = conf.getBoolean(
FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
DEFAULT_STAGING_COMMITTER_UNIQUE_FILENAMES);
setWorkPath(buildWorkPath(context, getUUID()));
this.wrappedCommitter = createWrappedCommitter(context, conf);
setOutputPath(constructorOutputPath);
Path finalOutputPath = requireNonNull(getOutputPath(),
"Output path cannot be null");
S3AFileSystem fs = getS3AFileSystem(finalOutputPath,
context.getConfiguration(), false);
s3KeyPrefix = fs.pathToKey(finalOutputPath);
LOG.debug("{}: final output path is {}", getRole(), finalOutputPath);
// forces evaluation and caching of the resolution mode.
ConflictResolution mode = getConflictResolutionMode(getJobContext(),
fs.getConf());
LOG.debug("Conflict resolution mode: {}", mode);
}
@Override
public String getName() {
return NAME;
}
/**
* Create the wrapped committer.
* This includes customizing its options, and setting up the destination
* directory.
* @param context job/task context.
* @param conf config
* @return the inner committer
* @throws IOException on a failure
*/
protected FileOutputCommitter createWrappedCommitter(JobContext context,
Configuration conf) throws IOException {
// explicitly choose commit algorithm
initFileOutputCommitterOptions(context);
commitsDirectory = Paths.getMultipartUploadCommitsDirectory(conf,
getUUID());
return new FileOutputCommitter(commitsDirectory, context);
}
/**
* Init the context config with everything needed for the file output
* committer. In particular, this code currently only works with
* commit algorithm 1.
* @param context context to configure.
*/
protected void initFileOutputCommitterOptions(JobContext context) {
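// Algorithm 1 is required: committed task data must stay under the job
// attempt path until job commit, which is where the .pendingset files
// are listed; v2's commit-on-task-completion semantics would break that.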
context.getConfiguration()
.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 1);
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("StagingCommitter{");
sb.append(super.toString());
sb.append(", commitsDirectory=").append(commitsDirectory);
sb.append(", uniqueFilenames=").append(uniqueFilenames);
sb.append(", conflictResolution=").append(conflictResolution);
sb.append(", uploadPartSize=").append(uploadPartSize);
if (wrappedCommitter != null) {
sb.append(", wrappedCommitter=").append(wrappedCommitter);
}
sb.append('}');
return sb.toString();
}
/**
* Get the work path for a task.
* @param context job/task context
* @param uuid job UUID
* @return a path, or null if the context is not that of a task attempt
* @throws IOException failure to build the path
*/
private static Path buildWorkPath(JobContext context, String uuid)
throws IOException {
if (context instanceof TaskAttemptContext) {
return taskAttemptWorkingPath((TaskAttemptContext) context, uuid);
} else {
return null;
}
}
/**
* Is this committer using unique filenames?
* @return true if unique filenames are used.
*/
public Boolean useUniqueFilenames() {
return uniqueFilenames;
}
/**
* Get the filesystem for the job attempt.
* @param context the context of the job. This is used to get the
* application attempt ID.
* @return the FS to store job attempt data.
* @throws IOException failure to create the FS.
*/
public FileSystem getJobAttemptFileSystem(JobContext context)
throws IOException {
Path p = getJobAttemptPath(context);
return p.getFileSystem(context.getConfiguration());
}
/**
* Compute the path where the output of a given job attempt will be placed.
* @param context the context of the job. This is used to get the
* application attempt ID.
* @param out the output path to place these in.
* @return the path to store job attempt data.
*/
public static Path getJobAttemptPath(JobContext context, Path out) {
return getJobAttemptPath(getAppAttemptId(context), out);
}
/**
* Compute the path where the output of a given job attempt will be placed.
* @param appAttemptId the ID of the application attempt for this job.
* @param out the output path to place these in.
* @return the path to store job attempt data.
*/
private static Path getJobAttemptPath(int appAttemptId, Path out) {
return new Path(getPendingJobAttemptsPath(out),
String.valueOf(appAttemptId));
}
@Override
protected Path getJobAttemptPath(int appAttemptId) {
return new Path(getJobPath(),
String.valueOf(appAttemptId));
}
/**
* Compute the path under which all job attempts will be placed.
* @return the path to store job attempt data.
*/
protected Path getJobPath() {
return getPendingJobAttemptsPath(commitsDirectory);
}
/**
* Compute the path where the output of pending task attempts is stored.
* @param context the context of the job with pending tasks.
* @param out the base output path.
* @return the path where the output of pending task attempts is stored.
*/
private static Path getPendingTaskAttemptsPath(JobContext context, Path out) {
return new Path(getJobAttemptPath(context, out), TEMPORARY);
}
/**
* Compute the path where the output of a task attempt is stored until
* that task is committed.
*
* @param context the context of the task attempt.
* @param out The output path to put things in.
* @return the path where a task attempt should be stored.
*/
public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) {
return new Path(getPendingTaskAttemptsPath(context, out),
String.valueOf(context.getTaskAttemptID()));
}
/**
* Get the location of pending job attempts.
* @param out the base output directory.
* @return the location of pending job attempts.
*/
private static Path getPendingJobAttemptsPath(Path out) {
requireNonNull(out, "Null 'out' path");
return new Path(out, TEMPORARY);
}
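// Illustrative layout built by the helpers above, where TEMPORARY is
// "_temporary":
//   $out/_temporary/$appAttemptId                            job attempt
//   $out/_temporary/$appAttemptId/_temporary/$taskAttemptId  task attempt
//   $out/_temporary/$appAttemptId/$taskId                    committed task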
/**
* Compute the path where the output of a committed task is stored until
* the entire job is committed.
* @param context the context of the task attempt
* @return the path where the output of a committed task is stored until
* the entire job is committed.
*/
public Path getCommittedTaskPath(TaskAttemptContext context) {
return getCommittedTaskPath(getAppAttemptId(context), context);
}
/**
* Validate the task attempt context; makes sure
* that the task attempt ID data is valid.
* @param context task context
*/
private static void validateContext(TaskAttemptContext context) {
requireNonNull(context, "null context");
requireNonNull(context.getTaskAttemptID(),
"null task attempt ID");
requireNonNull(context.getTaskAttemptID().getTaskID(),
"null task ID");
requireNonNull(context.getTaskAttemptID().getJobID(),
"null job ID");
}
/**
* Compute the path where the output of a committed task is stored until the
* entire job is committed for a specific application attempt.
* @param appAttemptId the ID of the application attempt to use
* @param context the context of any task.
* @return the path where the output of a committed task is stored.
*/
protected Path getCommittedTaskPath(int appAttemptId,
TaskAttemptContext context) {
validateContext(context);
return new Path(getJobAttemptPath(appAttemptId),
String.valueOf(context.getTaskAttemptID().getTaskID()));
}
@Override
public Path getTempTaskAttemptPath(TaskAttemptContext context) {
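// temp task attempt paths are not part of the staging committer's
// protocol: intermediate output lives under the local FS work path.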
throw new UnsupportedOperationException("Unimplemented");
}
/**
* Lists the output of a task under the task attempt path. Subclasses can
* override this method to change how output files are identified.
* <p>
* This implementation lists the files that are direct children of the output
* path and filters hidden files (file names starting with '.' or '_').
* <p>
* The task attempt path is provided by
* {@link #getTaskAttemptPath(TaskAttemptContext)}
*
* @param context this task's {@link TaskAttemptContext}
* @return the output files produced by this task in the task attempt path
* @throws IOException on a failure
*/
protected List<LocatedFileStatus> getTaskOutput(TaskAttemptContext context)
throws IOException {
// get files on the local FS in the attempt path
Path attemptPath = requireNonNull(getTaskAttemptPath(context),
"No attemptPath path");
LOG.debug("Scanning {} for files to commit", attemptPath);
return toList(listAndFilter(getTaskAttemptFilesystem(context),
attemptPath, true, HIDDEN_FILE_FILTER));
}
/**
* Returns the final S3 key for a relative path. Subclasses can override this
* method to upload files to a different S3 location.
* <p>
* This implementation concatenates the relative path with the key prefix
* from the output path.
* If {@link CommitConstants#FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES} is
* set, then the task UUID is also included in the calculation.
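* <p>
* Illustrative example: with key prefix {@code data/out} and relative path
* {@code part-0000}, the final key is {@code data/out/part-0000}; with
* unique filenames enabled, the job UUID is woven into the filename by
* {@link Paths#addUUID(String, String)}.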
*
* @param relative the path of a file relative to the task attempt path
* @param context the JobContext or TaskAttemptContext for this job
* @return the S3 key where the file will be uploaded
*/
protected String getFinalKey(String relative, JobContext context) {
if (uniqueFilenames) {
return getS3KeyPrefix(context) + "/"
+ Paths.addUUID(relative, getUUID());
} else {
return getS3KeyPrefix(context) + "/" + relative;
}
}
/**
* Returns the final S3 location for a relative path as a Hadoop {@link Path}.
* This is a final method that calls {@link #getFinalKey(String, JobContext)}
* to determine the final location.
*
* @param relative the path of a file relative to the task attempt path
* @param context the JobContext or TaskAttemptContext for this job
* @return the S3 Path where the file will be uploaded
* @throws IOException IO problem
*/
protected final Path getFinalPath(String relative, JobContext context)
throws IOException {
return getDestS3AFS().keyToQualifiedPath(getFinalKey(relative, context));
}
/**
* Return the local work path as the destination for writing work.
* @param context the context of the task attempt.
* @return a path in the local filesystem.
*/
@Override
public Path getBaseTaskAttemptPath(TaskAttemptContext context) {
// a path on the local FS for files that will be uploaded
return getWorkPath();
}
/**
* For a job attempt path, the staging committer returns that of the
* wrapped committer.
* @param context the context of the job.
* @return a path in HDFS.
*/
@Override
public Path getJobAttemptPath(JobContext context) {
return wrappedCommitter.getJobAttemptPath(context);
}
/**
* Set up the job, including calling the same method on the
* wrapped committer.
* @param context job context
* @throws IOException IO failure.
*/
@Override
public void setupJob(JobContext context) throws IOException {
super.setupJob(context);
wrappedCommitter.setupJob(context);
}
/**
* Get the list of pending uploads for this job attempt.
* @param commitContext job context
* @return a list of pending uploads.
* @throws IOException Any IO failure
*/
@Override
protected ActiveCommit listPendingUploadsToCommit(
CommitContext commitContext)
throws IOException {
return listPendingUploads(commitContext, false);
}
/**
* Get the list of pending uploads for this job attempt, swallowing
* exceptions.
* @param commitContext commit context
* @return a list of pending uploads. If an exception was swallowed,
* then this may not match the actual set of pending operations
* @throws IOException shouldn't be raised, but retained for the compiler
*/
protected ActiveCommit listPendingUploadsToAbort(
CommitContext commitContext) throws IOException {
return listPendingUploads(commitContext, true);
}
/**
* Get the list of pending uploads for this job attempt.
* @param commitContext commit context
* @param suppressExceptions should exceptions be swallowed?
* @return a list of pending uploads. If exceptions are being swallowed,
* then this may not match the actual set of pending operations
* @throws IOException Any IO failure which wasn't swallowed.
*/
protected ActiveCommit listPendingUploads(
CommitContext commitContext, boolean suppressExceptions) throws IOException {
try (DurationInfo ignored = new DurationInfo(LOG,
"Listing pending uploads")) {
Path wrappedJobAttemptPath = getJobAttemptPath(commitContext.getJobContext());
final FileSystem attemptFS = wrappedJobAttemptPath.getFileSystem(
commitContext.getConf());
return ActiveCommit.fromStatusIterator(attemptFS,
listAndFilter(attemptFS,
wrappedJobAttemptPath, false,
HIDDEN_FILE_FILTER));
} catch (FileNotFoundException e) {
// this can mean the job was aborted early on, so don't confuse people
// with long stack traces that aren't the underlying problem.
maybeIgnore(suppressExceptions, "Pending upload directory not found", e);
} catch (IOException e) {
// unable to work with endpoint, if suppressing errors decide our actions
maybeIgnore(suppressExceptions, "Listing pending uploads", e);
}
// reached iff an IOE was caught and swallowed
return ActiveCommit.empty();
}
@Override
public void cleanupStagingDirs() {
Path workPath = getWorkPath();
if (workPath != null) {
LOG.debug("Cleaning up work path {}", workPath);
ignoreIOExceptions(LOG, "cleaning up", workPath.toString(),
() -> deleteQuietly(workPath.getFileSystem(getConf()),
workPath, true));
}
}
/**
* Staging committer cleanup includes calling wrapped committer's
* cleanup method, and removing all destination paths in the final
* filesystem.
* @param commitContext commit context
* @param suppressExceptions should exceptions be suppressed?
* @throws IOException IO failures if exceptions are not suppressed.
*/
@Override
@SuppressWarnings("deprecation")
protected void cleanup(CommitContext commitContext,
boolean suppressExceptions)
throws IOException {
maybeIgnore(suppressExceptions, "Cleanup wrapped committer",
() -> wrappedCommitter.cleanupJob(
commitContext.getJobContext()));
maybeIgnore(suppressExceptions, "Delete destination paths",
() -> deleteDestinationPaths(
commitContext.getJobContext()));
super.cleanup(commitContext, suppressExceptions);
}
@Override
protected void abortJobInternal(CommitContext commitContext,
boolean suppressExceptions) throws IOException {
String r = getRole();
JobContext context = commitContext.getJobContext();
boolean failed = false;
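// tracks a listing/abort failure so that the superclass cleanup will
// suppress its own exceptions and the first failure is the one reported.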
try (DurationInfo d = new DurationInfo(LOG,
"%s: aborting job in state %s ", r, jobIdString(context))) {
ActiveCommit pending = listPendingUploadsToAbort(commitContext);
abortPendingUploads(commitContext,
pending, suppressExceptions, true);
} catch (FileNotFoundException e) {
// nothing to list
LOG.debug("No job directory to read uploads from");
} catch (IOException e) {
failed = true;
maybeIgnore(suppressExceptions, "aborting job", e);
} finally {
super.abortJobInternal(commitContext, failed || suppressExceptions);
}
}
/**
* Delete the working paths of a job.
* <ol>
* <li>The job attempt path</li>
* <li>{@code $dest/_temporary}</li>
* <li>the local working directory for staged files</li>
* </ol>
* Does not attempt to clean up the work of the wrapped committer.
* @param context job context
* @throws IOException IO failure
*/
protected void deleteDestinationPaths(JobContext context) throws IOException {
Path attemptPath = getJobAttemptPath(context);
ignoreIOExceptions(LOG,
"Deleting Job attempt Path", attemptPath.toString(),
() -> deleteWithWarning(
getJobAttemptFileSystem(context),
attemptPath,
true));
// delete the _temporary directory. This will cause problems
// if more than one job is writing to the same destination directory
deleteWithWarning(getDestFS(),
new Path(getOutputPath(), TEMPORARY),
true);
// and the working path
deleteTaskWorkingPathQuietly(context);
}
@Override
public void setupTask(TaskAttemptContext context) throws IOException {
Path taskAttemptPath = getTaskAttemptPath(context);
try (DurationInfo d = new DurationInfo(LOG,
"%s: setup task attempt path %s ", getRole(), taskAttemptPath)) {
super.setupTask(context);
wrappedCommitter.setupTask(context);
}
}
@Override
public boolean needsTaskCommit(TaskAttemptContext context)
throws IOException {
try (DurationInfo d = new DurationInfo(LOG,
"%s: needsTaskCommit() Task %s",
getRole(), context.getTaskAttemptID())) {
// check for files on the local FS in the attempt path
Path attemptPath = getTaskAttemptPath(context);
FileSystem fs = getTaskAttemptFilesystem(context);
// This could be made more efficient with a probe "hasChildren(Path)"
// which returns true if there is at least one entry under a given path.
FileStatus[] stats = fs.listStatus(attemptPath);
LOG.debug("{} files to commit under {}", stats.length, attemptPath);
return stats.length > 0;
} catch (FileNotFoundException e) {
// list didn't find a directory, so nothing to commit
// TODO: throw this up as an error?
LOG.info("No files to commit");
throw e;
}
}
@Override
public void commitTask(TaskAttemptContext context) throws IOException {
try (DurationInfo d = new DurationInfo(LOG,
"%s: commit task %s", getRole(), context.getTaskAttemptID());
CommitContext commitContext
= initiateTaskOperation(context)) {
int count = commitTaskInternal(context, getTaskOutput(context), commitContext);
LOG.info("{}: upload file count: {}", getRole(), count);
} catch (IOException e) {
LOG.error("{}: commit of task {} failed",
getRole(), context.getTaskAttemptID(), e);
getCommitOperations().taskCompleted(false);
throw e;
}
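// mark success only after the commit context has closed cleanly.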
getCommitOperations().taskCompleted(true);
}
/**
* Commit the task by uploading all created files and then
* writing a pending entry for them.
* @param context task context
* @param taskOutput list of files from the output
* @param commitContext commit context
* @return number of uploads committed.
* @throws IOException IO Failures.
*/
protected int commitTaskInternal(final TaskAttemptContext context,
List<? extends FileStatus> taskOutput,
CommitContext commitContext)
throws IOException {
LOG.debug("{}: commitTaskInternal", getRole());
Configuration conf = context.getConfiguration();
final Path attemptPath = getTaskAttemptPath(context);
FileSystem attemptFS = getTaskAttemptFilesystem(context);
LOG.debug("{}: attempt path is {}", getRole(), attemptPath);
// add the commits file to the wrapped committer's task attempt location;
// it will be committed by the wrapped committer at the end of this method.
Path commitsAttemptPath = wrappedCommitter.getTaskAttemptPath(context);
FileSystem commitsFS = commitsAttemptPath.getFileSystem(conf);
// keep track of unfinished commits in case one fails. if something fails,
// we will try to abort the ones that had already succeeded.
int commitCount = taskOutput.size();
final Queue<SinglePendingCommit> commits = new ConcurrentLinkedQueue<>();
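// a concurrent queue: uploads may run in parallel on the commit context's
// submitter pool, each appending its pending commit here.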
LOG.info("{}: uploading from staging directory to S3 {}", getRole(),
attemptPath);
LOG.info("{}: Saving pending data information to {}",
getRole(), commitsAttemptPath);
if (taskOutput.isEmpty()) {
// there is nothing to write. needsTaskCommit() should have caught
// this, so warn that there is some kind of problem in the protocol.
LOG.warn("{}: No files to commit", getRole());
} else {
boolean threw = true;
// before the uploads, report some progress
context.progress();
PendingSet pendingCommits = new PendingSet(commitCount);
pendingCommits.putExtraData(TASK_ATTEMPT_ID,
context.getTaskAttemptID().toString());
try {
TaskPool.foreach(taskOutput)
.stopOnFailure()
.suppressExceptions(false)
.executeWith(commitContext.getOuterSubmitter())
.run(stat -> {
Path path = stat.getPath();
File localFile = new File(path.toUri().getPath());
String relative = Paths.getRelativePath(attemptPath, path);
String partition = Paths.getPartition(relative);
String key = getFinalKey(relative, context);
Path destPath = getDestS3AFS().keyToQualifiedPath(key);
SinglePendingCommit commit = getCommitOperations()
.uploadFileToPendingCommit(
localFile,
destPath,
partition,
uploadPartSize,
context);
LOG.debug("{}: adding pending commit {}", getRole(), commit);
commits.add(commit);
});
for (SinglePendingCommit commit : commits) {
pendingCommits.add(commit);
}
// maybe aggregate the IOStatistics collected in this thread
if (commitContext.isCollectIOStatistics()) {
pendingCommits.getIOStatistics().aggregate(
commitContext.getIOStatisticsContext()
.getIOStatistics());
}
// save the data
// overwrite any existing file, so whichever task attempt
// committed last wins.
LOG.debug("Saving {} pending commit(s)) to file {}",
pendingCommits.size(),
commitsAttemptPath);
pendingCommits.save(commitsFS, commitsAttemptPath,
commitContext.getPendingSetSerializer());
threw = false;
} finally {
if (threw) {
LOG.error(
"{}: Exception during commit process, aborting {} commit(s)",
getRole(), commits.size());
try (DurationInfo ignored = new DurationInfo(LOG,
"Aborting %s uploads", commits.size())) {
TaskPool.foreach(commits)
.suppressExceptions()
.executeWith(commitContext.getOuterSubmitter())
.run(commitContext::abortSingleCommit);
}
deleteTaskAttemptPathQuietly(context);
}
}
// always purge attempt information at this point.
Paths.clearTempFolderInfo(context.getTaskAttemptID());
}
LOG.debug("Committing wrapped task");
wrappedCommitter.commitTask(context);
LOG.debug("Cleaning up attempt dir {}", attemptPath);
attemptFS.delete(attemptPath, true);
return commits.size();
}
/**
* Abort the task.
* The API specifies that the task has not yet been committed, so there are
* no uploads that need to be cancelled.
* Accordingly just delete files on the local FS, and call abortTask in
* the wrapped committer.
* <b>Important: this may be called in the AM after a container failure.</b>
* When that occurs and the failed container was on a different host in the
* cluster, the local files will not be deleted.
* @param context task context
* @throws IOException any failure
*/
@Override
public void abortTask(TaskAttemptContext context) throws IOException {
// the API specifies that the task has not yet been committed, so there are
// no uploads that need to be cancelled. just delete files on the local FS.
try (DurationInfo d = new DurationInfo(LOG,
"Abort task %s", context.getTaskAttemptID())) {
deleteTaskAttemptPathQuietly(context);
deleteTaskWorkingPathQuietly(context);
wrappedCommitter.abortTask(context);
} catch (IOException e) {
LOG.error("{}: exception when aborting task {}",
getRole(), context.getTaskAttemptID(), e);
throw e;
}
}
/**
* Compute the local working path for a task attempt.
* @param context task attempt context
* @param uuid job UUID
* @return a path on the local filesystem
* @throws IOException failure to build the path
*/
private static Path taskAttemptWorkingPath(TaskAttemptContext context,
String uuid) throws IOException {
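// allocate (or look up) the local FS temp dir for this job UUID and
// task attempt, then apply the standard task attempt path layout to it.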
return getTaskAttemptPath(context,
Paths.getLocalTaskAttemptTempDir(
context.getConfiguration(),
uuid,
context.getTaskAttemptID()));
}
/**
* Delete the working path of a task; no-op if there is none, that
* is: this is a job.
* @param context job/task context
*/
protected void deleteTaskWorkingPathQuietly(JobContext context) {
ignoreIOExceptions(LOG, "Delete working path", "",
() -> {
Path path = buildWorkPath(context, getUUID());
if (path != null) {
deleteQuietly(path.getFileSystem(getConf()), path, true);
}
});
}
/**
* Get the key of the destination "directory" of the job/task.
* @param context job context
* @return key to write to
*/
private String getS3KeyPrefix(JobContext context) {
return s3KeyPrefix;
}
/**
* Returns the {@link ConflictResolution} mode for this commit.
*
* @param context the JobContext for this commit
* @param fsConf filesystem config
* @return the ConflictResolution mode
*/
public final ConflictResolution getConflictResolutionMode(
JobContext context,
Configuration fsConf) {
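// resolved lazily and cached; a racing duplicate evaluation is benign,
// as every thread computes the same value.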
if (conflictResolution == null) {
this.conflictResolution = ConflictResolution.valueOf(
getConfictModeOption(context, fsConf, DEFAULT_CONFLICT_MODE));
}
return conflictResolution;
}
/**
* Generate a {@link PathExistsException} because the destination exists.
* Lists some of the child entries first, to help diagnose the problem.
* @param path path which exists
* @param description description (usually task/job ID)
* @return an exception to throw
*/
protected PathExistsException failDestinationExists(final Path path,
final String description) {
LOG.error("{}: Failing commit by job {} to write"
+ " to existing output path {}.",
description,
getJobContext().getJobID(), path);
// List the first 10 descendants, to give some details
// on what is wrong but not overload things if there are many files.
try {
int limit = 10;
RemoteIterator<LocatedFileStatus> lf
= getDestFS().listFiles(path, true);
LOG.info("Partial Directory listing");
while (limit > 0 && lf.hasNext()) {
limit--;
LocatedFileStatus status = lf.next();
LOG.info("{}: {}",
status.getPath(),
status.isDirectory()
? " dir"
: ("file size " + status.getLen() + " bytes"));
}
cleanupRemoteIterator(lf);
} catch (IOException e) {
LOG.info("Discarding exception raised when listing {}: " + e, path);
LOG.debug("stack trace ", e);
}
return new PathExistsException(path.toString(),
description + ": " + InternalCommitterConstants.E_DEST_EXISTS);
}
/**
* Get the conflict mode option string.
* @param context context with the config
* @param fsConf filesystem config
* @param defVal default value.
* @return the trimmed configuration option, upper case.
*/
public static String getConfictModeOption(JobContext context,
Configuration fsConf, String defVal) {
return getConfigurationOption(context,
fsConf,
FS_S3A_COMMITTER_STAGING_CONFLICT_MODE,
defVal).toUpperCase(Locale.ENGLISH);
}
/**
* Pre-commit actions for a job.
* Loads all the pending files to verify they can be loaded
* and parsed.
* @param commitContext commit context
* @param pending pending commits
* @throws IOException any failure
*/
@Override
public void preCommitJob(
CommitContext commitContext,
final ActiveCommit pending) throws IOException {
// see if the files can be loaded.
precommitCheckPendingFiles(commitContext, pending);
}
}