| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.fs.s3a.commit; |
| |
| import javax.annotation.Nullable; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.text.DateFormat; |
| import java.util.ArrayList; |
| import java.util.Date; |
| import java.util.List; |
| import java.util.UUID; |
| |
| import com.amazonaws.services.s3.model.MultipartUpload; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.commons.lang3.tuple.Pair; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.hadoop.classification.VisibleForTesting; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.RemoteIterator; |
| import org.apache.hadoop.fs.audit.AuditConstants; |
| import org.apache.hadoop.fs.statistics.IOStatisticsContext; |
| import org.apache.hadoop.fs.store.audit.AuditSpan; |
| import org.apache.hadoop.fs.store.audit.AuditSpanSource; |
| import org.apache.hadoop.fs.s3a.S3AFileSystem; |
| import org.apache.hadoop.fs.s3a.commit.files.PendingSet; |
| import org.apache.hadoop.fs.s3a.commit.files.PersistentCommitData; |
| import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; |
| import org.apache.hadoop.fs.s3a.commit.files.SuccessData; |
| import org.apache.hadoop.fs.s3a.commit.impl.AuditContextUpdater; |
| import org.apache.hadoop.fs.s3a.commit.impl.CommitContext; |
| import org.apache.hadoop.fs.s3a.commit.impl.CommitOperations; |
| import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics; |
| import org.apache.hadoop.fs.statistics.IOStatistics; |
| import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; |
| import org.apache.hadoop.fs.statistics.IOStatisticsSource; |
| import org.apache.hadoop.mapreduce.JobContext; |
| import org.apache.hadoop.mapreduce.JobID; |
| import org.apache.hadoop.mapreduce.JobStatus; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.hadoop.mapreduce.TaskAttemptID; |
| import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter; |
| import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations; |
| import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.util.DurationInfo; |
| import org.apache.hadoop.util.functional.InvocationRaisingIOE; |
| import org.apache.hadoop.util.functional.TaskPool; |
| |
| import static java.util.Objects.requireNonNull; |
| import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_MAXIMUM_CONNECTIONS; |
| import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_MAX_TOTAL_TASKS; |
| import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS; |
| import static org.apache.hadoop.fs.s3a.Constants.MAX_TOTAL_TASKS; |
| import static org.apache.hadoop.fs.s3a.Invoker.ignoreIOExceptions; |
| import static org.apache.hadoop.fs.s3a.S3AUtils.*; |
| import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_COMMIT_JOB; |
| import static org.apache.hadoop.fs.audit.CommonAuditContext.currentAuditContext; |
| import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; |
| import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*; |
| import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.E_NO_SPARK_UUID; |
| import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID; |
| import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID_SOURCE; |
| import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.SPARK_WRITE_UUID; |
| import static org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR.*; |
| import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation; |
| import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DiagnosticKeys.STAGE; |
| import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createJobSummaryFilename; |
| import static org.apache.hadoop.util.functional.RemoteIterators.toList; |
| |
| /** |
| * Abstract base class for S3A committers; allows for any commonality |
| * between different architectures. |
| * |
| * Although the committer APIs allow for a committer to be created without |
| * an output path, this is not supported in this class or its subclasses: |
| * a destination must be supplied. It is left to the committer factory |
| * to handle the creation of a committer when the destination is unknown. |
| * |
| * Requiring an output directory simplifies coding and testing. |
| * |
| * The original implementation loaded all .pendingset files |
| * before attempting any commit/abort operations. |
| * While straightforward and guaranteeing that no changes were made to the |
| * destination until all files had successfully been loaded -it didn't scale; |
| * the list grew until it exceeded heap size. |
| * |
| * The second iteration builds up an {@link ActiveCommit} class with the |
| * list of .pendingset files to load and then commit; that can be done |
| * incrementally and in parallel. |
| * As a side effect of this change, unless/until changed, |
| * the commit/abort/revert of all files uploaded by a single task will be |
| * serialized. This may slow down these operations if there are many files |
 * created by a few tasks, <i>and</i> the HTTP connection pool in the S3A
 * committer was large enough for all the parallel POST requests.
| */ |
| public abstract class AbstractS3ACommitter extends PathOutputCommitter |
| implements IOStatisticsSource { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(AbstractS3ACommitter.class); |
| |
| public static final String THREAD_PREFIX = "s3a-committer-pool-"; |
| |
| /** |
| * Error string when task setup fails. |
| */ |
| @VisibleForTesting |
| public static final String E_SELF_GENERATED_JOB_UUID |
| = "has a self-generated job UUID"; |
| |
  /**
   * Unique ID for a Job.
   * In MapReduce Jobs the YARN JobID suffices.
   * On Spark this will only be the YARN JobID if
   * it is known to be creating strongly unique IDs
   * (i.e. SPARK-33402 is on the branch).
   */
| private final String uuid; |
| |
| /** |
| * Source of the {@link #uuid} value. |
| */ |
| private final JobUUIDSource uuidSource; |
| |
| /** |
| * Has this instance been used for job setup? |
| * If so then it is safe for a locally generated |
| * UUID to be used for task setup. |
| */ |
| private boolean jobSetup; |
| |
| /** Underlying commit operations. */ |
| private final CommitOperations commitOperations; |
| |
| /** |
| * Final destination of work. |
| */ |
| private Path outputPath; |
| |
| /** |
| * Role: used in log/text messages. |
| */ |
| private final String role; |
| |
| /** |
| * This is the directory for all intermediate work: where the output format |
| * will write data. |
| * <i>This may not be on the final file system</i> |
| */ |
| private Path workPath; |
| |
| /** Configuration of the job. */ |
| private Configuration conf; |
| |
| /** Filesystem of {@link #outputPath}. */ |
| private FileSystem destFS; |
| |
| /** The job context. For a task, this can be cast to a TaskContext. */ |
| private final JobContext jobContext; |
| |
| /** Should a job marker be created? */ |
| private final boolean createJobMarker; |
| |
| private final CommitterStatistics committerStatistics; |
| |
| /** |
| * Source of Audit spans. |
| */ |
| private final AuditSpanSource auditSpanSource; |
| |
| /** |
| * Create a committer. |
| * This constructor binds the destination directory and configuration, but |
| * does not update the work path: That must be calculated by the |
| * implementation; |
| * It is omitted here to avoid subclass methods being called too early. |
| * @param outputPath the job's output path: MUST NOT be null. |
| * @param context the task's context |
| * @throws IOException on a failure |
| */ |
  protected AbstractS3ACommitter(
      Path outputPath,
      TaskAttemptContext context) throws IOException {
    super(outputPath, context);
    // record the destination path before anything else can fail.
    setOutputPath(outputPath);
    this.jobContext = requireNonNull(context, "null job context");
    this.role = "Task committer " + context.getTaskAttemptID();
    setConf(context.getConfiguration());
    // determine the job UUID and how it was obtained
    // (spark option, YARN job ID, or locally generated).
    Pair<String, JobUUIDSource> id = buildJobUUID(
        conf, context.getJobID());
    this.uuid = id.getLeft();
    this.uuidSource = id.getRight();
    LOG.info("Job UUID {} source {}", getUUID(), getUUIDSource().getText());
    // bind to the destination filesystem; this may fail if the
    // committer requires capabilities the FS lacks.
    initOutput(outputPath);
    LOG.debug("{} instantiated for job \"{}\" ID {} with destination {}",
        role, jobName(context), jobIdString(context), outputPath);
    S3AFileSystem fs = getDestS3AFS();
    // set this thread's context with the job ID.
    // audit spans created in this thread will pick
    // up this value, including the commit operations instance
    // soon to be created.
    new AuditContextUpdater(jobContext)
        .updateCurrentAuditContext();

    // the filesystem is the span source, always.
    this.auditSpanSource = fs.getAuditSpanSource();
    this.createJobMarker = context.getConfiguration().getBoolean(
        CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
        DEFAULT_CREATE_SUCCESSFUL_JOB_DIR_MARKER);
    // the statistics are shared between this committer and its operations.
    this.committerStatistics = fs.newCommitterStatistics();
    this.commitOperations = new CommitOperations(fs, committerStatistics,
        outputPath.toString());
  }
| |
| /** |
| * Init the output filesystem and path. |
| * TESTING ONLY; allows mock FS to cheat. |
| * @param out output path |
| * @throws IOException failure to create the FS. |
| */ |
| @VisibleForTesting |
| protected void initOutput(Path out) throws IOException { |
| FileSystem fs = getDestinationFS(out, getConf()); |
| setDestFS(fs); |
| setOutputPath(fs.makeQualified(out)); |
| } |
| |
  /**
   * Get the job/task context this committer was instantiated with.
   * For a committer created for a task attempt, this can be cast
   * to a {@code TaskAttemptContext}.
   * @return the context.
   */
  public final JobContext getJobContext() {
    return jobContext;
  }
| |
  /**
   * Final path of output, in the destination FS.
   * Set in the constructor; never null.
   * @return the path
   */
  @Override
  public final Path getOutputPath() {
    return outputPath;
  }
| |
  /**
   * Set the output path.
   * @param outputPath new value; must not be null.
   */
  protected final void setOutputPath(Path outputPath) {
    this.outputPath = requireNonNull(outputPath, "Null output path");
  }
| |
  /**
   * This is the critical method for {@code FileOutputFormat}; it declares
   * the path for work: this is where task attempts write their output.
   * It may be on a different filesystem from the final destination.
   * @return the working path.
   */
  @Override
  public final Path getWorkPath() {
    return workPath;
  }
| |
  /**
   * Set the work path for this committer.
   * @param workPath the work path to use; may be null during construction.
   */
  protected final void setWorkPath(Path workPath) {
    LOG.debug("Setting work path to {}", workPath);
    this.workPath = workPath;
  }
| |
  /**
   * Get the configuration this committer was set up with.
   * @return the job/task configuration.
   */
  public final Configuration getConf() {
    return conf;
  }
| |
  /**
   * Set the configuration; invoked from the constructor.
   * @param conf job/task configuration.
   */
  protected final void setConf(Configuration conf) {
    this.conf = conf;
  }
| |
| /** |
| * Get the destination FS, creating it on demand if needed. |
| * @return the filesystem; requires the output path to be set up |
| * @throws IOException if the FS cannot be instantiated. |
| */ |
| public FileSystem getDestFS() throws IOException { |
| if (destFS == null) { |
| FileSystem fs = getDestinationFS(outputPath, getConf()); |
| setDestFS(fs); |
| } |
| return destFS; |
| } |
| |
  /**
   * Get the destination as an S3A Filesystem; casting it.
   * Callers rely on the destination actually being S3A; a
   * ClassCastException here indicates a misconfigured output path.
   * @return the dest S3A FS.
   * @throws IOException if the FS cannot be instantiated.
   */
  public S3AFileSystem getDestS3AFS() throws IOException {
    return (S3AFileSystem) getDestFS();
  }
| |
  /**
   * Set the destination FS: the FS of the final output.
   * @param destFS destination FS.
   */
  protected void setDestFS(FileSystem destFS) {
    this.destFS = destFS;
  }
| |
  /**
   * Compute the path where the output of a given job attempt will be placed.
   * Delegates to {@link #getJobAttemptPath(int)} with the application
   * attempt ID extracted from the context.
   * @param context the context of the job. This is used to get the
   * application attempt ID.
   * @return the path to store job attempt data.
   */
  public Path getJobAttemptPath(JobContext context) {
    return getJobAttemptPath(getAppAttemptId(context));
  }
| |
  /**
   * Compute the path under which all job attempts will be placed.
   * Implemented by subclasses, whose layouts differ.
   * @return the path to store job attempt data.
   */
  protected abstract Path getJobPath();
| |
  /**
   * Compute the path where the output of a given job attempt will be placed.
   * Implemented by subclasses, whose layouts differ.
   * @param appAttemptId the ID of the application attempt for this job.
   * @return the path to store job attempt data.
   */
  protected abstract Path getJobAttemptPath(int appAttemptId);
| |
  /**
   * Compute the path where the output of a task attempt is stored until
   * that task is committed. This may be the normal task attempt path
   * or it may be a subdirectory.
   * The default implementation returns the value of
   * {@link #getBaseTaskAttemptPath(TaskAttemptContext)};
   * subclasses may return different values.
   * @param context the context of the task attempt.
   * @return the path where a task attempt should be stored.
   */
  public Path getTaskAttemptPath(TaskAttemptContext context) {
    return getBaseTaskAttemptPath(context);
  }
| |
  /**
   * Compute the base path where the output of a task attempt is written.
   * This is the path which will be deleted when a task is cleaned up and
   * aborted.
   *
   * @param context the context of the task attempt.
   * @return the path where a task attempt should be stored.
   */
  protected abstract Path getBaseTaskAttemptPath(TaskAttemptContext context);
| |
  /**
   * Get a temporary directory for data. When a task is aborted/cleaned
   * up, the contents of this directory are all deleted.
   * @param context task context
   * @return a path for temporary data.
   */
  public abstract Path getTempTaskAttemptPath(TaskAttemptContext context);
| |
  /**
   * Get the name of this committer; used in logs, diagnostics
   * and the {@code _SUCCESS} marker.
   * @return the committer name.
   */
  public abstract String getName();
| |
  /**
   * The Job UUID, as passed in or generated.
   * See {@link #getUUIDSource()} for how it was obtained.
   * @return the UUID for the job.
   */
  @VisibleForTesting
  public final String getUUID() {
    return uuid;
  }
| |
  /**
   * Source of the UUID.
   * @return how the job UUID was retrieved/generated.
   */
  @VisibleForTesting
  public final JobUUIDSource getUUIDSource() {
    return uuidSource;
  }
| |
| @Override |
| public String toString() { |
| final StringBuilder sb = new StringBuilder( |
| "AbstractS3ACommitter{"); |
| sb.append("role=").append(role); |
| sb.append(", name=").append(getName()); |
| sb.append(", outputPath=").append(getOutputPath()); |
| sb.append(", workPath=").append(workPath); |
| sb.append(", uuid='").append(getUUID()).append('\''); |
| sb.append(", uuid source=").append(getUUIDSource()); |
| sb.append('}'); |
| return sb.toString(); |
| } |
| |
  /**
   * Get the destination filesystem from the output path and the
   * configuration.
   * Verification of the FS capabilities is delegated to
   * {@code getS3AFileSystem}, driven by
   * {@link #requiresDelayedCommitOutputInFileSystem()}.
   * @param out output path
   * @param config job/task config
   * @return the associated FS
   * @throws PathCommitException output path isn't to an S3A FS instance.
   * @throws IOException failure to instantiate the FS.
   */
  protected FileSystem getDestinationFS(Path out, Configuration config)
      throws IOException {
    return getS3AFileSystem(out, config,
        requiresDelayedCommitOutputInFileSystem());
  }
| |
  /**
   * Flag to indicate whether or not the destination filesystem needs
   * to be configured to support magic paths where the output isn't
   * immediately visible. If the committer returns true, then committer
   * setup will fail if the FS doesn't have the capability.
   * Base implementation returns false; the magic committer overrides this.
   * @return what the requirements of the committer are of the filesystem.
   */
  protected boolean requiresDelayedCommitOutputInFileSystem() {
    return false;
  }
| |
| /** |
| * Task recovery considered Unsupported: Warn and fail. |
| * @param taskContext Context of the task whose output is being recovered |
| * @throws IOException always. |
| */ |
| @Override |
| public void recoverTask(TaskAttemptContext taskContext) throws IOException { |
| LOG.warn("Cannot recover task {}", taskContext.getTaskAttemptID()); |
| throw new PathCommitException(outputPath, |
| String.format("Unable to recover task %s", |
| taskContext.getTaskAttemptID())); |
| } |
| |
  /**
   * if the job requires a success marker on a successful job,
   * create the file {@link CommitConstants#_SUCCESS}.
   *
   * While the classic committers create a 0-byte file, the S3A committers
   * PUT up the contents of a {@link SuccessData} file.
   * @param commitContext commit context
   * @param pending the pending commits
   *
   * @return the success data, even if the marker wasn't created
   *
   * @throws IOException IO failure
   */
  protected SuccessData maybeCreateSuccessMarkerFromCommits(
      final CommitContext commitContext,
      ActiveCommit pending) throws IOException {
    List<String> filenames = new ArrayList<>(pending.size());
    // The list of committed objects in pending is size limited in
    // ActiveCommit.uploadCommitted.
    filenames.addAll(pending.committedObjects);
    // load in all the pending statistics
    IOStatisticsSnapshot snapshot = new IOStatisticsSnapshot(
        pending.getIOStatistics());
    // and the current statistics
    snapshot.aggregate(getIOStatistics());

    // and include the context statistics if enabled
    if (commitContext.isCollectIOStatistics()) {
      snapshot.aggregate(commitContext.getIOStatisticsContext()
          .getIOStatistics());
    }

    return maybeCreateSuccessMarker(commitContext.getJobContext(), filenames, snapshot);
  }
| |
  /**
   * if the job requires a success marker on a successful job,
   * create the {@code _SUCCESS} file.
   *
   * While the classic committers create a 0-byte file, the S3A committers
   * PUT up the contents of a {@link SuccessData} file.
   * The file is returned, even if no marker is created.
   * This is so it can be saved to a report directory.
   * @param context job context
   * @param filenames list of filenames.
   * @param ioStatistics any IO Statistics to include
   * @throws IOException IO failure
   * @return the success data.
   */
  protected SuccessData maybeCreateSuccessMarker(
      final JobContext context,
      final List<String> filenames,
      final IOStatisticsSnapshot ioStatistics)
      throws IOException {

    SuccessData successData =
        createSuccessData(context, filenames, ioStatistics,
            getDestFS().getConf());
    if (createJobMarker) {
      // save it to the job dest dir
      commitOperations.createSuccessMarker(getOutputPath(), successData, true);
    }
    return successData;
  }
| |
| /** |
| * Create the success data structure from a job context. |
| * @param context job context. |
| * @param filenames short list of filenames; nullable |
| * @param ioStatistics IOStatistics snapshot |
| * @param destConf config of the dest fs, can be null |
| * @return the structure |
| * |
| */ |
| private SuccessData createSuccessData(final JobContext context, |
| final List<String> filenames, |
| final IOStatisticsSnapshot ioStatistics, |
| final Configuration destConf) { |
| // create a success data structure |
| SuccessData successData = new SuccessData(); |
| successData.setCommitter(getName()); |
| successData.setJobId(uuid); |
| successData.setJobIdSource(uuidSource.getText()); |
| successData.setDescription(getRole()); |
| successData.setHostname(NetUtils.getLocalHostname()); |
| Date now = new Date(); |
| successData.setTimestamp(now.getTime()); |
| successData.setDate(now.toString()); |
| if (filenames != null) { |
| successData.setFilenames(filenames); |
| } |
| successData.getIOStatistics().aggregate(ioStatistics); |
| // attach some config options as diagnostics to assist |
| // in debugging performance issues. |
| |
| // commit thread pool size |
| successData.addDiagnostic(FS_S3A_COMMITTER_THREADS, |
| Integer.toString(getJobCommitThreadCount(context))); |
| |
| // and filesystem http connection and thread pool sizes |
| if (destConf != null) { |
| successData.addDiagnostic(MAXIMUM_CONNECTIONS, |
| destConf.get(MAXIMUM_CONNECTIONS, |
| Integer.toString(DEFAULT_MAXIMUM_CONNECTIONS))); |
| successData.addDiagnostic(MAX_TOTAL_TASKS, |
| destConf.get(MAX_TOTAL_TASKS, |
| Integer.toString(DEFAULT_MAX_TOTAL_TASKS))); |
| } |
| return successData; |
| } |
| |
| /** |
| * Base job setup (optionally) deletes the success marker and |
| * always creates the destination directory. |
| * When objects are committed that dest dir marker will inevitably |
| * be deleted; creating it now ensures there is something at the end |
| * while the job is in progress -and if nothing is created, that |
| * it is still there. |
| * <p> |
| * The option {@link InternalCommitterConstants#FS_S3A_COMMITTER_UUID} |
| * is set to the job UUID; if generated locally |
| * {@link InternalCommitterConstants#SPARK_WRITE_UUID} is also patched. |
| * The field {@link #jobSetup} is set to true to note that |
| * this specific committer instance was used to set up a job. |
| * </p> |
| * @param context context |
| * @throws IOException IO failure |
| */ |
| |
| @Override |
| public void setupJob(JobContext context) throws IOException { |
| try (DurationInfo d = new DurationInfo(LOG, |
| "Job %s setting up", getUUID())) { |
| // record that the job has been set up |
| jobSetup = true; |
| // patch job conf with the job UUID. |
| Configuration c = context.getConfiguration(); |
| c.set(FS_S3A_COMMITTER_UUID, getUUID()); |
| c.set(FS_S3A_COMMITTER_UUID_SOURCE, getUUIDSource().getText()); |
| Path dest = getOutputPath(); |
| if (createJobMarker){ |
| commitOperations.deleteSuccessMarker(dest); |
| } |
| getDestFS().mkdirs(dest); |
| // do a scan for surplus markers |
| warnOnActiveUploads(dest); |
| } |
| } |
| |
| /** |
| * Task setup. Fails if the the UUID was generated locally, and |
| * the same committer wasn't used for job setup. |
| * {@inheritDoc} |
| * @throws PathCommitException if the task UUID options are unsatisfied. |
| */ |
| @Override |
| public void setupTask(TaskAttemptContext context) throws IOException { |
| TaskAttemptID attemptID = context.getTaskAttemptID(); |
| |
| // update the context so that task IO in the same thread has |
| // the relevant values. |
| new AuditContextUpdater(context) |
| .updateCurrentAuditContext(); |
| |
| try (DurationInfo d = new DurationInfo(LOG, "Setup Task %s", |
| attemptID)) { |
| // reject attempts to set up the task where the output won't be |
| // picked up |
| if (!jobSetup |
| && getUUIDSource() == JobUUIDSource.GeneratedLocally) { |
| // on anything other than a test run, the context must not have been |
| // generated locally. |
| throw new PathCommitException(getOutputPath().toString(), |
| "Task attempt " + attemptID |
| + " " + E_SELF_GENERATED_JOB_UUID); |
| } |
| Path taskAttemptPath = getTaskAttemptPath(context); |
| FileSystem fs = taskAttemptPath.getFileSystem(getConf()); |
| // delete that ta path if somehow it was there |
| fs.delete(taskAttemptPath, true); |
| // create an empty directory |
| fs.mkdirs(taskAttemptPath); |
| } |
| } |
| |
| /** |
| * Get the task attempt path filesystem. This may not be the same as the |
| * final destination FS, and so may not be an S3A FS. |
| * @param context task attempt |
| * @return the filesystem |
| * @throws IOException failure to instantiate |
| */ |
| protected FileSystem getTaskAttemptFilesystem(TaskAttemptContext context) |
| throws IOException { |
| return getTaskAttemptPath(context).getFileSystem(getConf()); |
| } |
| |
  /**
   * Commit all the pending uploads.
   * Each file listed in the ActiveCommit instance is queued for processing
   * in a separate thread; its contents are loaded and then (sequentially)
   * committed.
   * On a failure or abort of a single file's commit, all its uploads are
   * aborted.
   * The revert operation lists the files already committed and deletes them.
   * @param commitContext commit context
   * @param pending pending uploads
   * @throws IOException on any failure
   */
  protected void commitPendingUploads(
      final CommitContext commitContext,
      final ActiveCommit pending) throws IOException {
    if (pending.isEmpty()) {
      // not an error; there is simply nothing to do.
      LOG.warn("{}: No pending uploads to commit", getRole());
    }
    try (DurationInfo ignored = new DurationInfo(LOG,
        "committing the output of %s task(s)", pending.size())) {

      // process each .pendingset file in the outer thread pool.
      // the first failure stops the run; the other files' uploads
      // are then aborted, and any already committed are reverted.
      TaskPool.foreach(pending.getSourceFiles())
          .stopOnFailure()
          .suppressExceptions(false)
          .executeWith(commitContext.getOuterSubmitter())
          .abortWith(status ->
              loadAndAbort(commitContext, pending, status, true, false))
          .revertWith(status ->
              loadAndRevert(commitContext, pending, status))
          .run(status ->
              loadAndCommit(commitContext, pending, status));
    }
  }
| |
  /**
   * Run a precommit check that all files are loadable.
   * This check avoids the situation where the inability to read
   * a file only surfaces partway through the job commit, so
   * results in the destination being tainted.
   * @param commitContext commit context
   * @param pending the pending operations
   * @throws IOException any failure
   */
  protected void precommitCheckPendingFiles(
      final CommitContext commitContext,
      final ActiveCommit pending) throws IOException {

    FileSystem sourceFS = pending.getSourceFS();
    try (DurationInfo ignored =
        new DurationInfo(LOG, "Preflight Load of pending files")) {

      // load each .pendingset file in parallel, discarding the result:
      // only the ability to deserialize it matters here.
      TaskPool.foreach(pending.getSourceFiles())
          .stopOnFailure()
          .suppressExceptions(false)
          .executeWith(commitContext.getOuterSubmitter())
          .run(status -> PersistentCommitData.load(sourceFS, status,
              commitContext.getPendingSetSerializer()));
    }
  }
| |
  /**
   * Load a pendingset file and commit all of its contents.
   * Invoked within a parallel run; the commitContext thread
   * pool is already busy/possibly full, so do not
   * execute work through the same submitter.
   * @param commitContext context to commit through
   * @param activeCommit commit state
   * @param status file to load
   * @throws IOException failure
   */
  private void loadAndCommit(
      final CommitContext commitContext,
      final ActiveCommit activeCommit,
      final FileStatus status) throws IOException {

    final Path path = status.getPath();
    commitContext.switchToIOStatisticsContext();
    try (DurationInfo ignored =
        new DurationInfo(LOG,
            "Loading and committing files in pendingset %s", path)) {
      PendingSet pendingSet = PersistentCommitData.load(
          activeCommit.getSourceFS(),
          status,
          commitContext.getPendingSetSerializer());
      // guard against committing a pendingset written by a different job
      // sharing the same directory tree.
      String jobId = pendingSet.getJobId();
      if (!StringUtils.isEmpty(jobId) && !getUUID().equals(jobId)) {
        throw new PathCommitException(path,
            String.format("Mismatch in Job ID (%s) and commit job ID (%s)",
                getUUID(), jobId));
      }
      // commit each upload through the inner submitter; on a failure
      // the failing commit is aborted, and the other commits in this
      // pendingset are aborted or reverted as appropriate.
      TaskPool.foreach(pendingSet.getCommits())
          .stopOnFailure()
          .suppressExceptions(false)
          .executeWith(commitContext.getInnerSubmitter())
          .onFailure((commit, exception) ->
              commitContext.abortSingleCommit(commit))
          .abortWith(commitContext::abortSingleCommit)
          .revertWith(commitContext::revertCommit)
          .run(commit -> {
            commitContext.commitOrFail(commit);
            // record the destination for the _SUCCESS file
            // (the list is size-limited in ActiveCommit).
            activeCommit.uploadCommitted(
                commit.getDestinationKey(), commit.getLength());
          });
      activeCommit.pendingsetCommitted(pendingSet.getIOStatistics());
    }
  }
| |
| /** |
| * Load a pendingset file and revert all of its contents. |
| * Invoked within a parallel run; the commitContext thread |
| * pool is already busy/possibly full, so do not |
| * execute work through the same submitter. |
| * @param commitContext context to commit through |
| * @param activeCommit commit state |
| * @param status status of file to load |
| * @throws IOException failure |
| */ |
| private void loadAndRevert( |
| final CommitContext commitContext, |
| final ActiveCommit activeCommit, |
| final FileStatus status) throws IOException { |
| |
| final Path path = status.getPath(); |
| commitContext.switchToIOStatisticsContext(); |
| try (DurationInfo ignored = |
| new DurationInfo(LOG, false, "Committing %s", path)) { |
| PendingSet pendingSet = PersistentCommitData.load( |
| activeCommit.getSourceFS(), |
| status, |
| commitContext.getPendingSetSerializer()); |
| TaskPool.foreach(pendingSet.getCommits()) |
| .suppressExceptions(true) |
| .run(commitContext::revertCommit); |
| } |
| } |
| |
  /**
   * Load a pendingset file and abort all of its contents.
   * Invoked within a parallel run; the commitContext thread
   * pool is already busy/possibly full, so do not
   * execute work through the same submitter.
   * @param commitContext context to commit through
   * @param activeCommit commit state
   * @param status status of file to load
   * @param suppressExceptions should exceptions raised when aborting
   * individual commits be suppressed rather than propagated?
   * @param deleteRemoteFiles should remote files be deleted?
   * @throws IOException failure
   */
  private void loadAndAbort(
      final CommitContext commitContext,
      final ActiveCommit activeCommit,
      final FileStatus status,
      final boolean suppressExceptions,
      final boolean deleteRemoteFiles) throws IOException {

    final Path path = status.getPath();
    commitContext.switchToIOStatisticsContext();
    try (DurationInfo ignored =
        new DurationInfo(LOG, false, "Aborting %s", path)) {
      PendingSet pendingSet = PersistentCommitData.load(
          activeCommit.getSourceFS(),
          status,
          commitContext.getPendingSetSerializer());
      FileSystem fs = getDestFS();
      TaskPool.foreach(pendingSet.getCommits())
          .executeWith(commitContext.getInnerSubmitter())
          .suppressExceptions(suppressExceptions)
          .run(commit -> {
            try {
              commitContext.abortSingleCommit(commit);
            } catch (FileNotFoundException e) {
              // Commit ID was not known; file may exist.
              // delete it if instructed to do so.
              if (deleteRemoteFiles) {
                fs.delete(commit.destinationPath(), false);
              }
            }
          });
    }
  }
| |
| /** |
| * Start the final job commit/abort commit operations. |
| * If configured to collect statistics, |
| * The IO StatisticsContext is reset. |
| * @param context job context |
| * @return a commit context through which the operations can be invoked. |
| * @throws IOException failure. |
| */ |
| protected CommitContext initiateJobOperation( |
| final JobContext context) |
| throws IOException { |
| |
| IOStatisticsContext ioStatisticsContext = |
| IOStatisticsContext.getCurrentIOStatisticsContext(); |
| CommitContext commitContext = getCommitOperations().createCommitContext( |
| context, |
| getOutputPath(), |
| getJobCommitThreadCount(context), |
| ioStatisticsContext); |
| commitContext.maybeResetIOStatisticsContext(); |
| return commitContext; |
| } |
| |
| /** |
| * Start a ask commit/abort commit operations. |
| * This may have a different thread count. |
| * If configured to collect statistics, |
| * The IO StatisticsContext is reset. |
| * @param context job or task context |
| * @return a commit context through which the operations can be invoked. |
| * @throws IOException failure. |
| */ |
| protected CommitContext initiateTaskOperation( |
| final JobContext context) |
| throws IOException { |
| |
| CommitContext commitContext = getCommitOperations().createCommitContext( |
| context, |
| getOutputPath(), |
| getTaskCommitThreadCount(context), |
| IOStatisticsContext.getCurrentIOStatisticsContext()); |
| commitContext.maybeResetIOStatisticsContext(); |
| return commitContext; |
| } |
| |
| /** |
| * Internal Job commit operation: where the S3 requests are made |
| * (potentially in parallel). |
| * @param commitContext commit context |
| * @param pending pending commits |
| * @throws IOException any failure |
| */ |
| protected void commitJobInternal( |
| final CommitContext commitContext, |
| final ActiveCommit pending) |
| throws IOException { |
| trackDurationOfInvocation(committerStatistics, |
| COMMITTER_COMMIT_JOB.getSymbol(), |
| () -> commitPendingUploads(commitContext, pending)); |
| } |
| |
  /**
   * Abort the job: create a commit context and run the internal
   * abort operation through it, closing the context afterwards.
   * @param context job context
   * @param state final state of the job
   * @throws IOException failure during the abort sequence
   */
  @Override
  public void abortJob(JobContext context, JobStatus.State state)
      throws IOException {
    LOG.info("{}: aborting job {} in state {}",
        getRole(), jobIdString(context), state);
    // final cleanup operations
    try (CommitContext commitContext = initiateJobOperation(context)){
      abortJobInternal(commitContext, false);
    }
  }
| |
| |
| /** |
| * The internal job abort operation; can be overridden in tests. |
| * This must clean up operations; it is called when a commit fails, as |
| * well as in an {@link #abortJob(JobContext, JobStatus.State)} call. |
| * The base implementation calls {@link #cleanup(CommitContext, boolean)} |
| * so cleans up the filesystems and destroys the thread pool. |
| * Subclasses must always invoke this superclass method after their |
| * own operations. |
| * Creates and closes its own commit context. |
| * |
| * @param commitContext commit context |
| * @param suppressExceptions should exceptions be suppressed? |
| * @throws IOException any IO problem raised when suppressExceptions is false. |
| */ |
| protected void abortJobInternal(CommitContext commitContext, |
| boolean suppressExceptions) |
| throws IOException { |
| cleanup(commitContext, suppressExceptions); |
| } |
| |
| /** |
| * Abort all pending uploads to the destination directory during |
| * job cleanup operations. |
| * Note: this instantiates the thread pool if required -so |
| * @param suppressExceptions should exceptions be suppressed |
| * @param commitContext commit context |
| * @throws IOException IO problem |
| */ |
| protected void abortPendingUploadsInCleanup( |
| boolean suppressExceptions, |
| CommitContext commitContext) throws IOException { |
| // return early if aborting is disabled. |
| if (!shouldAbortUploadsInCleanup()) { |
| LOG.debug("Not cleanup up pending uploads to {} as {} is false ", |
| getOutputPath(), |
| FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS); |
| return; |
| } |
| Path dest = getOutputPath(); |
| try (DurationInfo ignored = |
| new DurationInfo(LOG, "Aborting all pending commits under %s", |
| dest)) { |
| CommitOperations ops = getCommitOperations(); |
| List<MultipartUpload> pending; |
| try { |
| pending = ops.listPendingUploadsUnderPath(dest); |
| } catch (IOException e) { |
| // Swallow any errors given this is best effort |
| LOG.debug("Failed to list pending uploads under {}", dest, e); |
| return; |
| } |
| if (!pending.isEmpty()) { |
| LOG.warn("{} pending uploads were found -aborting", pending.size()); |
| LOG.warn("If other tasks/jobs are writing to {}," |
| + "this action may cause them to fail", dest); |
| TaskPool.foreach(pending) |
| .executeWith(commitContext.getOuterSubmitter()) |
| .suppressExceptions(suppressExceptions) |
| .run(u -> commitContext.abortMultipartCommit( |
| u.getKey(), u.getUploadId())); |
| } else { |
| LOG.info("No pending uploads were found"); |
| } |
| } |
| } |
| |
| private boolean shouldAbortUploadsInCleanup() { |
| return getConf() |
| .getBoolean(FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS, |
| DEFAULT_FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS); |
| } |
| |
| /** |
| * Subclass-specific pre-Job-commit actions. |
| * The staging committers all load the pending files to verify that |
| * they can be loaded. |
| * The Magic committer does not, because of the overhead of reading files |
| * from S3 makes it too expensive. |
| * @param commitContext commit context |
| * @param pending the pending operations |
| * @throws IOException any failure |
| */ |
| @VisibleForTesting |
| public void preCommitJob(CommitContext commitContext, |
| ActiveCommit pending) throws IOException { |
| } |
| |
| /** |
| * Commit work. |
| * This consists of two stages: precommit and commit. |
| * <p> |
| * Precommit: identify pending uploads, then allow subclasses |
| * to validate the state of the destination and the pending uploads. |
| * Any failure here triggers an abort of all pending uploads. |
| * <p> |
| * Commit internal: do the final commit sequence. |
| * <p> |
| * The final commit action is to build the {@code _SUCCESS} file entry. |
| * </p> |
| * @param context job context |
| * @throws IOException any failure |
| */ |
| @Override |
| public void commitJob(JobContext context) throws IOException { |
| String id = jobIdString(context); |
| // the commit context is created outside a try-with-resources block |
| // so it can be used in exception handling. |
| CommitContext commitContext = null; |
| |
| SuccessData successData = null; |
| IOException failure = null; |
| String stage = "preparing"; |
| try (DurationInfo d = new DurationInfo(LOG, |
| "%s: commitJob(%s)", getRole(), id)) { |
| commitContext = initiateJobOperation(context); |
| ActiveCommit pending |
| = listPendingUploadsToCommit(commitContext); |
| stage = "precommit"; |
| preCommitJob(commitContext, pending); |
| stage = "commit"; |
| commitJobInternal(commitContext, pending); |
| stage = "completed"; |
| jobCompleted(true); |
| stage = "marker"; |
| successData = maybeCreateSuccessMarkerFromCommits(commitContext, pending); |
| stage = "cleanup"; |
| cleanup(commitContext, false); |
| } catch (IOException e) { |
| // failure. record it for the summary |
| failure = e; |
| LOG.warn("Commit failure for job {}", id, e); |
| jobCompleted(false); |
| abortJobInternal(commitContext, true); |
| throw e; |
| } finally { |
| // save the report summary, even on failure |
| if (commitContext != null) { |
| if (successData == null) { |
| // if the commit did not get as far as creating success data, create one. |
| successData = createSuccessData(context, null, null, |
| getDestFS().getConf()); |
| } |
| // save quietly, so no exceptions are raised |
| maybeSaveSummary(stage, |
| commitContext, |
| successData, |
| failure, |
| true, |
| true); |
| // and close that commit context |
| commitContext.close(); |
| } |
| } |
| |
| } |
| |
| /** |
| * Job completion outcome; this may be subclassed in tests. |
| * @param success did the job succeed. |
| */ |
| protected void jobCompleted(boolean success) { |
| getCommitOperations().jobCompleted(success); |
| } |
| |
| /** |
| * Clean up any staging directories. |
| * IOEs must be caught and swallowed. |
| */ |
| public abstract void cleanupStagingDirs(); |
| |
| /** |
| * Get the list of pending uploads for this job attempt. |
| * @param commitContext commit context |
| * @return a list of pending uploads. |
| * @throws IOException Any IO failure |
| */ |
| protected abstract ActiveCommit listPendingUploadsToCommit( |
| CommitContext commitContext) |
| throws IOException; |
| |
| /** |
| * Cleanup the job context, including aborting anything pending |
| * and destroying the thread pool. |
| * @param commitContext commit context |
| * @param suppressExceptions should exceptions be suppressed? |
| * @throws IOException any failure if exceptions were not suppressed. |
| */ |
| protected void cleanup(CommitContext commitContext, |
| boolean suppressExceptions) throws IOException { |
| try (DurationInfo d = new DurationInfo(LOG, |
| "Cleanup job %s", jobIdString(commitContext.getJobContext()))) { |
| abortPendingUploadsInCleanup(suppressExceptions, commitContext); |
| } finally { |
| cleanupStagingDirs(); |
| } |
| } |
| |
  /**
   * Deprecated cleanup entry point; delegates to
   * {@link #cleanup(CommitContext, boolean)} with exceptions suppressed.
   * @param context job context
   * @throws IOException failure setting up the commit context
   */
  @Override
  @SuppressWarnings("deprecation")
  public void cleanupJob(JobContext context) throws IOException {
    String r = getRole();
    String id = jobIdString(context);
    LOG.warn("{}: using deprecated cleanupJob call for {}", r, id);
    try (DurationInfo d = new DurationInfo(LOG, "%s: cleanup Job %s", r, id);
         CommitContext commitContext = initiateJobOperation(context)) {
      cleanup(commitContext, true);
    }
  }
| |
| /** |
| * Execute an operation; maybe suppress any raised IOException. |
| * @param suppress should raised IOEs be suppressed? |
| * @param action action (for logging when the IOE is supressed. |
| * @param operation operation |
| * @throws IOException if operation raised an IOE and suppress == false |
| */ |
| protected void maybeIgnore( |
| boolean suppress, |
| String action, |
| InvocationRaisingIOE operation) throws IOException { |
| if (suppress) { |
| ignoreIOExceptions(LOG, action, "", operation); |
| } else { |
| operation.apply(); |
| } |
| } |
| |
| /** |
| * Log or rethrow a caught IOException. |
| * @param suppress should raised IOEs be suppressed? |
| * @param action action (for logging when the IOE is suppressed. |
| * @param ex exception |
| * @throws IOException if suppress == false |
| */ |
| protected void maybeIgnore( |
| boolean suppress, |
| String action, |
| IOException ex) throws IOException { |
| if (suppress) { |
| LOG.debug(action, ex); |
| } else { |
| throw ex; |
| } |
| } |
| |
| /** |
| * Get the commit actions instance. |
| * Subclasses may provide a mock version of this. |
| * @return the commit actions instance to use for operations. |
| */ |
| protected CommitOperations getCommitOperations() { |
| return commitOperations; |
| } |
| |
| /** |
| * Used in logging and reporting to help disentangle messages. |
| * @return the committer's role. |
| */ |
| protected String getRole() { |
| return role; |
| } |
| |
| /** |
| * Get the thread count for this job's commit operations. |
| * @param context the JobContext for this commit |
| * @return a possibly zero thread count. |
| */ |
| private int getJobCommitThreadCount(final JobContext context) { |
| return context.getConfiguration().getInt( |
| FS_S3A_COMMITTER_THREADS, |
| DEFAULT_COMMITTER_THREADS); |
| } |
| |
| /** |
| * Get the thread count for this task's commit operations. |
| * @param context the JobContext for this commit |
| * @return a possibly zero thread count. |
| */ |
| private int getTaskCommitThreadCount(final JobContext context) { |
| return context.getConfiguration().getInt( |
| FS_S3A_COMMITTER_THREADS, |
| DEFAULT_COMMITTER_THREADS); |
| } |
| |
| /** |
| * Delete the task attempt path without raising any errors. |
| * @param context task context |
| */ |
| protected void deleteTaskAttemptPathQuietly(TaskAttemptContext context) { |
| Path attemptPath = getBaseTaskAttemptPath(context); |
| ignoreIOExceptions(LOG, "Delete task attempt path", attemptPath.toString(), |
| () -> deleteQuietly( |
| getTaskAttemptFilesystem(context), attemptPath, true)); |
| } |
| |
| /** |
| * Abort all pending uploads in the list. |
| * This operation is used by the magic committer as part of its |
| * rollback after a failure during task commit. |
| * @param commitContext commit context |
| * @param pending pending uploads |
| * @param suppressExceptions should exceptions be suppressed |
| * @throws IOException any exception raised |
| */ |
| protected void abortPendingUploads( |
| final CommitContext commitContext, |
| final List<SinglePendingCommit> pending, |
| final boolean suppressExceptions) |
| throws IOException { |
| if (pending == null || pending.isEmpty()) { |
| LOG.info("{}: no pending commits to abort", getRole()); |
| } else { |
| try (DurationInfo d = new DurationInfo(LOG, |
| "Aborting %s uploads", pending.size())) { |
| TaskPool.foreach(pending) |
| .executeWith(commitContext.getOuterSubmitter()) |
| .suppressExceptions(suppressExceptions) |
| .run(commitContext::abortSingleCommit); |
| } |
| } |
| } |
| |
| /** |
| * Abort all pending uploads in the list. |
| * @param commitContext commit context |
| * @param pending pending uploads |
| * @param suppressExceptions should exceptions be suppressed? |
| * @param deleteRemoteFiles should remote files be deleted? |
| * @throws IOException any exception raised |
| */ |
| protected void abortPendingUploads( |
| final CommitContext commitContext, |
| final ActiveCommit pending, |
| final boolean suppressExceptions, |
| final boolean deleteRemoteFiles) throws IOException { |
| if (pending.isEmpty()) { |
| LOG.info("{}: no pending commits to abort", getRole()); |
| } else { |
| try (DurationInfo d = new DurationInfo(LOG, |
| "Aborting %s uploads", pending.size())) { |
| TaskPool.foreach(pending.getSourceFiles()) |
| .executeWith(commitContext.getOuterSubmitter()) |
| .suppressExceptions(suppressExceptions) |
| .run(path -> |
| loadAndAbort(commitContext, |
| pending, |
| path, |
| suppressExceptions, |
| deleteRemoteFiles)); |
| } |
| } |
| } |
| |
  /**
   * Get the IOStatistics of this committer's statistics instance.
   * @return the committer statistics.
   */
  @Override
  public IOStatistics getIOStatistics() {
    return committerStatistics.getIOStatistics();
  }
| |
| /** |
| * Scan for active uploads and list them along with a warning message. |
| * Errors are ignored. |
| * @param path output path of job. |
| */ |
| protected void warnOnActiveUploads(final Path path) { |
| List<MultipartUpload> pending; |
| try { |
| pending = getCommitOperations() |
| .listPendingUploadsUnderPath(path); |
| } catch (IOException e) { |
| LOG.debug("Failed to list uploads under {}", |
| path, e); |
| return; |
| } |
| if (!pending.isEmpty()) { |
| // log a warning |
| LOG.warn("{} active upload(s) in progress under {}", |
| pending.size(), |
| path); |
| LOG.warn("Either jobs are running concurrently" |
| + " or failed jobs are not being cleaned up"); |
| // and the paths + timestamps |
| DateFormat df = DateFormat.getDateTimeInstance(); |
| pending.forEach(u -> |
| LOG.info("[{}] {}", |
| df.format(u.getInitiated()), |
| u.getKey())); |
| if (shouldAbortUploadsInCleanup()) { |
| LOG.warn("This committer will abort these uploads in job cleanup"); |
| } |
| } |
| } |
| |
| /** |
| * Build the job UUID. |
| * |
| * <p> |
| * In MapReduce jobs, the application ID is issued by YARN, and |
| * unique across all jobs. |
| * </p> |
| * <p> |
| * Spark will use a fake app ID based on the current time. |
| * This can lead to collisions on busy clusters unless |
| * the specific spark release has SPARK-33402 applied. |
| * This appends a random long value to the timestamp, so |
| * is unique enough that the risk of collision is almost |
| * nonexistent. |
| * </p> |
| * <p> |
| * The order of selection of a uuid is |
| * </p> |
| * <ol> |
| * <li>Value of |
| * {@link InternalCommitterConstants#FS_S3A_COMMITTER_UUID}.</li> |
| * <li>Value of |
| * {@link InternalCommitterConstants#SPARK_WRITE_UUID}.</li> |
| * <li>If enabled through |
| * {@link CommitConstants#FS_S3A_COMMITTER_GENERATE_UUID}: |
| * Self-generated uuid.</li> |
| * <li>If {@link CommitConstants#FS_S3A_COMMITTER_REQUIRE_UUID} |
| * is not set: Application ID</li> |
| * </ol> |
| * The UUID bonding takes place during construction; |
| * the staging committers use it to set up their wrapped |
| * committer to a path in the cluster FS which is unique to the |
| * job. |
| * <p> |
| * In MapReduce jobs, the application ID is issued by YARN, and |
| * unique across all jobs. |
| * </p> |
| * In {@link #setupJob(JobContext)} the job context's configuration |
| * will be patched |
| * be valid in all sequences where the job has been set up for the |
| * configuration passed in. |
| * <p> |
| * If the option {@link CommitConstants#FS_S3A_COMMITTER_REQUIRE_UUID} |
| * is set, then an external UUID MUST be passed in. |
| * This can be used to verify that the spark engine is reliably setting |
| * unique IDs for staging. |
| * </p> |
| * @param conf job/task configuration |
| * @param jobId job ID from YARN or spark. |
| * @return Job UUID and source of it. |
| * @throws PathCommitException no UUID was found and it was required |
| */ |
| public static Pair<String, JobUUIDSource> |
| buildJobUUID(Configuration conf, JobID jobId) |
| throws PathCommitException { |
| |
| String jobUUID = conf.getTrimmed(FS_S3A_COMMITTER_UUID, ""); |
| |
| if (!jobUUID.isEmpty()) { |
| return Pair.of(jobUUID, JobUUIDSource.CommitterUUIDProperty); |
| } |
| // there is no job UUID. |
| // look for one from spark |
| jobUUID = conf.getTrimmed(SPARK_WRITE_UUID, ""); |
| if (!jobUUID.isEmpty()) { |
| return Pair.of(jobUUID, JobUUIDSource.SparkWriteUUID); |
| } |
| |
| // there is no UUID configuration in the job/task config |
| |
| // Check the job hasn't declared a requirement for the UUID. |
| // This allows or fail-fast validation of Spark behavior. |
| if (conf.getBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, |
| DEFAULT_S3A_COMMITTER_REQUIRE_UUID)) { |
| throw new PathCommitException("", E_NO_SPARK_UUID); |
| } |
| |
| // see if the job can generate a random UUI` |
| if (conf.getBoolean(FS_S3A_COMMITTER_GENERATE_UUID, |
| DEFAULT_S3A_COMMITTER_GENERATE_UUID)) { |
| // generate a random UUID. This is OK for a job, for a task |
| // it means that the data may not get picked up. |
| String newId = UUID.randomUUID().toString(); |
| LOG.warn("No job ID in configuration; generating a random ID: {}", |
| newId); |
| return Pair.of(newId, JobUUIDSource.GeneratedLocally); |
| } |
| // if no other option was supplied, return the job ID. |
| // This is exactly what MR jobs expect, but is not what |
| // Spark jobs can do as there is a risk of jobID collision. |
| return Pair.of(jobId.toString(), JobUUIDSource.JobID); |
| } |
| |
| /** |
| * Enumeration of Job UUID source. |
| */ |
| public enum JobUUIDSource { |
| SparkWriteUUID(SPARK_WRITE_UUID), |
| CommitterUUIDProperty(FS_S3A_COMMITTER_UUID), |
| JobID("JobID"), |
| GeneratedLocally("Generated Locally"); |
| |
| private final String text; |
| |
| JobUUIDSource(final String text) { |
| this.text = text; |
| } |
| |
| /** |
| * Source for messages. |
| * @return text |
| */ |
| public String getText() { |
| return text; |
| } |
| |
| @Override |
| public String toString() { |
| final StringBuilder sb = new StringBuilder( |
| "JobUUIDSource{"); |
| sb.append("text='").append(text).append('\''); |
| sb.append('}'); |
| return sb.toString(); |
| } |
| } |
| |
| /** |
| * Add jobID to current context. |
| */ |
| protected final void updateCommonContext() { |
| currentAuditContext().put(AuditConstants.PARAM_JOB_ID, uuid); |
| |
| } |
| |
  /**
   * Get the source of audit spans used by this committer.
   * @return the audit span source.
   */
  protected AuditSpanSource getAuditSpanSource() {
    return auditSpanSource;
  }
| |
| /** |
| * Start an operation; retrieve an audit span. |
| * |
| * All operation names <i>SHOULD</i> come from |
| * {@code StoreStatisticNames} or |
| * {@code StreamStatisticNames}. |
| * @param name operation name. |
| * @param path1 first path of operation |
| * @param path2 second path of operation |
| * @return a span for the audit |
| * @throws IOException failure |
| */ |
| protected AuditSpan startOperation(String name, |
| @Nullable String path1, |
| @Nullable String path2) |
| throws IOException { |
| return getAuditSpanSource().createSpan(name, path1, path2); |
| } |
| |
| |
| /** |
| * Save a summary to the report dir if the config option |
| * is set. |
| * The report will be updated with the current active stage, |
| * and if {@code thrown} is non-null, it will be added to the |
| * diagnostics (and the job tagged as a failure). |
| * Static for testability. |
| * @param activeStage active stage |
| * @param context commit context. |
| * @param report summary file. |
| * @param thrown any exception indicting failure. |
| * @param quiet should exceptions be swallowed. |
| * @param overwrite should the existing file be overwritten |
| * @return the path of a file, if successfully saved |
| * @throws IOException if a failure occured and quiet==false |
| */ |
| private static Path maybeSaveSummary( |
| String activeStage, |
| CommitContext context, |
| SuccessData report, |
| Throwable thrown, |
| boolean quiet, |
| boolean overwrite) throws IOException { |
| Configuration conf = context.getConf(); |
| String reportDir = conf.getTrimmed(OPT_SUMMARY_REPORT_DIR, ""); |
| if (reportDir.isEmpty()) { |
| LOG.debug("No summary directory set in " + OPT_SUMMARY_REPORT_DIR); |
| return null; |
| } |
| LOG.debug("Summary directory set to {}", reportDir); |
| |
| Path reportDirPath = new Path(reportDir); |
| Path path = new Path(reportDirPath, |
| createJobSummaryFilename(context.getJobId())); |
| |
| if (thrown != null) { |
| report.recordJobFailure(thrown); |
| } |
| report.putDiagnostic(STAGE, activeStage); |
| // the store operations here is explicitly created for the FS where |
| // the reports go, which may not be the target FS of the job. |
| |
| final FileSystem fs = path.getFileSystem(conf); |
| try (ManifestStoreOperations operations = new ManifestStoreOperationsThroughFileSystem( |
| fs)) { |
| if (!overwrite) { |
| // check for file existence so there is no need to worry about |
| // precisely what exception is raised when overwrite=false and dest file |
| // exists |
| try { |
| FileStatus st = operations.getFileStatus(path); |
| // get here and the file exists |
| LOG.debug("Report already exists: {}", st); |
| return null; |
| } catch (FileNotFoundException ignored) { |
| } |
| } |
| report.save(fs, path, SuccessData.serializer()); |
| LOG.info("Job summary saved to {}", path); |
| return path; |
| } catch (IOException e) { |
| LOG.debug("Failed to save summary to {}", path, e); |
| if (quiet) { |
| return null; |
| } else { |
| throw e; |
| } |
| } |
| } |
| |
| /** |
| * State of the active commit operation. |
| * |
| * It contains a list of all pendingset files to load as the source |
| * of outstanding commits to complete/abort, |
| * and tracks the files uploaded. |
| * |
| * To avoid running out of heap by loading all the source files |
| * simultaneously: |
| * <ol> |
| * <li> |
| * The list of files to load is passed round but |
| * the contents are only loaded on demand. |
| * </li> |
| * <li> |
| * The number of written files tracked for logging in |
| * the _SUCCESS file are limited to a small amount -enough |
| * for testing only. |
| * </li> |
| * </ol> |
| */ |
| public static final class ActiveCommit { |
| |
| private static final AbstractS3ACommitter.ActiveCommit EMPTY |
| = new ActiveCommit(null, new ArrayList<>()); |
| |
| /** All pendingset files to iterate through. */ |
| private final List<FileStatus> sourceFiles; |
| |
| /** |
| * Filesystem for the source files. |
| */ |
| private final FileSystem sourceFS; |
| |
| /** |
| * List of committed objects; only built up until the commit limit is |
| * reached. |
| */ |
| private final List<String> committedObjects = new ArrayList<>(); |
| |
| /** |
| * The total number of committed objects. |
| */ |
| private int committedObjectCount; |
| |
| /** |
| * Total number of bytes committed. |
| */ |
| private long committedBytes; |
| |
| /** |
| * Aggregate statistics of all supplied by |
| * committed uploads. |
| */ |
| private final IOStatisticsSnapshot ioStatistics = |
| new IOStatisticsSnapshot(); |
| |
| /** |
| * Construct from a source FS and list of files. |
| * @param sourceFS filesystem containing the list of pending files |
| * @param sourceFiles .pendingset files to load and commit. |
| */ |
| @SuppressWarnings("unchecked") |
| public ActiveCommit( |
| final FileSystem sourceFS, |
| final List<? extends FileStatus> sourceFiles) { |
| this.sourceFiles = (List<FileStatus>) sourceFiles; |
| this.sourceFS = sourceFS; |
| } |
| |
| /** |
| * Create an active commit of the given pending files. |
| * @param pendingFS source filesystem. |
| * @param statuses iterator of file status or subclass to use. |
| * @return the commit |
| * @throws IOException if the iterator raises one. |
| */ |
| public static ActiveCommit fromStatusIterator( |
| final FileSystem pendingFS, |
| final RemoteIterator<? extends FileStatus> statuses) throws IOException { |
| return new ActiveCommit(pendingFS, toList(statuses)); |
| } |
| |
| /** |
| * Get the empty entry. |
| * @return an active commit with no pending files. |
| */ |
| public static ActiveCommit empty() { |
| return EMPTY; |
| } |
| |
| public List<FileStatus> getSourceFiles() { |
| return sourceFiles; |
| } |
| |
| public FileSystem getSourceFS() { |
| return sourceFS; |
| } |
| |
| /** |
| * Note that a file was committed. |
| * Increase the counter of files and total size. |
| * If there is room in the committedFiles list, the file |
| * will be added to the list and so end up in the _SUCCESS file. |
| * @param key key of the committed object. |
| * @param size size in bytes. |
| */ |
| public synchronized void uploadCommitted(String key, |
| long size) { |
| if (committedObjects.size() < SUCCESS_MARKER_FILE_LIMIT) { |
| committedObjects.add( |
| key.startsWith("/") ? key : ("/" + key)); |
| } |
| committedObjectCount++; |
| committedBytes += size; |
| } |
| |
| /** |
| * Callback when a pendingset has been committed, |
| * including any source statistics. |
| * @param sourceStatistics any source statistics |
| */ |
| public void pendingsetCommitted(final IOStatistics sourceStatistics) { |
| ioStatistics.aggregate(sourceStatistics); |
| } |
| |
| public IOStatisticsSnapshot getIOStatistics() { |
| return ioStatistics; |
| } |
| |
| public synchronized List<String> getCommittedObjects() { |
| return committedObjects; |
| } |
| |
| public synchronized int getCommittedFileCount() { |
| return committedObjectCount; |
| } |
| |
| public synchronized long getCommittedBytes() { |
| return committedBytes; |
| } |
| |
| public int size() { |
| return sourceFiles.size(); |
| } |
| |
| public boolean isEmpty() { |
| return sourceFiles.isEmpty(); |
| } |
| |
| public void add(FileStatus status) { |
| sourceFiles.add(status); |
| } |
| } |
| } |