blob: b794f5814df36d0263ca442d712af746c591616b [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.junit.Test;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.UnreliableManifestStoreOperations;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.RenameFilesStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageConfig;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyFileContents;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.getEtag;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.UnreliableManifestStoreOperations.SIMULATED_FAILURE;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage.FAILED_TO_RENAME_PREFIX;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
* Test renaming files with fault injection.
* This explores etag support and overwrite-on-rename semantics
* of the target FS, so some of the tests behave differently
* on different stores.
public class TestRenameStageFailure extends AbstractManifestCommitterTest {
// RENAME_FAILURES names the IOStatistics counter of failed file renames
// during commit; expectRenameFailure() asserts it rises by exactly one.
* Statistic to look for.
public static final String RENAME_FAILURES = OP_COMMIT_FILE_RENAME + ".failures";
// Index, within the files-to-commit list built by createFileset(),
// of the entry whose rename the tests arrange to fail.
private static final int FAILING_FILE_INDEX = 5;
* Fault Injection.
private UnreliableManifestStoreOperations failures;
/** etags returned in listing/file status operations? */
private boolean etagsSupported;
/** etags preserved through rename? */
private boolean etagsPreserved;
/** Do the wrapped (real) store operations support resilient commit? */
private boolean resilientCommit;
/** @return whether the wrapped store operations reported resilient commit support in setup. */
protected boolean isResilientCommit() {
return resilientCommit;
/** @return whether the filesystem declared (via a path capability probed in setup) that etags survive rename. */
protected boolean isEtagsPreserved() {
return etagsPreserved;
/** @return whether the filesystem declared (via a path capability probed in setup) that it returns etags. */
protected boolean isEtagsSupported() {
return etagsSupported;
// Per-test setup: probes the filesystem's etag-related path capabilities
// and wraps the store operations in a fault-injecting
// UnreliableManifestStoreOperations instance.
public void setup() throws Exception {
final FileSystem fs = getFileSystem();
final Path methodPath = methodPath();
// Capability probes against this test method's path. These decide whether
// manifest entries carry etags and whether destination etags are checked
// after rename.
etagsSupported = fs.hasPathCapability(methodPath,
etagsPreserved = fs.hasPathCapability(methodPath,
// Wrap the real store operations so that individual renames can be made to fail.
final ManifestStoreOperations wrappedOperations = getStoreOperations();
= new UnreliableManifestStoreOperations(wrappedOperations);
// Resilience is queried on the wrapped (real) operations, not on the fault injector.
resilientCommit = wrappedOperations.storeSupportsResilientCommit();
* Does this test suite require rename resilience in the store/FS?
* @return true if this suite requires the store operations to support resilient commit.
// Base suite: rename resilience is not required. The method is protected and
// non-final, so subclasses may override it to declare that their store must
// support resilient commit.
protected boolean requireRenameResilience() {
return false;
// Asserts that the store's resilient commit support is what this suite
// expects (presumably comparing isResilientCommit() with
// requireRenameResilience(); the assertion head is not shown here).
public void testResilienceAsExpected() throws Throwable {
.describedAs("resilient commit support")
// Make the rename of one file in a large fileset raise an IOException.
// The rename stage must fail, and the exception message must be preserved.
public void testRenameSourceException() throws Throwable {
describe("rename fails raising an IOE -expect stage to fail" +
" and exception message preserved");
// destination directory.
Path destDir = methodPath();
StageConfig stageConfig = createStageConfigForJob(JOB1, destDir);
Path jobAttemptTaskSubDir = stageConfig.getJobAttemptTaskSubDir();
// create a manifest with a lot of files, one of
// whose renames will be made to fail
TaskManifest manifest = new TaskManifest();
createFileset(destDir, jobAttemptTaskSubDir, manifest, filesToCreate());
final List<FileEntry> filesToCommit = manifest.getFilesToCommit();
// the entry selected as the one whose rename is to fail.
final FileEntry entry = filesToCommit.get(FAILING_FILE_INDEX);
// rename MUST fail
new RenameFilesStage(stageConfig),
* Number of files to create; must be greater than
* {@link #FAILING_FILE_INDEX} so that the entry at that index exists.
// 100 files: comfortably more than FAILING_FILE_INDEX. Subclasses may override.
protected int filesToCreate() {
return 100;
// Committing an entry whose source file does not exist must fail with a
// FileNotFoundException, whatever the store.
public void testCommitMissingFile() throws Throwable {
describe("commit a file which doesn't exist. Expect FNFE always");
// destination directory.
Path destDir = methodPath();
StageConfig stageConfig = createStageConfigForJob(JOB1, destDir);
Path jobAttemptTaskSubDir = stageConfig.getJobAttemptTaskSubDir();
TaskManifest manifest = new TaskManifest();
final List<FileEntry> filesToCommit = manifest.getFilesToCommit();
// source path of a file which does not exist.
Path source = new Path(jobAttemptTaskSubDir, "source.parquet");
Path dest = new Path(destDir, "destdir.parquet");
// entry with size 0 and no etag for the missing source.
filesToCommit.add(new FileEntry(source, dest, 0, null));
final FileNotFoundException ex = expectRenameFailure(
new RenameFilesStage(stageConfig),
FileNotFoundException.class);"Exception raised: {}", ex.toString());
* Verify that when a job is configured to delete target paths,
* renaming will overwrite them.
* This test has to use FileSystem contract settings to determine
* whether or not the FS will actually permit file-over-file rename.
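* As POSIX permits file-over-file rename, local filesystem tests will
* not fail if the destination exists.
* As HDFS, ABFS and GCS reject it, they are required to fail the
* first rename sequence, but succeed once deletion of target paths
* is enabled ({@code StageConfig.withDeleteTargetPaths(true)}).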
// Commit a file onto an existing destination file: first with target-path
// deletion disabled (expected to fail where the FS rejects file-over-file
// rename), then with it enabled, which must succeed and deliver the new data.
public void testDeleteTargetPaths() throws Throwable {
describe("Verify that target path deletion works");
// destination directory.
Path destDir = methodPath();
StageConfig stageConfig = createStageConfigForJob(JOB1, destDir)
Path jobAttemptTaskSubDir = stageConfig.getJobAttemptTaskSubDir();
final Path source = new Path(jobAttemptTaskSubDir, "source.txt");
final Path dest = new Path(destDir, "source.txt");
final byte[] sourceData = "data".getBytes(StandardCharsets.UTF_8);
final FileSystem fs = getFileSystem();
// source file with known contents, plus a file already present at the
// destination which the commit has to replace.
ContractTestUtils.createFile(fs, source, false, sourceData);
touch(fs, dest);
TaskManifest manifest = new TaskManifest();
final FileEntry entry = createEntryWithEtag(source, dest);
List<TaskManifest> manifests = new ArrayList<>();
// local POSIX filesystems allow rename of file onto file, so
// don't fail on the rename.
boolean renameOverwritesDest = isSupported(RENAME_OVERWRITES_DEST);
if (!renameOverwritesDest) {
// HDFS, ABFS and GCS do all reject rename of file onto file.
// ABFS will use its rename operation so will even raise a
// meaningful exception here.
final IOException ex = expectRenameFailure(
new RenameFilesStage(stageConfig.withDeleteTargetPaths(false)),
IOException.class);"Exception raised: {}", ex.toString());
// delete target paths and it works
new RenameFilesStage(stageConfig.withDeleteTargetPaths(true))
.apply(Pair.of(manifests, Collections.emptySet()));
// and the new data made it over
verifyFileContents(fs, dest, sourceData);
// let's check the etag too, for completeness; only meaningful when the
// FS preserves etags through rename.
if (isEtagsPreserved()) {
.describedAs("Etag of destination file %s", dest)
// Make rename() return false, rather than throw, for one file of a large
// fileset. The stage must escalate this to an IOException.
// Skipped when the store supports resilient commit.
public void testRenameReturnsFalse() throws Throwable {
describe("commit where rename() returns false for one file." +
" Expect failure to be escalated to an IOE");
Assume.assumeTrue("not used when resilient commits are available",
// destination directory.
Path destDir = methodPath();
StageConfig stageConfig = createStageConfigForJob(JOB1, destDir);
Path jobAttemptTaskSubDir = stageConfig.getJobAttemptTaskSubDir();
// create a manifest with a lot of files, one of
// whose renames will be made to fail
TaskManifest manifest = new TaskManifest();
createFileset(destDir, jobAttemptTaskSubDir, manifest, filesToCreate());
final List<FileEntry> filesToCommit = manifest.getFilesToCommit();
// the entry whose rename will be made to fail.
final FileEntry entry = filesToCommit.get(FAILING_FILE_INDEX);
// switch to rename returning false; again, this must
// be escalated to a failure.
new RenameFilesStage(stageConfig),
* Create the source files for a task.
* @param destDir destination directory
* @param taskAttemptDir directory of the task attempt
* @param manifest manifest to update.
* @param fileCount how many files.
// Creates fileCount source files under taskAttemptDir and builds, for each,
// a manifest entry targeting the same file name under destDir.
private void createFileset(
final Path destDir,
final Path taskAttemptDir,
final TaskManifest manifest,
final int fileCount) throws IOException {
final FileSystem fs = getFileSystem();
for (int i = 0; i < fileCount; i++) {
// zero-padded names: file0000, file0001, ...
String name = String.format("file%04d", i);
Path src = new Path(taskAttemptDir, name);
Path dest = new Path(destDir, name);
// create the source file, then build its entry (with etag where supported).
touch(fs, src);
final FileEntry entry = createEntryWithEtag(src, dest);
* Create a manifest entry, including size.
* If the FS supports etags, one is retrieved.
* @param source source
* @param dest dest
* @return entry
* @throws IOException if getFileStatus failed.
private FileEntry createEntryWithEtag(final Path source,
    final Path dest)
    throws IOException {
  final FileStatus sourceStatus = getFileSystem().getFileStatus(source);
  // Only ask for an etag when the store declared that it provides them;
  // otherwise the entry carries no etag at all.
  String sourceEtag = null;
  if (isEtagsSupported()) {
    sourceEtag = getEtag(sourceStatus);
  }
  final long sourceLength = sourceStatus.getLen();
  return new FileEntry(source, dest, sourceLength, sourceEtag);
* Execute rename, expecting a failure.
* When {@code files} is positive, the number of files committed MUST be
* less than {@code files}.
* @param stage stage
* @param manifest task manifest to commit
* @param files number of files being renamed.
* @param errorText text which must be in the exception string
* @param exceptionClass class of the exception
* @return the caught exception
* @throws Exception if anything else went wrong, or no exception was raised.
// Runs the rename stage, expecting it to raise exceptionClass with errorText.
// It then checks three things: the rename-failure counter rose by exactly
// one; when files > 0, fewer than `files` files were committed; and the
// progress counter was invoked.
private <E extends Throwable> E expectRenameFailure(
RenameFilesStage stage,
TaskManifest manifest,
int files,
String errorText,
Class<E> exceptionClass) throws Exception {
List<TaskManifest> manifests = new ArrayList<>();
ProgressCounter progressCounter = getProgressCounter();
IOStatisticsStore iostatistics = stage.getIOStatistics();
// Baseline the failure counter so the later assertion is relative to this call.
// NOTE(review): if counters() is a Map<String, Long>, a missing key would
// fail with an NPE when unboxed into this long.
long failures0 = iostatistics.counters().get(RENAME_FAILURES);
// rename MUST raise an exception.
E ex = intercept(exceptionClass, errorText, () ->
stage.apply(Pair.of(manifests, Collections.emptySet())));"Statistics {}", ioStatisticsToPrettyString(iostatistics));
// the IOStatistics record the rename as a failure.
assertThatStatisticCounter(iostatistics, RENAME_FAILURES)
.isEqualTo(failures0 + 1);
// count of files committed MUST be less than expected.
if (files > 0) {
.describedAs("Files Committed by stage")
// the progress counter will show that the rename did invoke it.
// there's no assertion on the actual value as it depends on
// execution time of the threads.
.describedAs("Progress counter %s", progressCounter)
return ex;