/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.output.committer.manifest;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.contract.localfs.LocalFSContract;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DirEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CleanupJobStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SaveTaskManifestStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SetupTaskStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageConfig;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.functional.CloseableTaskPoolSubmitter;
import org.apache.hadoop.util.functional.RemoteIterators;
import org.apache.hadoop.util.functional.TaskPool;

import static org.apache.hadoop.fs.contract.ContractTestUtils.readDataset;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;
import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER;
import static org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory.COMMITTER_FACTORY_CLASS;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConfig.createCloseableTaskSubmitter;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.JOB_ID_SOURCE_MAPREDUCE;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.MANIFEST_COMMITTER_FACTORY;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_DIAGNOSTICS_MANIFEST_DIR;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_SUMMARY_REPORT_DIR;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_VALIDATE_OUTPUT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_CLEANUP;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.getProjectBuildDir;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.validateSuccessFile;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.NAME_FORMAT_JOB_ATTEMPT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createIOStatisticsStore;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createTaskManifest;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

/**
 * Tests which work with manifest committers.
 * This is a filesystem contract bound to the local filesystem;
 * subclasses may change the FS to test against other stores.
 * Some fields are set up in
 * in {@link #executeOneTaskAttempt(int, int, int)},
 * which is why fields are used.
 * when synchronized access is needed; synchronize on (this) rather
 * than individual fields
 */
public abstract class AbstractManifestCommitterTest
    extends AbstractFSContractTestBase {

  protected static final Logger LOG =
      LoggerFactory.getLogger(AbstractManifestCommitterTest.class);

  /**
   * Some Job and task IDs.
   */
  protected static final ManifestCommitterTestSupport.JobAndTaskIDsForTests
      TASK_IDS = new ManifestCommitterTestSupport.JobAndTaskIDsForTests(2, 2);

  public static final int JOB1 = 1;

  public static final int TASK0 = 0;

  public static final int TASK1 = 1;

  /**
   * Task attempt 0 index.
   */
  public static final int TA0 = 0;

  /**
   * Task attempt 1 index.
   */
  public static final int TA1 = 1;

  /**
   * Depth of dir tree to generate.
   */
  public static final int DEPTH = 3;

  /**
   * Width of dir tree at every level.
   */
  public static final int WIDTH = 2;

  /**
   * How many files to create in the leaf directories.
   */
  public static final int FILES_PER_DIRECTORY = 4;

  /**
   * Pool size.
   */
  public static final int POOL_SIZE = 32;

  /**
   * FileSystem statistics are collected across every test case.
   */
  protected static final IOStatisticsSnapshot FILESYSTEM_IOSTATS =
      snapshotIOStatistics();

  /**
   * Counter for creating files. Ensures that across all test suites,
   * duplicate filenames are never created. Helps assign blame.
   */
  private static final AtomicLong CREATE_FILE_COUNTER = new AtomicLong();

  protected static final byte[] NO_DATA = new byte[0];

  /**
   * The thread leak tracker.
   */
  private static final ThreadLeakTracker THREAD_LEAK_TRACKER = new ThreadLeakTracker();

  private static final int MAX_LEN = 64_000;

  /**
   * Submitter for tasks; may be null.
   */
  private CloseableTaskPoolSubmitter submitter;

  /**
   * Stage statistics. Created in test setup, and in
   * teardown updates {@link #FILESYSTEM_IOSTATS}.
   */
  private IOStatisticsStore stageStatistics;

  /**
   * Prefer to use these to interact with the FS to
   * ensure more implicit coverage.
   */
  private ManifestStoreOperations storeOperations;

  /**
   * Progress counter used in all stage configs.
   */
  private final ProgressCounter progressCounter = new ProgressCounter();

  /**
   * Directory for job summary reports.
   * This should be set up in test suites testing against real object stores.
   */
  private File reportDir;

  /**
   * List of task attempt IDs for those tests which create manifests.
   */
  private final List<String> taskAttemptIds = new ArrayList<>();

  /**
   * List of task IDs for those tests which create manifests.
   */
  private final List<String> taskIds = new ArrayList<>();

  /**
   * any job stage configuration created for operations.
   */
  private StageConfig jobStageConfig;

  /**
   * Destination dir of job.
   */
  private Path destDir;

  /**
   * When creating manifests, total data size.
   */
  private final AtomicLong totalDataSize = new AtomicLong();

  /**
   * Where to move manifests; must be in target FS.
   */
  private Path manifestDir;

  /**
   * Get the contract configuration.
   * @return the config used to create the FS.
   */
  protected Configuration getConfiguration() {
    return getContract().getConf();
  }

  /**
   * Store operations to interact with..
   * @return store operations.
   */
  protected ManifestStoreOperations getStoreOperations() {
    return storeOperations;
  }

  /**
   * Set store operations.
   * @param storeOperations new value
   */
  protected void setStoreOperations(final ManifestStoreOperations storeOperations) {
    this.storeOperations = storeOperations;
  }

  public List<String> getTaskAttemptIds() {
    return taskAttemptIds;
  }

  public List<String> getTaskIds() {
    return taskIds;
  }

  public long getTotalDataSize() {
    return totalDataSize.get();
  }

  public Path getManifestDir() {
    return manifestDir;
  }

  /**
   * Set builder value.
   * @param value new value
   * @return the builder
   */
  public AbstractManifestCommitterTest withManifestDir(Path value) {
    manifestDir = value;
    return this;
  }

  /**
   * Describe a test in the logs.
   * @param text text to print
   * @param args arguments to format in the printing
   */
  protected void describe(String text, Object... args) {
    LOG.info("\n\n{}: {}\n",
        getMethodName(),
        String.format(text, args));
  }

  /**
   * Local FS unless overridden.
   * @param conf configuration
   * @return the FS contract.
   */
  @Override
  protected AbstractFSContract createContract(final Configuration conf) {
    return new LocalFSContract(conf);
  }

  /** Enable the manifest committer options in the configuration. */
  @Override
  protected Configuration createConfiguration() {
    return enableManifestCommitter(super.createConfiguration());
  }

  @Override
  public void setup() throws Exception {

    // set the manifest committer to a localfs path for reports across
    // all threads.
    // do this before superclass setup so reportDir is non-null there
    // and can be used in creating the configuration.
    reportDir = new File(getProjectBuildDir(), "reports");
    reportDir.mkdirs();

    // superclass setup includes creating a filesystem instance
    // for the target store.
    super.setup();

    manifestDir = path("manifests");

    // destination directory defaults to method path in
    // target FS
    setDestDir(methodPath());

    // stage statistics
    setStageStatistics(createIOStatisticsStore().build());
    // thread pool for task submission.
    setSubmitter(createCloseableTaskSubmitter(POOL_SIZE, TASK_IDS.getJobId()));
    // store operations for the target filesystem.
    storeOperations = createManifestStoreOperations();
  }

  /**
   * Overrride point: create the store operations.
   * @return store operations for this suite.
   */
  protected ManifestStoreOperations createManifestStoreOperations() throws IOException {
    final FileSystem fs = getFileSystem();
    return ManifestCommitterSupport.createManifestStoreOperations(fs.getConf(), fs, getTestPath());
  }

  @Override
  public void teardown() throws Exception {
    Thread.currentThread().setName("teardown");

    IOUtils.cleanupWithLogger(LOG, storeOperations, getSubmitter());
    storeOperations = null;
    super.teardown();
    FILESYSTEM_IOSTATS.aggregate(retrieveIOStatistics(getFileSystem()));
    FILESYSTEM_IOSTATS.aggregate(getStageStatistics());
  }

  /**
   * Add a long delay so that you don't get timeouts when working
   * with object stores or debugging.
   * @return a longer timeout than the base classes.
   */
  @Override
  protected int getTestTimeoutMillis() {
    return 600_000;
  }

  protected Path getTestPath() {
    return getContract().getTestPath();
  }

  /**
   * Get the task submitter.
   * @return submitter or null
   */
  protected CloseableTaskPoolSubmitter getSubmitter() {
    return submitter;
  }

  /**
   * Set the task submitter.
   * @param submitter new value.
   */
  protected void setSubmitter(CloseableTaskPoolSubmitter submitter) {
    this.submitter = submitter;
  }

  /**
   * Get the executor which the submitter also uses.
   * @return an executor.
   */
  protected ExecutorService getExecutorService() {
    return getSubmitter().getPool();
  }
  /**
   * @return IOStatistics for stage.
   */
  protected final IOStatisticsStore getStageStatistics() {
    return stageStatistics;
  }

  /**
   * Set the statistics.
   * @param stageStatistics statistics.
   */
  protected final void setStageStatistics(IOStatisticsStore stageStatistics) {
    this.stageStatistics = stageStatistics;
  }

  /**
   * Get the progress counter invoked during commit operations.
   * @return progress counter.
   */
  protected final ProgressCounter getProgressCounter() {
    return progressCounter;
  }

  /**
   * Get the report directory.
   * @return report directory.
   */
  public final File getReportDir() {
    return reportDir;
  }

  /**
   * Get the report directory as a URI.
   * @return report directory.
   */
  public final URI getReportDirUri() {
    return getReportDir().toURI();
  }

  /**
   * Get the (shared) thread leak tracker.
   * @return the thread leak tracker.
   */
  protected static ThreadLeakTracker getThreadLeakTracker() {
    return THREAD_LEAK_TRACKER;
  }

  /**
   * Make sure there's no thread leakage.
   */
  @AfterClass
  public static void threadLeakage() {
    THREAD_LEAK_TRACKER.assertNoThreadLeakage();
  }

  /**
   * Dump the filesystem statistics after the class.
   */
  @AfterClass
  public static void dumpFileSystemIOStatistics() {
    LOG.info("Aggregate FileSystem Statistics {}",
        ioStatisticsToPrettyString(FILESYSTEM_IOSTATS));
  }

  /**
   * Create a directory tree through an executor.
   * dirs created = width^depth;
   * file count = width^depth * files
   * If createDirs == true, then directories are created at the bottom,
   * not files.
   * @param base base dir
   * @param prefix prefix for filenames.
   * @param executor submitter.
   * @param depth depth of dirs
   * @param width width of dirs
   * @param files files to add in each base dir.
   * @param createDirs create directories rather than files?
   * @return the list of paths
   * @throws IOException failure.
   */
  public final List<Path> createFilesOrDirs(Path base,
      String prefix,
      ExecutorService executor,
      int depth,
      int width,
      int files,
      boolean createDirs) throws IOException {

    try (DurationInfo ignored = new DurationInfo(LOG, true,
        "Creating Files %s (%d, %d, %d) under %s",
        prefix, depth, width, files, base)) {

      assertPathExists("Task attempt dir", base);

      // create the files in the thread pool.
      List<Future<Path>> futures = createFilesOrDirs(
          new ArrayList<>(),
          base, prefix,
          executor,
          depth, width, files,
          createDirs);
      List<Path> result = new ArrayList<>();

      // now wait for the creations to finish.
      for (Future<Path> f : futures) {
        result.add(awaitFuture(f));
      }
      return result;
    }
  }

  /**
   * Counter incremented for each file created.
   */
  private final AtomicLong fileDataGenerator = new AtomicLong();

  /**
   * Create files or directories; done in a treewalk and building up
   * a list of futures to wait for. The list is
   * build up incrementally rather than through some merging of
   * lists created down the tree.
   * If createDirs == true, then directories are created at the bottom,
   * not files.
   *
   * @param futures list of futures to build up.
   * @param base base dir
   * @param prefix prefix for filenames.
   * @param executor submitter.
   * @param depth depth of dirs
   * @param width width of dirs
   * @param files files to add in each base dir.
   * @param createDirs create directories rather than files?
   * @return the list of futures
   */
  private List<Future<Path>> createFilesOrDirs(
      List<Future<Path>> futures,
      Path base,
      String prefix,
      ExecutorService executor,
      int depth,
      int width,
      int files,
      boolean createDirs) {

    if (depth > 0) {
      // still creating directories
      for (int i = 0; i < width; i++) {
        Path child = new Path(base,
            String.format("dir-%02d-%02d", depth, i));
        createFilesOrDirs(futures, child, prefix, executor, depth - 1, width, files, false);
      }
    } else {
      // time to create files
      for (int i = 0; i < files; i++) {
        Path file = new Path(base,
            String.format("%s-%04d", prefix,
                CREATE_FILE_COUNTER.incrementAndGet()));
        // buld the data. Not actually used in mkdir.
        long entry = fileDataGenerator.incrementAndGet() & 0xffff;
        byte[] data = new byte[2];
        data[0] = (byte) (entry & 0xff);
        data[1] = (byte) ((entry & 0xff00) >> 8);
        // the async operation.
        Future<Path> f = executor.submit(() -> {
          if (!createDirs) {
            // create files
            ContractTestUtils.createFile(getFileSystem(), file, true, data);
          } else {
            // create directories
            mkdirs(file);
          }
          return file;
        });
        futures.add(f);
      }
    }
    return futures;
  }

  /**
   * Create a list of paths under a dir.
   * @param base base dir
   * @param count count
   * @return the list
   */
  protected List<Path> subpaths(Path base, int count) {
    return IntStream.rangeClosed(1, count)
        .mapToObj(i -> new Path(base, String.format("entry-%02d", i)))
        .collect(Collectors.toList());
  }

  /**
   * Submit a mkdir call to the executor pool.
   * @param path path of dir to create.
   * @return future
   */
  protected CompletableFuture<Path> asyncMkdir(final Path path) {
    CompletableFuture<Path> f = new CompletableFuture<>();
    getExecutorService().submit(() -> {
      try {
        mkdirs(path);
        f.complete(path);
      } catch (IOException e) {
        f.completeExceptionally(e);
      }
    });
    return f;
  }

  /**
   * Given a list of paths, create the dirs async.
   * @param paths path list
   * @throws IOException failure
   */
  protected void asyncMkdirs(Collection<Path> paths) throws IOException {
    List<CompletableFuture<Path>> futures = new ArrayList<>();
    // initiate
    for (Path path: paths) {
      futures.add(asyncMkdir(path));
    }
    // await
    for (Future<Path> f : futures) {
      awaitFuture(f);
    }
  }

  /**
   * Submit an oepration to create a file to the executor pool.
   * @param path path of file to create.
   * @return future
   */
  protected CompletableFuture<Path> asyncPut(final Path path, byte[] data) {
    CompletableFuture<Path> f = new CompletableFuture<>();
    getExecutorService().submit(() -> {
      try {
        ContractTestUtils.createFile(getFileSystem(), path, true, data);
        f.complete(path);
      } catch (IOException e) {
        f.completeExceptionally(e);
      }
    });
    return f;
  }

  /**
   * Convert the manifest list to a map by task attempt ID.
   * @param list manifests
   * @return a map, indexed by task attempt ID.
   */
  protected Map<String, TaskManifest> toMap(List<TaskManifest> list) {
    return list.stream()
        .collect(Collectors.toMap(TaskManifest::getTaskAttemptID, x -> x));
  }

  /**
   * Verify the manifest files match the list of paths.
   * @param manifest manifest to audit
   * @param files list of files.
   */
  protected void verifyManifestFilesMatch(final TaskManifest manifest,
      final List<Path> files) {
    // get the list of source paths
    Set<Path> filesToRename = manifest.getFilesToCommit()
        .stream()
        .map(FileEntry::getSourcePath)
        .collect(Collectors.toSet());
    // which must match that of all the files created
    Assertions.assertThat(filesToRename)
        .containsExactlyInAnyOrderElementsOf(files);
  }

  /**
   * Verify that a task manifest has a given attempt ID.
   * @param manifest manifest, may be null.
   * @param attemptId expected attempt ID
   * @return the manifest, guaranteed to be non-null and of task attempt.
   */
  protected TaskManifest verifyManifestTaskAttemptID(
      final TaskManifest manifest,
      final String attemptId) {
    Assertions.assertThat(manifest)
        .describedAs("Manifest of task %s", attemptId)
        .isNotNull();
    Assertions.assertThat(manifest.getTaskAttemptID())
        .describedAs("Task Attempt ID of manifest %s", manifest)
        .isEqualTo(attemptId);
    return manifest;
  }

  /**
   * Assert that a path must exist; return the path.
   * @param message text for error message.
   * @param path path to validate.
   * @return the path
   * @throws IOException IO Failure
   */
  Path pathMustExist(final String message,
      final Path path) throws IOException {
    assertPathExists(message, path);
    return path;
  }

  /**
   * Assert that a path must exist; return the path.
   * It must also equal the expected value.
   * @param message text for error message.
   * @param expectedPath expected path.
   * @param actualPath path to validate.
   * @return the path
   * @throws IOException IO Failure
   */
  Path verifyPath(final String message,
      final Path expectedPath,
      final Path actualPath) throws IOException {
    Assertions.assertThat(actualPath)
        .describedAs(message)
        .isEqualTo(expectedPath);
    return pathMustExist(message, actualPath);
  }

  /**
   * Verify that the specified dir has the {@code _SUCCESS} marker
   * and that it can be loaded.
   * The contents will be logged and returned.
   * @param dir directory to scan
   * @param jobId job ID, only verified if non-empty
   * @return the loaded success data
   * @throws IOException IO Failure
   */
  protected ManifestSuccessData verifySuccessMarker(Path dir, String jobId)
      throws IOException {
    return validateSuccessFile(getFileSystem(), dir, 0, jobId);
  }

  /**
   * Read a UTF-8 file.
   * @param path path to read
   * @return string value
   * @throws IOException IO failure
   */
  protected String readFile(Path path) throws IOException {
    return ContractTestUtils.readUTF8(getFileSystem(), path, -1);
  }

  /**
   * Modify a (job) config to switch to the manifest committer;
   * output validation is also enabled.
   * @param conf config to patch.
   * @return the updated configuration.
   */
  protected Configuration enableManifestCommitter(final Configuration conf) {
    conf.set(COMMITTER_FACTORY_CLASS, MANIFEST_COMMITTER_FACTORY);
    // always create a job marker
    conf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true);
    // and validate the output, for extra rigorousness
    conf.setBoolean(OPT_VALIDATE_OUTPUT, true);

    // set the manifest rename dir if non-null
    if (getManifestDir() != null) {
      conf.set(OPT_DIAGNOSTICS_MANIFEST_DIR,
          getManifestDir().toUri().toString());
    }

    // and bind the report dir
    conf.set(OPT_SUMMARY_REPORT_DIR, getReportDirUri().toString());
    return conf;
  }

  /**
   * Create the stage config for a job but don't finalize it.
   * Uses {@link #TASK_IDS} for job/task ID.
   * @param jobAttemptNumber job attempt number
   * @param outputPath path where the final output goes
   * @return the config
   */
  protected StageConfig createStageConfigForJob(
      final int jobAttemptNumber,
      final Path outputPath) {
    return createStageConfig(jobAttemptNumber, -1, 0, outputPath);
  }

  /**
   * Create the stage config for job or task but don't finalize it.
   * Uses {@link #TASK_IDS} for job/task ID.
   * @param jobAttemptNumber job attempt number
   * @param taskIndex task attempt index; -1 for job attempt only.
   * @param taskAttemptNumber task attempt number
   * @param outputPath path where the final output goes
   * @return the config
   */
  protected StageConfig createStageConfig(
      final int jobAttemptNumber,
      final int taskIndex,
      final int taskAttemptNumber,
      final Path outputPath) {
    final String jobId = TASK_IDS.getJobId();
    ManifestCommitterSupport.AttemptDirectories attemptDirs =
        new ManifestCommitterSupport.AttemptDirectories(outputPath,
            jobId, jobAttemptNumber);
    StageConfig config = new StageConfig();
    config
        .withIOProcessors(getSubmitter())
        .withIOStatistics(getStageStatistics())
        .withJobId(jobId)
        .withJobIdSource(JOB_ID_SOURCE_MAPREDUCE)
        .withJobAttemptNumber(jobAttemptNumber)
        .withJobDirectories(attemptDirs)
        .withName(String.format(NAME_FORMAT_JOB_ATTEMPT, jobId))
        .withOperations(getStoreOperations())
        .withProgressable(getProgressCounter());

    // if there's a task attempt ID set, set up its details
    if (taskIndex >= 0) {
      String taskAttempt = TASK_IDS.getTaskAttempt(taskIndex,
          taskAttemptNumber);
      config
          .withTaskAttemptId(taskAttempt)
          .withTaskId(TASK_IDS.getTaskIdType(taskIndex).toString())
          .withTaskAttemptDir(
              attemptDirs.getTaskAttemptPath(taskAttempt));
    }
    return config;
  }

  /**
   * A job stage config.
   * @return stage config or null.
   */
  protected StageConfig getJobStageConfig() {
    return jobStageConfig;
  }

  protected void setJobStageConfig(StageConfig jobStageConfig) {
    this.jobStageConfig = jobStageConfig;
  }

  protected Path getDestDir() {
    return destDir;
  }

  protected void setDestDir(Path destDir) {
    this.destDir = destDir;
  }

  /**
   * Execute a set of tasks; task ID is a simple count.
   * task attempt is lowest 2 bits of task ID.
   * @param taskAttemptCount number of tasks.
   * @param filesPerTaskAttempt number of files to include in manifest.
   * @return the manifests.
   * @throws IOException IO failure.
   */
  protected List<TaskManifest> executeTaskAttempts(int taskAttemptCount,
      int filesPerTaskAttempt) throws IOException {

    try (DurationInfo di = new DurationInfo(LOG, true, "create manifests")) {

      // build a list of the task IDs.
      // it's really hard to create a list of Integers; the java8
      // IntStream etc doesn't quite fit as they do their best
      // keep things unboxed, trying to map(Integer::valueOf) doesn't help.
      List<Integer> taskIdList = new ArrayList<>(taskAttemptCount);
      for (int t = 0; t < taskAttemptCount; t++) {
        taskIdList.add(t);
      }

      /// execute the tasks
      List<TaskManifest> manifests = Collections.synchronizedList(
          new ArrayList<>());

      // then submit their creation/save to the pool.
      TaskPool.foreach(taskIdList)
          .executeWith(getSubmitter())
          .stopOnFailure()
          .run(i -> {
            manifests.add(
                executeOneTaskAttempt(i, i & 0x03, filesPerTaskAttempt));
          });
      return manifests;

    }
  }

  /**
   * Create at task ID and attempt (adding to taskIDs and taskAttemptIds)
   * setup the task, create a manifest with fake task entries
   * and save that manifest to the job attempt dir.
   * No actual files are created.
   * @param task task index
   * @param taskAttempt task attempt value
   * @param filesPerTaskAttempt number of files to include in manifest.
   * @return the manifest
   * @throws IOException failure
   */
  protected TaskManifest executeOneTaskAttempt(final int task,
      int taskAttempt, final int filesPerTaskAttempt) throws IOException {

    String tid = String.format("task_%03d", task);
    String taskAttemptId = String.format("%s_%02d",
        tid, taskAttempt);
    synchronized (this) {
      taskIds.add(tid);
      taskAttemptIds.add(taskAttemptId);
    }
    // for each task, a job config is created then patched with the task info
    StageConfig taskStageConfig = createTaskStageConfig(JOB1, tid, taskAttemptId);

    LOG.info("Generating manifest for {}", taskAttemptId);

    // task setup: create dest dir.
    // This helps generate a realistic
    // workload for the parallelized job cleanup.
    new SetupTaskStage(taskStageConfig).apply("task " + taskAttemptId);

    final TaskManifest manifest = createTaskManifest(taskStageConfig);

    Path taDir = taskStageConfig.getTaskAttemptDir();
    long size = task * 1000_0000L;

    // for each task, 10 dirs, one file per dir.
    for (int i = 0; i < filesPerTaskAttempt; i++) {
      Path in = new Path(taDir, "dir-" + i);
      Path out = new Path(getDestDir(), "dir-" + i);
      manifest.addDirectory(DirEntry.dirEntry(out, 0, 1));
      String name = taskStageConfig.getTaskAttemptId() + ".csv";
      Path src = new Path(in, name);
      Path dest = new Path(out, name);
      long fileSize = size + i * 1000L;
      manifest.addFileToCommit(
          new FileEntry(src, dest, fileSize, Long.toString(fileSize, 16)));
      totalDataSize.addAndGet(fileSize);
    }

    // save the manifest for this stage.
    new SaveTaskManifestStage(taskStageConfig).apply(manifest);
    return manifest;
  }

  public StageConfig createTaskStageConfig(final int jobId, final String tid,
      final String taskAttemptId) {
    Path jobAttemptTaskSubDir = getJobStageConfig().getJobAttemptTaskSubDir();
    StageConfig taskStageConfig = createStageConfigForJob(jobId, getDestDir())
        .withTaskId(tid)
        .withTaskAttemptId(taskAttemptId)
        .withTaskAttemptDir(new Path(jobAttemptTaskSubDir, taskAttemptId));
    return taskStageConfig;
  }

  /**
   * Verify that the job directories have been cleaned up.
   * @throws IOException IO failure
   */
  protected void verifyJobDirsCleanedUp() throws IOException {
    StageConfig stageConfig = getJobStageConfig();
    assertPathDoesNotExist("Job attempt dir", stageConfig.getJobAttemptDir());
    assertPathDoesNotExist("dest temp dir", stageConfig.getOutputTempSubDir());
  }

  /**
   * List a directory/directory tree and print files.
   * @param fileSystem FS
   * @param path path
   * @param recursive do a recursive listing?
   * @return the number of files found.
   * @throws IOException failure.
   */
  public static long lsR(FileSystem fileSystem, Path path, boolean recursive)
      throws Exception {
    if (path == null) {
      // surfaces when someone calls getParent() on something at the top
      // of the path
      LOG.info("Empty path");
      return 0;
    } else {
      LOG.info("Listing of {}", path);
      final long count = RemoteIterators.foreach(
          fileSystem.listFiles(path, recursive),
          (status) -> LOG.info("{}", status));
      LOG.info("Count of entries: {}", count);
      return count;
    }
  }

  /**
   * Assert that a cleanup stage coursehad a given outcome and
   * deleted the given number of directories.
   * @param result result to analyze
   * @param outcome expected outcome
   * @param expectedDirsDeleted #of directories deleted. -1 for no checks
   */
  protected void assertCleanupResult(
      CleanupJobStage.Result result,
      CleanupJobStage.Outcome outcome,
      int expectedDirsDeleted) {
    Assertions.assertThat(result.getOutcome())
        .describedAs("Outcome of cleanup() in %s", result)
        .isEqualTo(outcome);
    if (expectedDirsDeleted >= 0) {
      Assertions.assertThat(result.getDeleteCalls())
          .describedAs("Number of directories deleted in cleanup %s", result)
          .isEqualTo(expectedDirsDeleted);
    }
  }

  /**
   * Create and execute a cleanup stage.
   * @param enabled is the stage enabled?
   * @param deleteTaskAttemptDirsInParallel delete task attempt dirs in
   *        parallel?
   * @param suppressExceptions suppress exceptions?
   * @param outcome expected outcome.
   * @param expectedDirsDeleted #of directories deleted. -1 for no checks
   * @return the result
   * @throws IOException non-suppressed exception
   */
  protected CleanupJobStage.Result cleanup(
      final boolean enabled,
      final boolean deleteTaskAttemptDirsInParallel,
      final boolean suppressExceptions,
      final CleanupJobStage.Outcome outcome,
      final int expectedDirsDeleted) throws IOException {
    StageConfig stageConfig = getJobStageConfig();
    CleanupJobStage.Result result = new CleanupJobStage(stageConfig)
        .apply(new CleanupJobStage.Arguments(OP_STAGE_JOB_CLEANUP,
            enabled, deleteTaskAttemptDirsInParallel, suppressExceptions));
    assertCleanupResult(result, outcome, expectedDirsDeleted);
    return result;
  }

  /**
   * Read the UTF_8 text in the file.
   * @param path path to read
   * @return the string
   * @throws IOException failure
   */
  protected String readText(final Path path) throws IOException {

    final FileSystem fs = getFileSystem();
    final FileStatus st = fs.getFileStatus(path);
    Assertions.assertThat(st.getLen())
        .describedAs("length of file %s", st)
        .isLessThanOrEqualTo(MAX_LEN);

    return new String(
        readDataset(fs, path, (int) st.getLen()),
        StandardCharsets.UTF_8);
  }

  /**
   * Counter.
   */
  protected static final class ProgressCounter implements Progressable {

    private final AtomicLong counter = new AtomicLong();

    /**
     * Increment the counter.
     */
    @Override
    public void progress() {
      counter.incrementAndGet();
    }

    /**
     * Get the counter value.
     * @return the current value.
     */
    public long value() {
      return counter.get();
    }

    /**
     * Reset the counter.
     */
    public void reset() {
      counter.set(0);
    }

    @Override
    public String toString() {
      final StringBuilder sb = new StringBuilder(
          "ProgressCounter{");
      sb.append("counter=").append(counter.get());
      sb.append('}');
      return sb.toString();
    }
  }

  /**
   * Get the progress counter of a stage.
   * @param stageConfig stage
   * @return its progress counter.
   */
  ProgressCounter progressOf(StageConfig stageConfig) {
    return (ProgressCounter) stageConfig.getProgressable();
  }
}
