blob: bcd9d34e263dea65ad8409e0975088cc7ea14d9c [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.contract.localfs.LocalFSContract;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DirEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CleanupJobStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SaveTaskManifestStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SetupTaskStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageConfig;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.functional.CloseableTaskPoolSubmitter;
import org.apache.hadoop.util.functional.RemoteIterators;
import org.apache.hadoop.util.functional.TaskPool;
import static org.apache.hadoop.fs.contract.ContractTestUtils.readDataset;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;
import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER;
import static org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory.COMMITTER_FACTORY_CLASS;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConfig.createCloseableTaskSubmitter;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.JOB_ID_SOURCE_MAPREDUCE;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.MANIFEST_COMMITTER_FACTORY;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_DIAGNOSTICS_MANIFEST_DIR;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_SUMMARY_REPORT_DIR;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_VALIDATE_OUTPUT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_CLEANUP;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.getProjectBuildDir;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.validateSuccessFile;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.NAME_FORMAT_JOB_ATTEMPT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createIOStatisticsStore;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createTaskManifest;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
/**
 * Tests which work with manifest committers.
 * This is a filesystem contract bound to the local filesystem;
 * subclasses may change the FS to test against other stores.
 * <p>
 * Some fields are set up in
 * {@link #executeOneTaskAttempt(int, int, int)},
 * which is why fields are used.
 * When synchronized access is needed, synchronize on (this) rather
 * than on individual fields.
 */
public abstract class AbstractManifestCommitterTest
extends AbstractFSContractTestBase {
/** Logger for this test suite. */
// NOTE(review): the initializer of LOG is missing in this copy, so the
// declaration does not compile as-is; presumably LoggerFactory.getLogger(...)
// (LoggerFactory is imported) — confirm against the canonical source.
protected static final Logger LOG =
/**
 * Some Job and task IDs.
 */
protected static final ManifestCommitterTestSupport.JobAndTaskIDsForTests
TASK_IDS = new ManifestCommitterTestSupport.JobAndTaskIDsForTests(2, 2);
/** Job attempt number; passed as the job attempt when creating task stage configs. */
public static final int JOB1 = 1;
/** Task index 0. */
public static final int TASK0 = 0;
/** Task index 1. */
public static final int TASK1 = 1;
/**
 * Task attempt 0 index.
 */
public static final int TA0 = 0;
/**
 * Task attempt 1 index.
 */
public static final int TA1 = 1;
/**
 * Depth of dir tree to generate.
 */
public static final int DEPTH = 3;
/**
 * Width of dir tree at every level.
 */
public static final int WIDTH = 2;
/**
 * How many files to create in the leaf directories.
 */
public static final int FILES_PER_DIRECTORY = 4;
/**
 * Pool size of the task submitter created in setup.
 */
public static final int POOL_SIZE = 32;
/**
 * FileSystem statistics are collected across every test case.
 */
// NOTE(review): the initializer of FILESYSTEM_IOSTATS is missing in this copy;
// presumably a new IOStatisticsSnapshot — confirm.
protected static final IOStatisticsSnapshot FILESYSTEM_IOSTATS =
/**
 * Counter for creating files. Ensures that across all test suites,
 * duplicate filenames are never created. Helps assign blame.
 */
private static final AtomicLong CREATE_FILE_COUNTER = new AtomicLong();
/** An empty byte array. */
protected static final byte[] NO_DATA = new byte[0];
/**
 * The thread leak tracker, shared by all test cases of the class.
 */
private static final ThreadLeakTracker THREAD_LEAK_TRACKER = new ThreadLeakTracker();
/** Length limit; its use is not visible in this section — NOTE(review): confirm. */
private static final int MAX_LEN = 64_000;
/**
 * Submitter for tasks; may be null.
 */
private CloseableTaskPoolSubmitter submitter;
/**
 * Stage statistics. Created in test setup, and in
 * teardown updates {@link #FILESYSTEM_IOSTATS}.
 */
private IOStatisticsStore stageStatistics;
/**
 * Store operations for the target filesystem.
 * Prefer to use these to interact with the FS to
 * ensure more implicit coverage.
 */
private ManifestStoreOperations storeOperations;
/**
 * Progress counter used in all stage configs.
 */
private final ProgressCounter progressCounter = new ProgressCounter();
/**
 * Directory for job summary reports.
 * This should be set up in test suites testing against real object stores.
 */
private File reportDir;
/**
 * List of task attempt IDs for those tests which create manifests.
 */
private final List<String> taskAttemptIds = new ArrayList<>();
/**
 * List of task IDs for those tests which create manifests.
 */
private final List<String> taskIds = new ArrayList<>();
/**
 * Any job stage configuration created for operations.
 */
private StageConfig jobStageConfig;
/**
 * Destination dir of job.
 */
private Path destDir;
/**
 * When creating manifests, total data size.
 */
private final AtomicLong totalDataSize = new AtomicLong();
/**
 * Where to move manifests; must be in target FS.
 */
private Path manifestDir;
* Get the contract configuration.
* @return the config used to create the FS.
protected Configuration getConfiguration() {
return getContract().getConf();
* Store operations to interact with..
* @return store operations.
protected ManifestStoreOperations getStoreOperations() {
return storeOperations;
* Set store operations.
* @param storeOperations new value
protected void setStoreOperations(final ManifestStoreOperations storeOperations) {
this.storeOperations = storeOperations;
public List<String> getTaskAttemptIds() {
return taskAttemptIds;
public List<String> getTaskIds() {
return taskIds;
public long getTotalDataSize() {
return totalDataSize.get();
public Path getManifestDir() {
return manifestDir;
* Set builder value.
* @param value new value
* @return the builder
public AbstractManifestCommitterTest withManifestDir(Path value) {
manifestDir = value;
return this;
/**
 * Describe a test in the logs.
 * @param text text to print; used as a {@link String#format(String, Object...)} pattern
 * @param args arguments to format in the printing
 */
// NOTE(review): the start of the logging call is missing in this copy, and the
// format string has two "{}" placeholders but only one argument survives;
// restore the missing call and argument before compiling.
protected void describe(String text, Object... args) {"\n\n{}: {}\n",
String.format(text, args));
* Local FS unless overridden.
* @param conf configuration
* @return the FS contract.
protected AbstractFSContract createContract(final Configuration conf) {
return new LocalFSContract(conf);
/** Enable the manifest committer options in the configuration. */
protected Configuration createConfiguration() {
return enableManifestCommitter(super.createConfiguration());
/**
 * Test setup: bind the report directory, the manifest directory,
 * the task submitter and the store operations.
 * @throws Exception failure.
 */
// NOTE(review): in this copy the superclass setup call referred to by the
// comments below is not present, nor are the statements which the
// "destination directory" and "stage statistics" comments describe.
// The closing brace is also missing. Confirm against the canonical source.
public void setup() throws Exception {
// set the manifest committer to a localfs path for reports across
// all threads.
// do this before superclass setup so reportDir is non-null there
// and can be used in creating the configuration.
// (createConfiguration() -> enableManifestCommitter() calls getReportDirUri(),
// which dereferences reportDir.)
reportDir = new File(getProjectBuildDir(), "reports");
// superclass setup includes creating a filesystem instance
// for the target store.
manifestDir = path("manifests");
// destination directory defaults to method path in
// target FS
// stage statistics
// thread pool for task submission.
setSubmitter(createCloseableTaskSubmitter(POOL_SIZE, TASK_IDS.getJobId()));
// store operations for the target filesystem.
storeOperations = createManifestStoreOperations();
* Overrride point: create the store operations.
* @return store operations for this suite.
protected ManifestStoreOperations createManifestStoreOperations() throws IOException {
final FileSystem fs = getFileSystem();
return ManifestCommitterSupport.createManifestStoreOperations(fs.getConf(), fs, getTestPath());
/**
 * Test teardown: release the store operations and the task submitter
 * through IOUtils.cleanupWithLogger, then clear the store operations field.
 * @throws Exception failure.
 */
// NOTE(review): the stageStatistics field documentation says teardown updates
// FILESYSTEM_IOSTATS, but neither that update nor a superclass teardown call
// (nor the closing brace) is present in this copy — confirm.
public void teardown() throws Exception {
IOUtils.cleanupWithLogger(LOG, storeOperations, getSubmitter());
storeOperations = null;
* Add a long delay so that you don't get timeouts when working
* with object stores or debugging.
* @return a longer timeout than the base classes.
protected int getTestTimeoutMillis() {
return 600_000;
protected Path getTestPath() {
return getContract().getTestPath();
* Get the task submitter.
* @return submitter or null
protected CloseableTaskPoolSubmitter getSubmitter() {
return submitter;
* Set the task submitter.
* @param submitter new value.
protected void setSubmitter(CloseableTaskPoolSubmitter submitter) {
this.submitter = submitter;
* Get the executor which the submitter also uses.
* @return an executor.
protected ExecutorService getExecutorService() {
return getSubmitter().getPool();
* @return IOStatistics for stage.
protected final IOStatisticsStore getStageStatistics() {
return stageStatistics;
* Set the statistics.
* @param stageStatistics statistics.
protected final void setStageStatistics(IOStatisticsStore stageStatistics) {
this.stageStatistics = stageStatistics;
* Get the progress counter invoked during commit operations.
* @return progress counter.
protected final ProgressCounter getProgressCounter() {
return progressCounter;
* Get the report directory.
* @return report directory.
public final File getReportDir() {
return reportDir;
* Get the report directory as a URI.
* @return report directory.
public final URI getReportDirUri() {
return getReportDir().toURI();
* Get the (shared) thread leak tracker.
* @return the thread leak tracker.
protected static ThreadLeakTracker getThreadLeakTracker() {
/**
 * Make sure there's no thread leakage.
 */
// NOTE(review): the body (and, presumably, an @AfterClass annotation) is
// missing in this copy; presumably it checks THREAD_LEAK_TRACKER — confirm.
public static void threadLeakage() {
/**
 * Dump the aggregate filesystem statistics after the class.
 */
// NOTE(review): the logging call is truncated in this copy: only its format
// string survives. The argument (presumably FILESYSTEM_IOSTATS, pretty-printed)
// and the end of the call and method are missing — confirm.
public static void dumpFileSystemIOStatistics() {"Aggregate FileSystem Statistics {}",
/**
 * Create a directory tree through an executor.
 * <ul>
 *   <li>leaf directories = width^depth;</li>
 *   <li>leaf entries created = width^depth * files.</li>
 * </ul>
 * If createDirs == true, then directories are created at the bottom,
 * not files.
 * @param base base dir; must already exist.
 * @param prefix prefix for filenames.
 * @param executor executor to run the creation operations in.
 * @param depth depth of dirs
 * @param width width of dirs
 * @param files files to add in each leaf dir.
 * @param createDirs create directories rather than files?
 * @return the list of paths
 * @throws IOException failure.
 */
public final List<Path> createFilesOrDirs(Path base,
String prefix,
ExecutorService executor,
int depth,
int width,
int files,
boolean createDirs) throws IOException {
try (DurationInfo ignored = new DurationInfo(LOG, true,
"Creating Files %s (%d, %d, %d) under %s",
prefix, depth, width, files, base)) {
assertPathExists("Task attempt dir", base);
// create the files in the thread pool.
List<Future<Path>> futures = createFilesOrDirs(
new ArrayList<>(),
base, prefix,
depth, width, files,
// NOTE(review): this call is truncated in this copy; the executor and
// createDirs arguments and the closing parenthesis are missing.
List<Path> result = new ArrayList<>();
// now wait for the creations to finish.
// NOTE(review): the loop body (presumably awaiting each future and adding
// its path to result) is missing in this copy.
for (Future<Path> f : futures) {
return result;
* Counter incremented for each file created.
private final AtomicLong fileDataGenerator = new AtomicLong();
/**
 * Create files or directories; done in a treewalk, building up
 * a list of futures to wait for. The list is
 * built up incrementally rather than through some merging of
 * lists created down the tree.
 * If createDirs == true, then directories are created at the bottom,
 * not files.
 * @param futures list of futures to build up.
 * @param base base dir
 * @param prefix prefix for filenames.
 * @param executor executor to submit the creation operations to.
 * @param depth depth of dirs
 * @param width width of dirs
 * @param files files to add in each leaf dir.
 * @param createDirs create directories rather than files?
 * @return the list of futures
 */
private List<Future<Path>> createFilesOrDirs(
List<Future<Path>> futures,
Path base,
String prefix,
ExecutorService executor,
int depth,
int width,
int files,
boolean createDirs) {
if (depth > 0) {
// still creating directories
for (int i = 0; i < width; i++) {
Path child = new Path(base,
String.format("dir-%02d-%02d", depth, i));
// NOTE(review): BUG — the recursion passes a literal false instead of
// createDirs, so when depth > 0 the leaves are always files even if the
// caller asked for directories. Pass createDirs through.
createFilesOrDirs(futures, child, prefix, executor, depth - 1, width, files, false);
} else {
// time to create files
for (int i = 0; i < files; i++) {
Path file = new Path(base,
String.format("%s-%04d", prefix,
// NOTE(review): the end of the String.format call above is missing in this copy.
// build the data. Not actually used in mkdir.
// The low 16 bits of the counter are stored little-endian in two bytes.
long entry = fileDataGenerator.incrementAndGet() & 0xffff;
byte[] data = new byte[2];
data[0] = (byte) (entry & 0xff);
data[1] = (byte) ((entry & 0xff00) >> 8);
// the async operation.
Future<Path> f = executor.submit(() -> {
if (!createDirs) {
// create files
ContractTestUtils.createFile(getFileSystem(), file, true, data);
} else {
// create directories
// NOTE(review): the mkdir call, the end of the lambda and the
// registration of f in futures are missing in this copy.
return file;
return futures;
* Create a list of paths under a dir.
* @param base base dir
* @param count count
* @return the list
protected List<Path> subpaths(Path base, int count) {
return IntStream.rangeClosed(1, count)
.mapToObj(i -> new Path(base, String.format("entry-%02d", i)))
/**
 * Submit a mkdir call to the executor pool.
 * @param path path of dir to create.
 * @return a future which the submitted operation is expected to complete
 * with the path, or complete exceptionally on an IOException.
 */
// NOTE(review): this copy is truncated: the try block has no statement, the
// catch block has no body, and the lambda/submit call is never closed.
// Restore the mkdir call and the future completion before compiling.
protected CompletableFuture<Path> asyncMkdir(final Path path) {
CompletableFuture<Path> f = new CompletableFuture<>();
getExecutorService().submit(() -> {
try {
} catch (IOException e) {
return f;
/**
 * Given a list of paths, create the dirs asynchronously, then wait
 * for all of the operations to finish.
 * @param paths path list
 * @throws IOException failure
 */
// NOTE(review): both loop bodies are missing in this copy; presumably the first
// adds asyncMkdir(path) to futures and the second awaits each future — confirm.
protected void asyncMkdirs(Collection<Path> paths) throws IOException {
List<CompletableFuture<Path>> futures = new ArrayList<>();
// initiate
for (Path path: paths) {
// await
for (Future<Path> f : futures) {
/**
 * Submit an operation to create a file to the executor pool.
 * The file is created with overwrite enabled.
 * @param path path of file to create.
 * @param data data to write to the file.
 * @return a future which the submitted operation is expected to complete.
 */
// NOTE(review): this copy is truncated: the completion of f on success,
// the catch block body and the end of the lambda/submit call are missing.
protected CompletableFuture<Path> asyncPut(final Path path, byte[] data) {
CompletableFuture<Path> f = new CompletableFuture<>();
getExecutorService().submit(() -> {
try {
ContractTestUtils.createFile(getFileSystem(), path, true, data);
} catch (IOException e) {
return f;
* Convert the manifest list to a map by task attempt ID.
* @param list manifests
* @return a map, indexed by task attempt ID.
protected Map<String, TaskManifest> toMap(List<TaskManifest> list) {
.collect(Collectors.toMap(TaskManifest::getTaskAttemptID, x -> x));
/**
 * Verify the source paths of the files to commit in a manifest
 * match the list of paths.
 * @param manifest manifest to audit
 * @param files list of files.
 */
// NOTE(review): this copy is truncated: the stream chain which turns
// getFilesToCommit() into a Set<Path> and the assertion comparing it with
// files are missing.
protected void verifyManifestFilesMatch(final TaskManifest manifest,
final List<Path> files) {
// get the list of source paths
Set<Path> filesToRename = manifest.getFilesToCommit()
// which must match that of all the files created
/**
 * Verify that a task manifest has a given attempt ID.
 * @param manifest manifest, may be null.
 * @param attemptId expected attempt ID
 * @return the manifest, guaranteed to be non-null and of the expected task attempt.
 */
// NOTE(review): the assertion calls which the two describedAs() lines below
// belong to (presumably a non-null check on manifest and an equality check
// on its task attempt ID) are missing in this copy.
protected TaskManifest verifyManifestTaskAttemptID(
final TaskManifest manifest,
final String attemptId) {
.describedAs("Manifest of task %s", attemptId)
.describedAs("Task Attempt ID of manifest %s", manifest)
return manifest;
* Assert that a path must exist; return the path.
* @param message text for error message.
* @param path path to validate.
* @return the path
* @throws IOException IO Failure
Path pathMustExist(final String message,
final Path path) throws IOException {
assertPathExists(message, path);
return path;
* Assert that a path must exist; return the path.
* It must also equal the expected value.
* @param message text for error message.
* @param expectedPath expected path.
* @param actualPath path to validate.
* @return the path
* @throws IOException IO Failure
Path verifyPath(final String message,
final Path expectedPath,
final Path actualPath) throws IOException {
return pathMustExist(message, actualPath);
* Verify that the specified dir has the {@code _SUCCESS} marker
* and that it can be loaded.
* The contents will be logged and returned.
* @param dir directory to scan
* @param jobId job ID, only verified if non-empty
* @return the loaded success data
* @throws IOException IO Failure
protected ManifestSuccessData verifySuccessMarker(Path dir, String jobId)
throws IOException {
return validateSuccessFile(getFileSystem(), dir, 0, jobId);
* Read a UTF-8 file.
* @param path path to read
* @return string value
* @throws IOException IO failure
protected String readFile(Path path) throws IOException {
return ContractTestUtils.readUTF8(getFileSystem(), path, -1);
/**
 * Modify a (job) config to switch to the manifest committer;
 * output validation is also enabled, and the summary report dir
 * is bound to the report directory.
 * Note: getReportDirUri() dereferences the report dir, so this must not
 * be called before setup() has set it.
 * @param conf config to patch.
 * @return the updated configuration.
 */
// NOTE(review): this copy is truncated: the statements for the committer
// factory and job marker (the related constants are imported but unused here),
// the body and closing brace of the manifest-dir if-block, and the method's
// closing brace are all missing.
protected Configuration enableManifestCommitter(final Configuration conf) {
// always create a job marker
// and validate the output, for extra rigor
conf.setBoolean(OPT_VALIDATE_OUTPUT, true);
// set the manifest rename dir if non-null
if (getManifestDir() != null) {
// and bind the report dir
conf.set(OPT_SUMMARY_REPORT_DIR, getReportDirUri().toString());
return conf;
* Create the stage config for a job but don't finalize it.
* Uses {@link #TASK_IDS} for job/task ID.
* @param jobAttemptNumber job attempt number
* @param outputPath path where the final output goes
* @return the config
protected StageConfig createStageConfigForJob(
final int jobAttemptNumber,
final Path outputPath) {
return createStageConfig(jobAttemptNumber, -1, 0, outputPath);
/**
 * Create the stage config for job or task but don't finalize it.
 * Uses {@link #TASK_IDS} for job/task ID.
 * @param jobAttemptNumber job attempt number
 * @param taskIndex task attempt index; -1 for job attempt only.
 * @param taskAttemptNumber task attempt number
 * @param outputPath path where the final output goes
 * @return the config
 */
// NOTE(review): this copy is truncated: the builder chain which the dangling
// withName() line belongs to is incomplete (attemptDirs is never used), and
// the body of the taskIndex branch stops mid-call.
protected StageConfig createStageConfig(
final int jobAttemptNumber,
final int taskIndex,
final int taskAttemptNumber,
final Path outputPath) {
final String jobId = TASK_IDS.getJobId();
ManifestCommitterSupport.AttemptDirectories attemptDirs =
new ManifestCommitterSupport.AttemptDirectories(outputPath,
jobId, jobAttemptNumber);
StageConfig config = new StageConfig();
.withName(String.format(NAME_FORMAT_JOB_ATTEMPT, jobId))
// if there's a task attempt ID set, set up its details
if (taskIndex >= 0) {
String taskAttempt = TASK_IDS.getTaskAttempt(taskIndex,
return config;
* A job stage config.
* @return stage config or null.
protected StageConfig getJobStageConfig() {
return jobStageConfig;
protected void setJobStageConfig(StageConfig jobStageConfig) {
this.jobStageConfig = jobStageConfig;
protected Path getDestDir() {
return destDir;
protected void setDestDir(Path destDir) {
this.destDir = destDir;
/**
 * Execute a set of tasks; task ID is a simple count.
 * The task attempt is the lowest 2 bits of the task ID.
 * @param taskAttemptCount number of tasks.
 * @param filesPerTaskAttempt number of files to include in each manifest.
 * @return the manifests.
 * @throws IOException IO failure.
 */
// NOTE(review): this copy is truncated: the loop body filling taskIdList,
// the TaskPool builder which .run() belongs to, and the code which adds each
// result to manifests are missing.
protected List<TaskManifest> executeTaskAttempts(int taskAttemptCount,
int filesPerTaskAttempt) throws IOException {
try (DurationInfo di = new DurationInfo(LOG, true, "create manifests")) {
// build a list of the task IDs.
// it's really hard to create a list of Integers; the java8
// IntStream etc doesn't quite fit as they do their best
// keep things unboxed, trying to map(Integer::valueOf) doesn't help.
// NOTE(review): IntStream.range(0, n).boxed() does produce boxed Integers,
// should this loop ever be simplified.
List<Integer> taskIdList = new ArrayList<>(taskAttemptCount);
for (int t = 0; t < taskAttemptCount; t++) {
/// execute the tasks
// synchronized: manifests are added from the pool's worker threads.
List<TaskManifest> manifests = Collections.synchronizedList(
new ArrayList<>());
// then submit their creation/save to the pool.
.run(i -> {
executeOneTaskAttempt(i, i & 0x03, filesPerTaskAttempt));
return manifests;
/**
 * Create a task ID and attempt (adding to taskIDs and taskAttemptIds),
 * set up the task, create a manifest with fake task entries
 * and save that manifest to the job attempt dir.
 * The files listed in the manifest are not actually created.
 * @param task task index
 * @param taskAttempt task attempt value
 * @param filesPerTaskAttempt number of files to include in manifest.
 * @return the manifest
 * @throws IOException failure
 */
// NOTE(review): this copy is truncated: the synchronized block has no visible
// body or closing brace, a logging call is glued onto the createTaskStageConfig
// line, and the call which the FileEntry line is an argument of (presumably
// adding it to the manifest) is missing, as is the data size accounting.
protected TaskManifest executeOneTaskAttempt(final int task,
int taskAttempt, final int filesPerTaskAttempt) throws IOException {
String tid = String.format("task_%03d", task);
String taskAttemptId = String.format("%s_%02d",
tid, taskAttempt);
synchronized (this) {
// for each task, a job config is created then patched with the task info
StageConfig taskStageConfig = createTaskStageConfig(JOB1, tid, taskAttemptId);"Generating manifest for {}", taskAttemptId);
// task setup: create dest dir.
// This helps generate a realistic
// workload for the parallelized job cleanup.
new SetupTaskStage(taskStageConfig).apply("task " + taskAttemptId);
final TaskManifest manifest = createTaskManifest(taskStageConfig);
Path taDir = taskStageConfig.getTaskAttemptDir();
// base size of this task's fake files: task * 10,000,000
// (the literal's digit grouping is unusual: 1000_0000 == 10,000,000).
long size = task * 1000_0000L;
// for each of filesPerTaskAttempt entries: one directory, one file in it.
for (int i = 0; i < filesPerTaskAttempt; i++) {
Path in = new Path(taDir, "dir-" + i);
Path out = new Path(getDestDir(), "dir-" + i);
manifest.addDirectory(DirEntry.dirEntry(out, 0, 1));
String name = taskStageConfig.getTaskAttemptId() + ".csv";
Path src = new Path(in, name);
Path dest = new Path(out, name);
long fileSize = size + i * 1000L;
// the hex string of the size is used as the entry's last field.
new FileEntry(src, dest, fileSize, Long.toString(fileSize, 16)));
// save the manifest for this stage.
new SaveTaskManifestStage(taskStageConfig).apply(manifest);
return manifest;
public StageConfig createTaskStageConfig(final int jobId, final String tid,
final String taskAttemptId) {
Path jobAttemptTaskSubDir = getJobStageConfig().getJobAttemptTaskSubDir();
StageConfig taskStageConfig = createStageConfigForJob(jobId, getDestDir())
.withTaskAttemptDir(new Path(jobAttemptTaskSubDir, taskAttemptId));
return taskStageConfig;
* Verify that the job directories have been cleaned up.
* @throws IOException IO failure
protected void verifyJobDirsCleanedUp() throws IOException {
StageConfig stageConfig = getJobStageConfig();
assertPathDoesNotExist("Job attempt dir", stageConfig.getJobAttemptDir());
assertPathDoesNotExist("dest temp dir", stageConfig.getOutputTempSubDir());
/**
 * List a directory/directory tree and log the files.
 * @param fileSystem FS
 * @param path path; if null, nothing is listed and 0 is returned.
 * @param recursive do a recursive listing?
 * @return the number of files found.
 * @throws Exception failure.
 */
// NOTE(review): the logging calls in this copy are damaged: their "LOG.xxx("
// prefixes are missing, one is glued into a // comment (and so is dead),
// and the others are glued onto code lines; the branch braces are also missing.
public static long lsR(FileSystem fileSystem, Path path, boolean recursive)
throws Exception {
if (path == null) {
// surfaces when someone calls getParent() on something at the top
// of the path"Empty path");
return 0;
} else {"Listing of {}", path);
final long count = RemoteIterators.foreach(
fileSystem.listFiles(path, recursive),
(status) ->"{}", status));"Count of entries: {}", count);
return count;
/**
 * Assert that a cleanup stage had a given outcome and
 * deleted the given number of directories.
 * @param result result to analyze
 * @param outcome expected outcome
 * @param expectedDirsDeleted #of directories deleted. -1 for no checks
 */
// NOTE(review): the assertion calls which the describedAs() lines belong to
// (presumably checks on the outcome and the deleted directory count) are
// missing in this copy, as are the closing braces.
protected void assertCleanupResult(
CleanupJobStage.Result result,
CleanupJobStage.Outcome outcome,
int expectedDirsDeleted) {
.describedAs("Outcome of cleanup() in %s", result)
if (expectedDirsDeleted >= 0) {
.describedAs("Number of directories deleted in cleanup %s", result)
* Create and execute a cleanup stage.
* @param enabled is the stage enabled?
* @param deleteTaskAttemptDirsInParallel delete task attempt dirs in
* parallel?
* @param suppressExceptions suppress exceptions?
* @param outcome expected outcome.
* @param expectedDirsDeleted #of directories deleted. -1 for no checks
* @return the result
* @throws IOException non-suppressed exception
protected CleanupJobStage.Result cleanup(
final boolean enabled,
final boolean deleteTaskAttemptDirsInParallel,
final boolean suppressExceptions,
final CleanupJobStage.Outcome outcome,
final int expectedDirsDeleted) throws IOException {
StageConfig stageConfig = getJobStageConfig();
CleanupJobStage.Result result = new CleanupJobStage(stageConfig)
.apply(new CleanupJobStage.Arguments(OP_STAGE_JOB_CLEANUP,
enabled, deleteTaskAttemptDirsInParallel, suppressExceptions));
assertCleanupResult(result, outcome, expectedDirsDeleted);
return result;
/**
 * Read the UTF_8 text in the file.
 * The whole file is read, using the length from its FileStatus.
 * @param path path to read
 * @return the string
 * @throws IOException failure
 */
// NOTE(review): this copy is truncated: the assertion which the describedAs()
// line belongs to (possibly a bound on the length, e.g. against MAX_LEN) and the
// String constructor's remaining argument(s) — presumably
// StandardCharsets.UTF_8 — are missing.
protected String readText(final Path path) throws IOException {
final FileSystem fs = getFileSystem();
final FileStatus st = fs.getFileStatus(path);
.describedAs("length of file %s", st)
return new String(
readDataset(fs, path, (int) st.getLen()),
* Counter.
protected static final class ProgressCounter implements Progressable {
private final AtomicLong counter = new AtomicLong();
* Increment the counter.
public void progress() {
* Get the counter value.
* @return the current value.
public long value() {
return counter.get();
* Reset the counter.
public void reset() {
public String toString() {
final StringBuilder sb = new StringBuilder(
return sb.toString();
* Get the progress counter of a stage.
* @param stageConfig stage
* @return its progress counter.
ProgressCounter progressOf(StageConfig stageConfig) {
return (ProgressCounter) stageConfig.getProgressable();