blob: 11d2beda5d01570dffbe22d25878888323b45862 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.junit.Test;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.UnreliableManifestStoreOperations;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.RenameFilesStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageConfig;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyFileContents;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.getEtag;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.UnreliableManifestStoreOperations.SIMULATED_FAILURE;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage.FAILED_TO_RENAME_PREFIX;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Test renaming files with fault injection.
* This explores etag support and overwrite-on-rename semantics
* of the target FS, so some of the tests behave differently
* on different stores.
*/
public class TestRenameStageFailure extends AbstractManifestCommitterTest {
/**
* Statistic to look for.
*/
public static final String RENAME_FAILURES = OP_COMMIT_FILE_RENAME + ".failures";
private static final int FAILING_FILE_INDEX = 5;
/**
* Fault Injection.
*/
private UnreliableManifestStoreOperations failures;
/** etags returned in listing/file status operations? */
private boolean etagsSupported;
/** etags preserved through rename? */
private boolean etagsPreserved;
/** resilient commit expected? */
private boolean resilientCommit;
protected boolean isResilientCommit() {
return resilientCommit;
}
protected boolean isEtagsPreserved() {
return etagsPreserved;
}
protected boolean isEtagsSupported() {
return etagsSupported;
}
@Override
public void setup() throws Exception {
super.setup();
final FileSystem fs = getFileSystem();
final Path methodPath = methodPath();
etagsSupported = fs.hasPathCapability(methodPath,
CommonPathCapabilities.ETAGS_AVAILABLE);
etagsPreserved = fs.hasPathCapability(methodPath,
CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME);
final ManifestStoreOperations wrappedOperations = getStoreOperations();
failures
= new UnreliableManifestStoreOperations(wrappedOperations);
setStoreOperations(failures);
resilientCommit = wrappedOperations.storeSupportsResilientCommit();
}
/**
* Does this test suite require rename resilience in the store/FS?
* @return true if the store operations are resilient.
*/
protected boolean requireRenameResilience() throws IOException {
return false;
}
@Test
public void testResilienceAsExpected() throws Throwable {
Assertions.assertThat(isResilientCommit())
.describedAs("resilient commit support")
.isEqualTo(requireRenameResilience());
}
@Test
public void testRenameSourceException() throws Throwable {
describe("rename fails raising an IOE -expect stage to fail" +
" and exception message preserved");
// destination directory.
Path destDir = methodPath();
StageConfig stageConfig = createStageConfigForJob(JOB1, destDir);
Path jobAttemptTaskSubDir = stageConfig.getJobAttemptTaskSubDir();
// create a manifest with a lot of files, but for
// which one of whose renames will fail
TaskManifest manifest = new TaskManifest();
createFileset(destDir, jobAttemptTaskSubDir, manifest, filesToCreate());
final List<FileEntry> filesToCommit = manifest.getFilesToCommit();
final FileEntry entry = filesToCommit.get(FAILING_FILE_INDEX);
failures.addRenameSourceFilesToFail(entry.getSourcePath());
// rename MUST fail
expectRenameFailure(
new RenameFilesStage(stageConfig),
manifest,
filesToCommit.size(),
SIMULATED_FAILURE,
PathIOException.class);
}
/**
* Number of files to create; must be more than
* {@link #FAILING_FILE_INDEX}.
*/
protected int filesToCreate() {
return 100;
}
@Test
public void testCommitMissingFile() throws Throwable {
describe("commit a file which doesn't exist. Expect FNFE always");
// destination directory.
Path destDir = methodPath();
StageConfig stageConfig = createStageConfigForJob(JOB1, destDir);
Path jobAttemptTaskSubDir = stageConfig.getJobAttemptTaskSubDir();
TaskManifest manifest = new TaskManifest();
final List<FileEntry> filesToCommit = manifest.getFilesToCommit();
Path source = new Path(jobAttemptTaskSubDir, "source.parquet");
Path dest = new Path(destDir, "destdir.parquet");
filesToCommit.add(new FileEntry(source, dest, 0, null));
final FileNotFoundException ex = expectRenameFailure(
new RenameFilesStage(stageConfig),
manifest,
0,
"",
FileNotFoundException.class);
LOG.info("Exception raised: {}", ex.toString());
}
/**
* Verify that when a job is configured to delete target paths,
* renaming will overwrite them.
* This test has to use FileSystem contract settings to determine
* whether or not the FS will actually permit file-over-file rename.
* As POSIX does, local filesystem tests will not fail if the
* destination exists.
* As ABFS and GCS do reject it, they are required to fail the
* first rename sequence, but succeed once delete.target.paths
* is true.
*/
@Test
public void testDeleteTargetPaths() throws Throwable {
describe("Verify that target path deletion works");
// destination directory.
Path destDir = methodPath();
StageConfig stageConfig = createStageConfigForJob(JOB1, destDir)
.withDeleteTargetPaths(true);
Path jobAttemptTaskSubDir = stageConfig.getJobAttemptTaskSubDir();
final Path source = new Path(jobAttemptTaskSubDir, "source.txt");
final Path dest = new Path(destDir, "source.txt");
final byte[] sourceData = "data".getBytes(StandardCharsets.UTF_8);
final FileSystem fs = getFileSystem();
ContractTestUtils.createFile(fs, source, false, sourceData);
touch(fs, dest);
TaskManifest manifest = new TaskManifest();
final FileEntry entry = createEntryWithEtag(source, dest);
manifest.addFileToCommit(entry);
List<TaskManifest> manifests = new ArrayList<>();
manifests.add(manifest);
// local POSIX filesystems allow rename of file onto file, so
// don't fail on the rename.
boolean renameOverwritesDest = isSupported(RENAME_OVERWRITES_DEST);
if (!renameOverwritesDest) {
// HDFS, ABFS and GCS do all reject rename of file onto file.
// ABFS will use its rename operation so will even raise a
// meaningful exception here.
final IOException ex = expectRenameFailure(
new RenameFilesStage(stageConfig.withDeleteTargetPaths(false)),
manifest,
0,
"",
IOException.class);
LOG.info("Exception raised: {}", ex.toString());
}
// delete target paths and it works
new RenameFilesStage(stageConfig.withDeleteTargetPaths(true))
.apply(Pair.of(manifests, Collections.emptySet()));
// and the new data made it over
verifyFileContents(fs, dest, sourceData);
// lets check the etag too, for completeness
if (isEtagsPreserved()) {
Assertions.assertThat(getEtag(fs.getFileStatus(dest)))
.describedAs("Etag of destination file %s", dest)
.isEqualTo(entry.getEtag());
}
}
@Test
public void testRenameReturnsFalse() throws Throwable {
describe("commit where rename() returns false for one file." +
" Expect failure to be escalated to an IOE");
Assume.assumeTrue("not used when resilient commits are available",
!resilientCommit);
// destination directory.
Path destDir = methodPath();
StageConfig stageConfig = createStageConfigForJob(JOB1, destDir);
Path jobAttemptTaskSubDir = stageConfig.getJobAttemptTaskSubDir();
// create a manifest with a lot of files, but for
// which one of whose renames will fail
TaskManifest manifest = new TaskManifest();
createFileset(destDir, jobAttemptTaskSubDir, manifest, filesToCreate());
final List<FileEntry> filesToCommit = manifest.getFilesToCommit();
final FileEntry entry = filesToCommit.get(FAILING_FILE_INDEX);
failures.addRenameSourceFilesToFail(entry.getSourcePath());
// switch to rename returning false.; again, this must
// be escalated to a failure.
failures.setRenameToFailWithException(false);
expectRenameFailure(
new RenameFilesStage(stageConfig),
manifest,
filesToCommit.size(),
FAILED_TO_RENAME_PREFIX,
PathIOException.class);
}
/**
* Create the source files for a task.
* @param destDir destination directory
* @param taskAttemptDir directory of the task attempt
* @param manifest manifest to update.
* @param fileCount how many files.
*/
private void createFileset(
final Path destDir,
final Path taskAttemptDir,
final TaskManifest manifest,
final int fileCount) throws IOException {
final FileSystem fs = getFileSystem();
for (int i = 0; i < fileCount; i++) {
String name = String.format("file%04d", i);
Path src = new Path(taskAttemptDir, name);
Path dest = new Path(destDir, name);
touch(fs, src);
final FileEntry entry = createEntryWithEtag(src, dest);
manifest.addFileToCommit(entry);
}
}
/**
* Create a manifest entry, including size.
* If the FS supports etags, one is retrieved.
* @param source source
* @param dest dest
* @return entry
* @throws IOException if getFileStatus failed.
*/
private FileEntry createEntryWithEtag(final Path source,
final Path dest)
throws IOException {
final FileStatus st = getFileSystem().getFileStatus(source);
final String etag = isEtagsSupported()
? getEtag(st)
: null;
return new FileEntry(source, dest, st.getLen(), etag);
}
/**
* Execute rename, expecting a failure.
* The number of files renamed MUST be less than the value of {@code files}
* @param stage stage
* @param manifest task manifests
* @param files number of files being renamed.
* @param errorText text which must be in the exception string
* @param exceptionClass class of the exception
* @return the caught exception
* @throws Exception if anything else went wrong, or no exception was raised.
*/
private <E extends Throwable> E expectRenameFailure(
RenameFilesStage stage,
TaskManifest manifest,
int files,
String errorText,
Class<E> exceptionClass) throws Exception {
List<TaskManifest> manifests = new ArrayList<>();
manifests.add(manifest);
ProgressCounter progressCounter = getProgressCounter();
progressCounter.reset();
IOStatisticsStore iostatistics = stage.getIOStatistics();
long failures0 = iostatistics.counters().get(RENAME_FAILURES);
// rename MUST raise an exception.
E ex = intercept(exceptionClass, errorText, () ->
stage.apply(Pair.of(manifests, Collections.emptySet())));
LOG.info("Statistics {}", ioStatisticsToPrettyString(iostatistics));
// the IOStatistics record the rename as a failure.
assertThatStatisticCounter(iostatistics, RENAME_FAILURES)
.isEqualTo(failures0 + 1);
// count of files committed MUST be less than expected.
if (files > 0) {
Assertions.assertThat(stage.getFilesCommitted())
.describedAs("Files Committed by stage")
.isNotEmpty()
.hasSizeLessThan(files);
}
// the progress counter will show that the rename did invoke it.
// there's no assertion on the actual value as it depends on
// execution time of the threads.
Assertions.assertThat(progressCounter.value())
.describedAs("Progress counter %s", progressCounter)
.isGreaterThan(0);
return ex;
}
}