blob: bcd9d34e263dea65ad8409e0975088cc7ea14d9c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.contract.localfs.LocalFSContract;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DirEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CleanupJobStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SaveTaskManifestStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SetupTaskStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageConfig;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.functional.CloseableTaskPoolSubmitter;
import org.apache.hadoop.util.functional.RemoteIterators;
import org.apache.hadoop.util.functional.TaskPool;
import static org.apache.hadoop.fs.contract.ContractTestUtils.readDataset;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;
import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER;
import static org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory.COMMITTER_FACTORY_CLASS;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConfig.createCloseableTaskSubmitter;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.JOB_ID_SOURCE_MAPREDUCE;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.MANIFEST_COMMITTER_FACTORY;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_DIAGNOSTICS_MANIFEST_DIR;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_SUMMARY_REPORT_DIR;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_VALIDATE_OUTPUT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_CLEANUP;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.getProjectBuildDir;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.validateSuccessFile;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.NAME_FORMAT_JOB_ATTEMPT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createIOStatisticsStore;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createTaskManifest;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
/**
* Tests which work with manifest committers.
* This is a filesystem contract bound to the local filesystem;
* subclasses may change the FS to test against other stores.
* Some state is populated by
* {@link #executeOneTaskAttempt(int, int, int)},
* which is why it is held in fields.
* When synchronized access is needed, synchronize on {@code this} rather
* than on individual fields.
*/
public abstract class AbstractManifestCommitterTest
extends AbstractFSContractTestBase {
/** Logger for this class; protected, so subclasses can log through it too. */
protected static final Logger LOG =
LoggerFactory.getLogger(AbstractManifestCommitterTest.class);
/**
* Some Job and task IDs.
* NOTE(review): the meaning of the two constructor arguments is defined by
* {@code ManifestCommitterTestSupport.JobAndTaskIDsForTests}; confirm there.
*/
protected static final ManifestCommitterTestSupport.JobAndTaskIDsForTests
TASK_IDS = new ManifestCommitterTestSupport.JobAndTaskIDsForTests(2, 2);
/**
* Job number 1; passed as the first argument of
* {@link #createTaskStageConfig(int, String, String)} when generating manifests.
*/
public static final int JOB1 = 1;
/**
* Task 0 index.
*/
public static final int TASK0 = 0;
/**
* Task 1 index.
*/
public static final int TASK1 = 1;
/**
* Task attempt 0 index.
*/
public static final int TA0 = 0;
/**
* Task attempt 1 index.
*/
public static final int TA1 = 1;
/**
* Depth of dir tree to generate.
*/
public static final int DEPTH = 3;
/**
* Width of dir tree at every level.
*/
public static final int WIDTH = 2;
/**
* How many files to create in the leaf directories.
*/
public static final int FILES_PER_DIRECTORY = 4;
/**
* Pool size: handed to the task submitter created in {@link #setup()}.
*/
public static final int POOL_SIZE = 32;
/**
* FileSystem statistics are collected across every test case:
* {@link #teardown()} aggregates into this snapshot and
* {@link #dumpFileSystemIOStatistics()} logs it after the class.
*/
protected static final IOStatisticsSnapshot FILESYSTEM_IOSTATS =
snapshotIOStatistics();
/**
* Counter for creating files. Ensures that across all test suites,
* duplicate filenames are never created. Helps assign blame.
* Its value forms the numeric suffix of every file or directory name
* generated at the leaves of the tree walk.
*/
private static final AtomicLong CREATE_FILE_COUNTER = new AtomicLong();
/** An empty byte array. */
protected static final byte[] NO_DATA = new byte[0];
/**
* The thread leak tracker.
*/
private static final ThreadLeakTracker THREAD_LEAK_TRACKER = new ThreadLeakTracker();
/**
* Upper bound, in bytes, on the length of a file which
* {@link #readText(Path)} will read.
*/
private static final int MAX_LEN = 64_000;
/**
* Submitter for tasks; may be null.
* Created in {@link #setup()} and closed in {@link #teardown()}.
*/
private CloseableTaskPoolSubmitter submitter;
/**
* Stage statistics. Created in test setup, and in
* teardown updates {@link #FILESYSTEM_IOSTATS}.
*/
private IOStatisticsStore stageStatistics;
/**
* Prefer to use these to interact with the FS to
* ensure more implicit coverage.
* Created in {@link #setup()}; closed and set to null in {@link #teardown()}.
*/
private ManifestStoreOperations storeOperations;
/**
* Progress counter used in all stage configs.
*/
private final ProgressCounter progressCounter = new ProgressCounter();
/**
* Directory for job summary reports.
* This should be set up in test suites testing against real object stores.
* {@link #setup()} points it at a "reports" directory under the project
* build dir.
*/
private File reportDir;
/**
* List of task attempt IDs for those tests which create manifests.
* Appended to in {@link #executeOneTaskAttempt(int, int, int)} while
* synchronized on {@code this}.
*/
private final List<String> taskAttemptIds = new ArrayList<>();
/**
* List of task IDs for those tests which create manifests.
* Appended to in {@link #executeOneTaskAttempt(int, int, int)} while
* synchronized on {@code this}.
*/
private final List<String> taskIds = new ArrayList<>();
/**
* Any job stage configuration created for operations.
* Never set by this class itself: it must be supplied through
* {@link #setJobStageConfig(StageConfig)}.
*/
private StageConfig jobStageConfig;
/**
* Destination dir of job.
* {@link #setup()} defaults this to the method path in the target FS.
*/
private Path destDir;
/**
* When creating manifests, total data size.
* This is the sum of the (synthetic) file sizes added to manifests.
*/
private final AtomicLong totalDataSize = new AtomicLong();
/**
* Where to move manifests; must be in target FS.
* {@link #setup()} sets this to the "manifests" path after the superclass
* setup has completed.
*/
private Path manifestDir;
/**
 * Look up the configuration held by the filesystem contract.
 * @return the configuration of the contract returned by {@code getContract()}.
 */
protected Configuration getConfiguration() {
  final AbstractFSContract contract = getContract();
  return contract.getConf();
}
/**
 * Get the store operations through which tests should interact
 * with the filesystem.
 * @return the current store operations; null once teardown has run.
 */
protected ManifestStoreOperations getStoreOperations() {
  return this.storeOperations;
}
/**
* Replace the store operations used by this test.
* The previous value, if any, is not closed by this call.
* @param storeOperations new value
*/
protected void setStoreOperations(final ManifestStoreOperations storeOperations) {
this.storeOperations = storeOperations;
}
/**
 * Get the task attempt IDs recorded while generating manifests.
 * @return the live list held by this instance, not a copy.
 */
public List<String> getTaskAttemptIds() {
  return this.taskAttemptIds;
}
/**
 * Get the task IDs recorded while generating manifests.
 * @return the live list held by this instance, not a copy.
 */
public List<String> getTaskIds() {
  return this.taskIds;
}
/**
 * Get the running total of file sizes added to generated manifests.
 * @return the current total.
 */
public long getTotalDataSize() {
  final long total = totalDataSize.get();
  return total;
}
/**
 * Get the directory to which manifests are to be moved.
 * @return the manifest directory; null until it has been set.
 */
public Path getManifestDir() {
  return this.manifestDir;
}
/**
 * Set the manifest directory, builder-style.
 * @param value new manifest directory
 * @return this instance, so that calls can be chained
 */
public AbstractManifestCommitterTest withManifestDir(Path value) {
  this.manifestDir = value;
  return this;
}
/**
 * Log a description of the current test at info level, prefixed with
 * the test method name and surrounded by blank lines.
 * @param text format string, as used by {@link String#format(String, Object...)}
 * @param args arguments for the format string
 */
protected void describe(String text, Object... args) {
  final String method = getMethodName();
  final String description = String.format(text, args);
  LOG.info("\n\n{}: {}\n", method, description);
}
/**
 * Create the filesystem contract: the local filesystem, unless a
 * subclass overrides this method.
 * @param conf configuration to build the contract from
 * @return a new local FS contract.
 */
@Override
protected AbstractFSContract createContract(final Configuration conf) {
  final AbstractFSContract localContract = new LocalFSContract(conf);
  return localContract;
}
/**
 * Create the test configuration: the superclass configuration, patched by
 * {@link #enableManifestCommitter(Configuration)}.
 * @return the patched configuration.
 */
@Override
protected Configuration createConfiguration() {
  final Configuration base = super.createConfiguration();
  return enableManifestCommitter(base);
}
/**
 * Set up the test case.
 * The report directory is created first, so that it is non-null by the
 * time superclass setup runs and the configuration is built;
 * after the superclass is set up, the manifest dir, destination dir,
 * stage statistics, task submitter and store operations are created.
 * @throws Exception any failure.
 */
@Override
public void setup() throws Exception {
  // report dir is on the local FS and is shared across all threads;
  // it has to exist before super.setup() builds the configuration.
  this.reportDir = new File(getProjectBuildDir(), "reports");
  this.reportDir.mkdirs();

  // this is where the filesystem instance for the target store is created.
  super.setup();

  this.manifestDir = path("manifests");

  // by default, the job destination is the method path in the target FS.
  final Path methodDir = methodPath();
  setDestDir(methodDir);

  // statistics collected by the stages.
  final IOStatisticsStore statistics = createIOStatisticsStore().build();
  setStageStatistics(statistics);

  // pool used for submitting work.
  final CloseableTaskPoolSubmitter pool =
      createCloseableTaskSubmitter(POOL_SIZE, TASK_IDS.getJobId());
  setSubmitter(pool);

  // and the operations used to talk to the target filesystem.
  this.storeOperations = createManifestStoreOperations();
}
/**
 * Override point: create the store operations.
 * The default implementation builds them from the test filesystem,
 * that filesystem's configuration and the test path.
 * @return store operations for this suite.
 * @throws IOException failure to create the operations.
 */
protected ManifestStoreOperations createManifestStoreOperations() throws IOException {
  final FileSystem targetFS = getFileSystem();
  final Configuration fsConf = targetFS.getConf();
  return ManifestCommitterSupport.createManifestStoreOperations(
      fsConf, targetFS, getTestPath());
}
/**
 * Tear down the test case: close the store operations and the submitter,
 * run the superclass teardown, then fold the filesystem and stage
 * statistics into {@link #FILESYSTEM_IOSTATS}.
 * @throws Exception failure in the superclass teardown.
 */
@Override
public void teardown() throws Exception {
  // name the thread so that teardown activity stands out in the logs
  final Thread current = Thread.currentThread();
  current.setName("teardown");

  IOUtils.cleanupWithLogger(LOG, storeOperations, getSubmitter());
  this.storeOperations = null;

  super.teardown();

  // aggregate the statistics of this test case into the class-wide snapshot
  final FileSystem fs = getFileSystem();
  FILESYSTEM_IOSTATS.aggregate(retrieveIOStatistics(fs));
  FILESYSTEM_IOSTATS.aggregate(getStageStatistics());
}
/**
 * Use a long timeout, so that runs against object stores or under
 * a debugger do not time out.
 * @return ten minutes, in milliseconds.
 */
@Override
protected int getTestTimeoutMillis() {
  final int tenMinutesInMillis = 10 * 60 * 1000;
  return tenMinutesInMillis;
}
/**
 * Get the test path of the filesystem contract.
 * @return the path returned by the contract's {@code getTestPath()}.
 */
protected Path getTestPath() {
  final AbstractFSContract contract = getContract();
  return contract.getTestPath();
}
/**
 * Get the submitter to which tasks are handed.
 * @return the submitter, or null if none has been set.
 */
protected CloseableTaskPoolSubmitter getSubmitter() {
  return this.submitter;
}
/**
* Set the task submitter.
* Any previously set submitter is not closed by this call.
* @param submitter new value.
*/
protected void setSubmitter(CloseableTaskPoolSubmitter submitter) {
this.submitter = submitter;
}
/**
 * Get the executor backing the task submitter.
 * A submitter must have been set: this fails with a
 * {@code NullPointerException} when {@link #getSubmitter()} returns null.
 * @return the pool of the submitter.
 */
protected ExecutorService getExecutorService() {
  final CloseableTaskPoolSubmitter pool = getSubmitter();
  return pool.getPool();
}
/**
 * Get the statistics store shared by the stages of this test.
 * @return IOStatistics for stage; null until set.
 */
protected final IOStatisticsStore getStageStatistics() {
  return this.stageStatistics;
}
/**
* Set the statistics store used in stage configurations created
* after this call.
* @param stageStatistics statistics.
*/
protected final void setStageStatistics(IOStatisticsStore stageStatistics) {
this.stageStatistics = stageStatistics;
}
/**
 * Get the progress counter which is wired into every stage config
 * created by this class.
 * @return progress counter.
 */
protected final ProgressCounter getProgressCounter() {
  return this.progressCounter;
}
/**
 * Get the local directory for job summary reports.
 * @return report directory; null before {@code setup()} has been invoked.
 */
public final File getReportDir() {
  return this.reportDir;
}
/**
 * Get the report directory in URI form.
 * @return the URI of the report directory.
 */
public final URI getReportDirUri() {
  final File reports = getReportDir();
  return reports.toURI();
}
/**
 * Get the thread leak tracker, which is static and so shared by
 * every instance of this class.
 * @return the thread leak tracker.
 */
protected static ThreadLeakTracker getThreadLeakTracker() {
  return AbstractManifestCommitterTest.THREAD_LEAK_TRACKER;
}
/**
 * After the class: have the shared tracker assert that no threads
 * have leaked.
 */
@AfterClass
public static void threadLeakage() {
  getThreadLeakTracker().assertNoThreadLeakage();
}
/**
 * After the class: log the filesystem statistics aggregated
 * from all the test cases.
 */
@AfterClass
public static void dumpFileSystemIOStatistics() {
  final String stats = ioStatisticsToPrettyString(FILESYSTEM_IOSTATS);
  LOG.info("Aggregate FileSystem Statistics {}", stats);
}
/**
 * Build a directory tree, creating the leaf entries through an executor,
 * and wait for all the creation operations to finish.
 * <pre>
 *   dirs created = width^depth
 *   file count = width^depth * files
 * </pre>
 * If {@code createDirs} is true, directories rather than files
 * are created at the bottom of the tree.
 * The base directory must already exist.
 * @param base base dir
 * @param prefix prefix for filenames.
 * @param executor executor to submit the work to.
 * @param depth depth of dirs
 * @param width width of dirs
 * @param files files to add in each leaf dir.
 * @param createDirs create directories rather than files?
 * @return the list of paths created, in submission order
 * @throws IOException failure.
 */
public final List<Path> createFilesOrDirs(Path base,
    String prefix,
    ExecutorService executor,
    int depth,
    int width,
    int files,
    boolean createDirs) throws IOException {
  try (DurationInfo ignored = new DurationInfo(LOG, true,
      "Creating Files %s (%d, %d, %d) under %s",
      prefix, depth, width, files, base)) {

    assertPathExists("Task attempt dir", base);

    // walk the tree, queueing every leaf creation in the executor.
    final List<Future<Path>> pending = createFilesOrDirs(
        new ArrayList<>(),
        base, prefix,
        executor,
        depth, width, files,
        createDirs);

    // block until each queued operation has completed.
    final List<Path> created = new ArrayList<>();
    for (Future<Path> operation : pending) {
      final Path path = awaitFuture(operation);
      created.add(path);
    }
    return created;
  }
}
/**
* Counter incremented for each file created.
* The low 16 bits of the incremented value become the two bytes of
* data written to the generated file.
*/
private final AtomicLong fileDataGenerator = new AtomicLong();
/**
 * Create files or directories; done in a treewalk and building up
 * a list of futures to wait for. The list is
 * built up incrementally rather than through some merging of
 * lists created down the tree.
 * If createDirs == true, then directories are created at the bottom,
 * not files. The flag is passed unchanged down the recursion so
 * that it is honoured at whatever depth the leaves are.
 * <p>
 * Intermediate directories are not explicitly created here.
 * Each leaf gets a name of the form {@code prefix-NNNN}, where the number
 * comes from a counter shared across all tests, and, when it is a file,
 * two bytes of data taken from a per-instance counter.
 *
 * @param futures list of futures to build up.
 * @param base base dir
 * @param prefix prefix for filenames.
 * @param executor submitter.
 * @param depth depth of dirs
 * @param width width of dirs
 * @param files files to add in each base dir.
 * @param createDirs create directories rather than files?
 * @return the list of futures passed in, with the new futures appended.
 */
private List<Future<Path>> createFilesOrDirs(
    List<Future<Path>> futures,
    Path base,
    String prefix,
    ExecutorService executor,
    int depth,
    int width,
    int files,
    boolean createDirs) {
  if (depth > 0) {
    // still descending through the directory levels
    for (int i = 0; i < width; i++) {
      Path child = new Path(base,
          String.format("dir-%02d-%02d", depth, i));
      // propagate createDirs: a hard-coded false here would mean that
      // directories were only ever created when the initial depth was 0.
      createFilesOrDirs(futures, child, prefix, executor,
          depth - 1, width, files, createDirs);
    }
  } else {
    // at the bottom: time to create the files (or directories)
    for (int i = 0; i < files; i++) {
      Path file = new Path(base,
          String.format("%s-%04d", prefix,
              CREATE_FILE_COUNTER.incrementAndGet()));
      // build the data: the low 16 bits of the counter, low byte first.
      // Not actually used in mkdir.
      long entry = fileDataGenerator.incrementAndGet() & 0xffff;
      byte[] data = new byte[2];
      data[0] = (byte) (entry & 0xff);
      data[1] = (byte) ((entry & 0xff00) >> 8);
      // the async operation.
      Future<Path> f = executor.submit(() -> {
        if (!createDirs) {
          // create files
          ContractTestUtils.createFile(getFileSystem(), file, true, data);
        } else {
          // create directories
          mkdirs(file);
        }
        return file;
      });
      futures.add(f);
    }
  }
  return futures;
}
/**
 * Build a list of child paths of a directory, named
 * {@code entry-01}, {@code entry-02}, ... up to and including
 * {@code count}. Nothing is created in the filesystem.
 * @param base base dir
 * @param count number of paths to generate
 * @return the list of paths; empty if count is less than 1
 */
protected List<Path> subpaths(Path base, int count) {
  final List<Path> entries = new ArrayList<>();
  for (int index = 1; index <= count; index++) {
    final String name = String.format("entry-%02d", index);
    entries.add(new Path(base, name));
  }
  return entries;
}
/**
 * Submit a mkdir call to the executor pool.
 * The returned future is always completed: with the path on success,
 * or exceptionally with whatever the mkdir operation threw.
 * That includes unchecked exceptions and assertion errors, so a failure
 * of any kind is reported to the waiting caller rather than leaving
 * it blocked on a future which would never complete.
 * @param path path of dir to create.
 * @return future which completes with the path of the created directory.
 */
protected CompletableFuture<Path> asyncMkdir(final Path path) {
  CompletableFuture<Path> f = new CompletableFuture<>();
  getExecutorService().submit(() -> {
    try {
      mkdirs(path);
      f.complete(path);
    } catch (Throwable t) {
      // deliberately broad: every failure must reach the future,
      // as nothing else observes the outcome of the submitted task.
      f.completeExceptionally(t);
    }
  });
  return f;
}
/**
 * Create a set of directories in parallel: every mkdir is
 * submitted through {@link #asyncMkdir(Path)} before any is awaited.
 * @param paths paths of the directories to create
 * @throws IOException failure
 */
protected void asyncMkdirs(Collection<Path> paths) throws IOException {
  // submit everything first...
  final List<CompletableFuture<Path>> pending = paths.stream()
      .map(this::asyncMkdir)
      .collect(Collectors.toList());
  // ...then wait for each operation in turn.
  for (CompletableFuture<Path> operation : pending) {
    awaitFuture(operation);
  }
}
/**
 * Submit an operation to create a file to the executor pool.
 * The file is created with overwrite enabled.
 * The returned future is always completed: with the path on success,
 * or exceptionally with whatever the create operation threw.
 * That includes unchecked exceptions and assertion errors, so a failure
 * of any kind is reported to the waiting caller rather than leaving
 * it blocked on a future which would never complete.
 * @param path path of file to create.
 * @param data data to write to the file.
 * @return future which completes with the path of the created file.
 */
protected CompletableFuture<Path> asyncPut(final Path path, byte[] data) {
  CompletableFuture<Path> f = new CompletableFuture<>();
  getExecutorService().submit(() -> {
    try {
      ContractTestUtils.createFile(getFileSystem(), path, true, data);
      f.complete(path);
    } catch (Throwable t) {
      // deliberately broad: every failure must reach the future,
      // as nothing else observes the outcome of the submitted task.
      f.completeExceptionally(t);
    }
  });
  return f;
}
/**
 * Index a list of manifests by task attempt ID.
 * As the map is built with {@link Collectors#toMap}, two manifests sharing
 * a task attempt ID result in an {@code IllegalStateException}.
 * @param list manifests
 * @return a map of task attempt ID to manifest.
 */
protected Map<String, TaskManifest> toMap(List<TaskManifest> list) {
  return list.stream().collect(
      Collectors.toMap(
          TaskManifest::getTaskAttemptID,
          manifest -> manifest));
}
/**
 * Assert that the source paths of the files to commit in a manifest
 * are exactly the given paths, in any order.
 * The manifest's source paths are collected into a set before
 * the comparison.
 * @param manifest manifest to audit
 * @param files list of files expected.
 */
protected void verifyManifestFilesMatch(final TaskManifest manifest,
    final List<Path> files) {
  // source paths of everything the manifest wants to commit
  final Collection<FileEntry> entries = manifest.getFilesToCommit();
  final Set<Path> sources = entries.stream()
      .map(FileEntry::getSourcePath)
      .collect(Collectors.toSet());

  // and these have to be the files which were created
  Assertions.assertThat(sources).containsExactlyInAnyOrderElementsOf(files);
}
/**
 * Assert that a task manifest is non-null and carries the expected
 * task attempt ID.
 * @param manifest manifest, may be null (which fails the assertion).
 * @param attemptId expected attempt ID
 * @return the manifest, guaranteed to be non-null and of task attempt.
 */
protected TaskManifest verifyManifestTaskAttemptID(
    final TaskManifest manifest,
    final String attemptId) {
  // there has to be a manifest
  Assertions.assertThat(manifest)
      .describedAs("Manifest of task %s", attemptId)
      .isNotNull();

  // and it must be from the right task attempt
  final String actualId = manifest.getTaskAttemptID();
  Assertions.assertThat(actualId)
      .describedAs("Task Attempt ID of manifest %s", manifest)
      .isEqualTo(attemptId);
  return manifest;
}
/**
 * Require a path to exist, handing it back so that the check can be
 * used inline in an expression.
 * @param message text for the assertion message.
 * @param path path to probe for.
 * @return the path passed in
 * @throws IOException IO Failure
 */
Path pathMustExist(final String message,
    final Path path) throws IOException {
  final Path verified = path;
  assertPathExists(message, verified);
  return verified;
}
/**
 * Require a path to equal an expected value and to exist.
 * The equality check is made first; the existence probe only takes
 * place if the paths match.
 * @param message text for error message.
 * @param expectedPath expected path.
 * @param actualPath path to validate.
 * @return the actual path
 * @throws IOException IO Failure
 */
Path verifyPath(final String message,
    final Path expectedPath,
    final Path actualPath) throws IOException {
  // first the value...
  Assertions.assertThat(actualPath)
      .describedAs(message)
      .isEqualTo(expectedPath);
  // ...then the existence.
  final Path verified = pathMustExist(message, actualPath);
  return verified;
}
/**
 * Validate and load the {@code _SUCCESS} marker of a directory,
 * delegating to {@code ManifestCommitterTestSupport.validateSuccessFile()}
 * against the test filesystem.
 * NOTE(review): the literal 0 passed down is interpreted by that helper;
 * confirm its meaning there.
 * @param dir directory to scan
 * @param jobId job ID, only verified if non-empty
 * @return the loaded success data
 * @throws IOException IO Failure
 */
protected ManifestSuccessData verifySuccessMarker(Path dir, String jobId)
    throws IOException {
  final FileSystem fs = getFileSystem();
  final ManifestSuccessData successData =
      validateSuccessFile(fs, dir, 0, jobId);
  return successData;
}
/**
 * Read a file from the test filesystem as a UTF-8 string, through
 * {@code ContractTestUtils.readUTF8()}.
 * NOTE(review): a length of -1 is passed; presumably this means
 * "read the whole file" - confirm against ContractTestUtils.
 * @param path path to read
 * @return string value
 * @throws IOException IO failure
 */
protected String readFile(Path path) throws IOException {
  final FileSystem fs = getFileSystem();
  final int length = -1;
  return ContractTestUtils.readUTF8(fs, path, length);
}
/**
 * Patch a (job) configuration so that the manifest committer is used.
 * As well as setting the committer factory, this turns on the job success
 * marker and output validation, binds the summary report directory and,
 * if a manifest directory has been set, the diagnostics manifest directory.
 * The configuration passed in is modified.
 * @param conf config to patch.
 * @return the same configuration instance, updated.
 */
protected Configuration enableManifestCommitter(final Configuration conf) {
  // switch committer
  conf.set(COMMITTER_FACTORY_CLASS, MANIFEST_COMMITTER_FACTORY);
  // a job marker is always wanted
  conf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true);
  // as is output validation, for extra rigorousness
  conf.setBoolean(OPT_VALIDATE_OUTPUT, true);

  // the manifest rename dir is optional
  final Path manifests = getManifestDir();
  if (manifests != null) {
    final String manifestUri = manifests.toUri().toString();
    conf.set(OPT_DIAGNOSTICS_MANIFEST_DIR, manifestUri);
  }

  // the report dir is always bound
  final String reportUri = getReportDirUri().toString();
  conf.set(OPT_SUMMARY_REPORT_DIR, reportUri);
  return conf;
}
/**
 * Create a job-only stage config, without finalizing it.
 * This is {@link #createStageConfig(int, int, int, Path)} with a
 * task index of -1, so no task attempt details are set.
 * Uses {@link #TASK_IDS} for the job ID.
 * @param jobAttemptNumber job attempt number
 * @param outputPath path where the final output goes
 * @return the config
 */
protected StageConfig createStageConfigForJob(
    final int jobAttemptNumber,
    final Path outputPath) {
  // -1: no task; the attempt number is then irrelevant.
  final int noTask = -1;
  final int unusedTaskAttempt = 0;
  return createStageConfig(jobAttemptNumber, noTask, unusedTaskAttempt, outputPath);
}
/**
 * Create the stage config for a job or a task, without finalizing it.
 * The config is wired up with this test's submitter, stage statistics,
 * store operations and progress counter.
 * Uses {@link #TASK_IDS} for job/task ID.
 * @param jobAttemptNumber job attempt number
 * @param taskIndex task index; a negative value (conventionally -1)
 * for a job-only config.
 * @param taskAttemptNumber task attempt number; only used when
 * taskIndex is not negative.
 * @param outputPath path where the final output goes
 * @return the config
 */
protected StageConfig createStageConfig(
    final int jobAttemptNumber,
    final int taskIndex,
    final int taskAttemptNumber,
    final Path outputPath) {
  final String jobId = TASK_IDS.getJobId();
  final ManifestCommitterSupport.AttemptDirectories dirs =
      new ManifestCommitterSupport.AttemptDirectories(
          outputPath, jobId, jobAttemptNumber);

  // the job-level settings
  final StageConfig stageConfig = new StageConfig();
  stageConfig
      .withIOProcessors(getSubmitter())
      .withIOStatistics(getStageStatistics())
      .withJobId(jobId)
      .withJobIdSource(JOB_ID_SOURCE_MAPREDUCE)
      .withJobAttemptNumber(jobAttemptNumber)
      .withJobDirectories(dirs)
      .withName(String.format(NAME_FORMAT_JOB_ATTEMPT, jobId))
      .withOperations(getStoreOperations())
      .withProgressable(getProgressCounter());

  if (taskIndex < 0) {
    // job only: nothing more to add.
    return stageConfig;
  }

  // a task was requested, so add its ID, attempt ID and attempt dir
  final String attemptId = TASK_IDS.getTaskAttempt(taskIndex, taskAttemptNumber);
  stageConfig
      .withTaskAttemptId(attemptId)
      .withTaskId(TASK_IDS.getTaskIdType(taskIndex).toString())
      .withTaskAttemptDir(dirs.getTaskAttemptPath(attemptId));
  return stageConfig;
}
/**
 * Get the job stage config, if one has been set.
 * @return stage config or null.
 */
protected StageConfig getJobStageConfig() {
  return this.jobStageConfig;
}
/**
* Set the job stage config returned by {@link #getJobStageConfig()}.
* @param jobStageConfig new value.
*/
protected void setJobStageConfig(StageConfig jobStageConfig) {
this.jobStageConfig = jobStageConfig;
}
/**
 * Get the destination directory of the job.
 * @return the destination directory; null until set.
 */
protected Path getDestDir() {
  return this.destDir;
}
/**
* Set the destination directory of the job.
* @param destDir new value.
*/
protected void setDestDir(Path destDir) {
this.destDir = destDir;
}
/**
 * Execute a set of task attempts through the task pool;
 * the task ID is a simple count from 0 and the task attempt number
 * is the lowest 2 bits of the task ID.
 * Execution stops on the first failure.
 * @param taskAttemptCount number of tasks.
 * @param filesPerTaskAttempt number of files to include in each manifest.
 * @return the manifests, in a synchronized list; the order in which they
 * were added depends on task scheduling.
 * @throws IOException IO failure.
 */
protected List<TaskManifest> executeTaskAttempts(int taskAttemptCount,
    int filesPerTaskAttempt) throws IOException {
  try (DurationInfo ignored = new DurationInfo(LOG, true, "create manifests")) {
    // the task pool iterates over objects, so the task indices
    // 0..taskAttemptCount-1 go into a list of boxed integers.
    final List<Integer> taskIndices = new ArrayList<>(taskAttemptCount);
    IntStream.range(0, taskAttemptCount)
        .boxed()
        .forEach(taskIndices::add);

    // manifests are added from the pool threads, hence the synchronized list.
    final List<TaskManifest> results =
        Collections.synchronizedList(new ArrayList<>());

    // create and save every manifest in the pool.
    TaskPool.foreach(taskIndices)
        .executeWith(getSubmitter())
        .stopOnFailure()
        .run(index -> {
          final TaskManifest manifest =
              executeOneTaskAttempt(index, index & 0x03, filesPerTaskAttempt);
          results.add(manifest);
        });
    return results;
  }
}
/**
 * Create a task ID and task attempt ID (recording both in
 * the taskIds and taskAttemptIds lists), set up the task,
 * build a manifest of fake entries and save that manifest
 * through a {@code SaveTaskManifestStage}.
 * None of the files listed in the manifest are actually created.
 * <p>
 * For each index {@code i} below {@code filesPerTaskAttempt}, the manifest
 * gets one directory entry {@code dir-i} under the destination dir and one
 * file entry, named after the task attempt ID with a ".csv" suffix.
 * File sizes are synthetic: {@code task * 10,000,000 + i * 1,000}; each is
 * added to the total data size.
 * <p>
 * A job stage config must have been set before this is called, as
 * {@link #createTaskStageConfig(int, String, String)} depends on it.
 * @param task task index
 * @param taskAttempt task attempt value
 * @param filesPerTaskAttempt number of files to include in manifest.
 * @return the manifest
 * @throws IOException failure
 */
protected TaskManifest executeOneTaskAttempt(final int task,
    int taskAttempt, final int filesPerTaskAttempt) throws IOException {
  final String taskId = String.format("task_%03d", task);
  final String attemptId = String.format("%s_%02d", taskId, taskAttempt);

  // this method runs in pool threads; the ID lists are unsynchronized,
  // so are guarded by the instance lock.
  synchronized (this) {
    taskIds.add(taskId);
    taskAttemptIds.add(attemptId);
  }

  // every task gets its own config: a job config patched with the task info
  final StageConfig stageConfig = createTaskStageConfig(JOB1, taskId, attemptId);

  LOG.info("Generating manifest for {}", attemptId);

  // run the task setup stage: this helps generate a realistic
  // workload for the parallelized job cleanup.
  new SetupTaskStage(stageConfig).apply("task " + attemptId);

  final TaskManifest manifest = createTaskManifest(stageConfig);
  final Path attemptDir = stageConfig.getTaskAttemptDir();
  final long baseSize = task * 1000_0000L;

  // one directory, holding one file, per requested entry.
  for (int i = 0; i < filesPerTaskAttempt; i++) {
    final String dirName = "dir-" + i;
    final Path sourceDir = new Path(attemptDir, dirName);
    final Path targetDir = new Path(getDestDir(), dirName);
    manifest.addDirectory(DirEntry.dirEntry(targetDir, 0, 1));

    final String filename = stageConfig.getTaskAttemptId() + ".csv";
    final Path source = new Path(sourceDir, filename);
    final Path target = new Path(targetDir, filename);
    final long fileSize = baseSize + i * 1000L;
    // the final argument is the size, rendered in hexadecimal
    final FileEntry fileEntry =
        new FileEntry(source, target, fileSize, Long.toString(fileSize, 16));
    manifest.addFileToCommit(fileEntry);
    totalDataSize.addAndGet(fileSize);
  }

  // finally, save the manifest.
  new SaveTaskManifestStage(stageConfig).apply(manifest);
  return manifest;
}
/**
 * Create a stage config for a task attempt: a job stage config for
 * the destination dir, patched with the task ID, the task attempt ID
 * and a task attempt dir under the job attempt's task subdirectory.
 * The job stage config must have been set, as that is where the
 * task subdirectory comes from.
 * @param jobId value passed to
 * {@link #createStageConfigForJob(int, Path)} as its first argument.
 * @param tid task ID
 * @param taskAttemptId task attempt ID; also the name of the attempt dir.
 * @return the task stage config
 */
public StageConfig createTaskStageConfig(final int jobId, final String tid,
    final String taskAttemptId) {
  final Path taskSubDir = getJobStageConfig().getJobAttemptTaskSubDir();
  final Path attemptDir = new Path(taskSubDir, taskAttemptId);
  return createStageConfigForJob(jobId, getDestDir())
      .withTaskId(tid)
      .withTaskAttemptId(taskAttemptId)
      .withTaskAttemptDir(attemptDir);
}
/**
 * Assert that neither the job attempt dir nor the output temporary
 * subdirectory of the job stage config exists any more.
 * @throws IOException IO failure
 */
protected void verifyJobDirsCleanedUp() throws IOException {
  final StageConfig jobConfig = getJobStageConfig();
  final Path jobAttemptDir = jobConfig.getJobAttemptDir();
  assertPathDoesNotExist("Job attempt dir", jobAttemptDir);
  final Path tempDir = jobConfig.getOutputTempSubDir();
  assertPathDoesNotExist("dest temp dir", tempDir);
}
/**
 * List the files under a path, logging the status of each one
 * at info level.
 * A null path is logged and reported as zero entries rather than
 * being treated as an error.
 * @param fileSystem FS
 * @param path path; may be null
 * @param recursive do a recursive listing?
 * @return the number of files found.
 * @throws Exception failure.
 */
public static long lsR(FileSystem fileSystem, Path path, boolean recursive)
    throws Exception {
  if (path == null) {
    // surfaces when someone calls getParent() on something at the top
    // of the path
    LOG.info("Empty path");
    return 0;
  }
  LOG.info("Listing of {}", path);
  final long entries = RemoteIterators.foreach(
      fileSystem.listFiles(path, recursive),
      status -> LOG.info("{}", status));
  LOG.info("Count of entries: {}", entries);
  return entries;
}
/**
 * Assert that a cleanup stage had a given outcome and,
 * optionally, that it issued the expected number of delete calls.
 * @param result result to analyze
 * @param outcome expected outcome
 * @param expectedDirsDeleted #of directories deleted; a negative value
 * (conventionally -1) disables the check.
 */
protected void assertCleanupResult(
    CleanupJobStage.Result result,
    CleanupJobStage.Outcome outcome,
    int expectedDirsDeleted) {
  Assertions.assertThat(result.getOutcome())
      .describedAs("Outcome of cleanup() in %s", result)
      .isEqualTo(outcome);
  if (expectedDirsDeleted < 0) {
    // caller is not interested in the delete count.
    return;
  }
  Assertions.assertThat(result.getDeleteCalls())
      .describedAs("Number of directories deleted in cleanup %s", result)
      .isEqualTo(expectedDirsDeleted);
}
/**
 * Run a cleanup stage against the job stage config and assert
 * that its result is as expected.
 * @param enabled is the stage enabled?
 * @param deleteTaskAttemptDirsInParallel delete task attempt dirs in
 * parallel?
 * @param suppressExceptions suppress exceptions?
 * @param outcome expected outcome.
 * @param expectedDirsDeleted #of directories deleted. -1 for no checks
 * @return the result
 * @throws IOException non-suppressed exception
 */
protected CleanupJobStage.Result cleanup(
    final boolean enabled,
    final boolean deleteTaskAttemptDirsInParallel,
    final boolean suppressExceptions,
    final CleanupJobStage.Outcome outcome,
    final int expectedDirsDeleted) throws IOException {
  final CleanupJobStage stage = new CleanupJobStage(getJobStageConfig());
  final CleanupJobStage.Arguments arguments = new CleanupJobStage.Arguments(
      OP_STAGE_JOB_CLEANUP,
      enabled,
      deleteTaskAttemptDirsInParallel,
      suppressExceptions);
  final CleanupJobStage.Result cleanupResult = stage.apply(arguments);
  assertCleanupResult(cleanupResult, outcome, expectedDirsDeleted);
  return cleanupResult;
}
/**
 * Read a whole file from the test filesystem and decode it as UTF-8.
 * The file length is taken from its status and asserted to be no
 * more than {@code MAX_LEN} bytes before anything is read.
 * @param path path to read
 * @return the string
 * @throws IOException failure
 */
protected String readText(final Path path) throws IOException {
  final FileSystem fs = getFileSystem();
  final FileStatus status = fs.getFileStatus(path);
  final long length = status.getLen();
  // guard against reading an unexpectedly large file into memory
  Assertions.assertThat(length)
      .describedAs("length of file %s", status)
      .isLessThanOrEqualTo(MAX_LEN);
  final byte[] bytes = readDataset(fs, path, (int) status.getLen());
  return new String(bytes, StandardCharsets.UTF_8);
}
/**
 * A {@code Progressable} which counts how many times
 * {@link #progress()} has been invoked.
 * The count is held in an {@code AtomicLong}.
 */
protected static final class ProgressCounter implements Progressable {

  /** Number of progress callbacks since creation or the last reset. */
  private final AtomicLong counter = new AtomicLong();

  /**
   * Record one progress callback.
   */
  @Override
  public void progress() {
    counter.incrementAndGet();
  }

  /**
   * Get the number of callbacks counted so far.
   * @return the current value.
   */
  public long value() {
    return counter.get();
  }

  /**
   * Set the count back to zero.
   */
  public void reset() {
    counter.set(0);
  }

  @Override
  public String toString() {
    return "ProgressCounter{"
        + "counter=" + counter.get()
        + '}';
  }
}
/**
 * Get the progress counter of a stage config.
 * The config's progressable is cast, so this fails with a
 * {@code ClassCastException} if it is not a {@link ProgressCounter}.
 * @param stageConfig stage
 * @return its progress counter.
 */
ProgressCounter progressOf(StageConfig stageConfig) {
  final Progressable progressable = stageConfig.getProgressable();
  return (ProgressCounter) progressable;
}
}