HADOOP-17023. Tune S3AFileSystem.listStatus() (#2257)


S3AFileSystem.listStatus() is optimized for invocations
where the path supplied is a non-empty directory.
The number of S3 requests is significantly reduced, saving
time, money, and reducing the risk of S3 throttling.

Contributed by Mukund Thakur.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
index 16413a7..20ed288 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
@@ -24,6 +24,7 @@
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -103,6 +104,19 @@
   }
 
   /**
+   * Create a FileStatus iterator against a provided list of file status.
+   * @param fileStatuses array of file status.
+   * @return the file status iterator.
+   */
+  @VisibleForTesting
+  public static ProvidedFileStatusIterator toProvidedFileStatusIterator(
+          S3AFileStatus[] fileStatuses) {
+    return new ProvidedFileStatusIterator(fileStatuses,
+            ACCEPT_ALL,
+            Listing.ACCEPT_ALL_BUT_S3N);
+  }
+
+  /**
    * Create a FileStatus iterator against a path, with a given list object
    * request.
    *
@@ -250,7 +264,7 @@
       if (!forceNonAuthoritativeMS &&
               allowAuthoritative &&
               metadataStoreListFilesIterator.isRecursivelyAuthoritative()) {
-        S3AFileStatus[] statuses = S3Guard.iteratorToStatuses(
+        S3AFileStatus[] statuses = S3AUtils.iteratorToStatuses(
                 metadataStoreListFilesIterator, tombstones);
         cachedFilesIterator = createProvidedFileStatusIterator(
                 statuses, ACCEPT_ALL, acceptor);
@@ -329,6 +343,56 @@
             tombstones);
   }
 
+  /**
+   * Calculate list of file statuses assuming path
+   * to be a non-empty directory.
+   * @param path input path.
+   * @return Triple of file statuses, metaData, auth flag.
+   * @throws IOException Any IO problems.
+   */
+  public Triple<RemoteIterator<S3AFileStatus>, DirListingMetadata, Boolean>
+        getFileStatusesAssumingNonEmptyDir(Path path)
+          throws IOException {
+    String key = pathToKey(path);
+    List<S3AFileStatus> result;
+    if (!key.isEmpty()) {
+      key = key + '/';
+    }
+
+    boolean allowAuthoritative = listingOperationCallbacks
+            .allowAuthoritative(path);
+    DirListingMetadata dirMeta =
+            S3Guard.listChildrenWithTtl(
+                    getStoreContext().getMetadataStore(),
+                    path,
+                    listingOperationCallbacks.getUpdatedTtlTimeProvider(),
+                    allowAuthoritative);
+    // In auth mode return directly with auth flag.
+    if (allowAuthoritative && dirMeta != null && dirMeta.isAuthoritative()) {
+      ProvidedFileStatusIterator mfsItr = createProvidedFileStatusIterator(
+              S3Guard.dirMetaToStatuses(dirMeta),
+              ACCEPT_ALL,
+              Listing.ACCEPT_ALL_BUT_S3N);
+      return Triple.of(mfsItr,
+              dirMeta, Boolean.TRUE);
+    }
+
+    S3ListRequest request = createListObjectsRequest(key, "/");
+    LOG.debug("listStatus: doing listObjects for directory {}", key);
+
+    FileStatusListingIterator filesItr = createFileStatusListingIterator(
+            path,
+            request,
+            ACCEPT_ALL,
+            new Listing.AcceptAllButSelfAndS3nDirs(path));
+
+    // return the results obtained from s3.
+    return Triple.of(
+            filesItr,
+            dirMeta,
+            Boolean.FALSE);
+  }
+
   public S3ListRequest createListObjectsRequest(String key, String delimiter) {
     return listingOperationCallbacks.createListObjectsRequest(key, delimiter);
   }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 63c80bd..86f2a88 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -35,6 +35,7 @@
 import java.util.Collections;
 import java.util.Date;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -185,6 +186,7 @@
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404;
 import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion;
 import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.logDnsLookup;
+import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.dirMetaToStatuses;
 import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
 
 /**
@@ -2652,7 +2654,9 @@
    */
   public FileStatus[] listStatus(Path f) throws FileNotFoundException,
       IOException {
-    return once("listStatus", f.toString(), () -> innerListStatus(f));
+    return once("listStatus",
+        f.toString(),
+        () -> iteratorToStatuses(innerListStatus(f), new HashSet<>()));
   }
 
   /**
@@ -2665,51 +2669,52 @@
    * @throws IOException due to an IO problem.
    * @throws AmazonClientException on failures inside the AWS SDK
    */
-  private S3AFileStatus[] innerListStatus(Path f) throws FileNotFoundException,
-      IOException, AmazonClientException {
+  private RemoteIterator<S3AFileStatus> innerListStatus(Path f)
+          throws FileNotFoundException,
+          IOException, AmazonClientException {
     Path path = qualify(f);
-    String key = pathToKey(path);
     LOG.debug("List status for path: {}", path);
     entryPoint(INVOCATION_LIST_STATUS);
 
-    List<S3AFileStatus> result;
-    final S3AFileStatus fileStatus = innerGetFileStatus(path, false,
-        StatusProbeEnum.ALL);
+    Triple<RemoteIterator<S3AFileStatus>, DirListingMetadata, Boolean>
+            statusesAssumingNonEmptyDir = listing
+            .getFileStatusesAssumingNonEmptyDir(path);
 
-    if (fileStatus.isDirectory()) {
-      if (!key.isEmpty()) {
-        key = key + '/';
+    if (!statusesAssumingNonEmptyDir.getLeft().hasNext() &&
+            statusesAssumingNonEmptyDir.getRight()) {
+      // We are sure that this is an empty directory in auth mode.
+      return statusesAssumingNonEmptyDir.getLeft();
+    } else if (!statusesAssumingNonEmptyDir.getLeft().hasNext()) {
+      // We may have an empty dir, or may have file or may have nothing.
+      // So we call innerGetFileStatus to get the status, this may throw
+      // FileNotFoundException if we have nothing.
+      // So We are guaranteed to have either a dir marker or a file.
+      final S3AFileStatus fileStatus = innerGetFileStatus(path, false,
+              StatusProbeEnum.ALL);
+      // If it is a file return directly.
+      if (fileStatus.isFile()) {
+        LOG.debug("Adding: rd (not a dir): {}", path);
+        S3AFileStatus[] stats = new S3AFileStatus[1];
+        stats[0] = fileStatus;
+        return listing.createProvidedFileStatusIterator(
+                stats,
+                ACCEPT_ALL,
+                Listing.ACCEPT_ALL_BUT_S3N);
       }
-
-      boolean allowAuthoritative = allowAuthoritative(f);
-      DirListingMetadata dirMeta =
-          S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider,
-              allowAuthoritative);
-      if (allowAuthoritative && dirMeta != null && dirMeta.isAuthoritative()) {
-        return S3Guard.dirMetaToStatuses(dirMeta);
-      }
-
-      S3ListRequest request = createListObjectsRequest(key, "/");
-      LOG.debug("listStatus: doing listObjects for directory {}", key);
-
-      Listing.FileStatusListingIterator files =
-          listing.createFileStatusListingIterator(path,
-              request,
-              ACCEPT_ALL,
-              new Listing.AcceptAllButSelfAndS3nDirs(path));
-      result = new ArrayList<>(files.getBatchSize());
-      while (files.hasNext()) {
-        result.add(files.next());
-      }
-      // merge the results. This will update the store as needed
-      return S3Guard.dirListingUnion(metadataStore, path, result, dirMeta,
-          allowAuthoritative, ttlTimeProvider);
-    } else {
-      LOG.debug("Adding: rd (not a dir): {}", path);
-      S3AFileStatus[] stats = new S3AFileStatus[1];
-      stats[0]= fileStatus;
-      return stats;
     }
+    // Here we have a directory which may or may not be empty.
+    // So we update the metastore and return.
+    return S3Guard.dirListingUnion(
+            metadataStore,
+            path,
+            statusesAssumingNonEmptyDir.getLeft(),
+            statusesAssumingNonEmptyDir.getMiddle(),
+            allowAuthoritative(path),
+            ttlTimeProvider, p ->
+                    listing.createProvidedFileStatusIterator(
+                    dirMetaToStatuses(statusesAssumingNonEmptyDir.getMiddle()),
+                    ACCEPT_ALL,
+                    Listing.ACCEPT_ALL_BUT_S3N));
   }
 
   /**
@@ -4497,7 +4502,7 @@
             : null;
     final RemoteIterator<S3AFileStatus> cachedFileStatusIterator =
         listing.createProvidedFileStatusIterator(
-            S3Guard.dirMetaToStatuses(meta), filter, acceptor);
+            dirMetaToStatuses(meta), filter, acceptor);
     return (allowAuthoritative && meta != null
         && meta.isAuthoritative())
         ? listing.createLocatedFileStatusIterator(
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index 1d39950..3e9115c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -42,6 +42,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
@@ -1417,6 +1418,30 @@
   }
 
   /**
+   * Convert the data of an iterator of {@link S3AFileStatus} to
+   * an array. Given tombstones are filtered out. If the iterator
+   * does return any item, an empty array is returned.
+   * @param iterator a non-null iterator
+   * @param tombstones
+   * @return a possibly-empty array of file status entries
+   * @throws IOException
+   */
+  public static S3AFileStatus[] iteratorToStatuses(
+      RemoteIterator<S3AFileStatus> iterator, Set<Path> tombstones)
+      throws IOException {
+    List<FileStatus> statuses = new ArrayList<>();
+
+    while (iterator.hasNext()) {
+      S3AFileStatus status = iterator.next();
+      if (!tombstones.contains(status.getPath())) {
+        statuses.add(status);
+      }
+    }
+
+    return statuses.toArray(new S3AFileStatus[0]);
+  }
+
+  /**
    * An interface for use in lambda-expressions working with
    * directory tree listings.
    */
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
index ae5c293..78cedc2 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
@@ -296,30 +296,6 @@
   }
 
   /**
-   * Convert the data of an iterator of {@link S3AFileStatus} to
-   * an array. Given tombstones are filtered out. If the iterator
-   * does return any item, an empty array is returned.
-   * @param iterator a non-null iterator
-   * @param tombstones
-   * @return a possibly-empty array of file status entries
-   * @throws IOException
-   */
-  public static S3AFileStatus[] iteratorToStatuses(
-      RemoteIterator<S3AFileStatus> iterator, Set<Path> tombstones)
-      throws IOException {
-    List<FileStatus> statuses = new ArrayList<>();
-
-    while (iterator.hasNext()) {
-      S3AFileStatus status = iterator.next();
-      if (!tombstones.contains(status.getPath())) {
-        statuses.add(status);
-      }
-    }
-
-    return statuses.toArray(new S3AFileStatus[0]);
-  }
-
-  /**
    * Convert the data of a directory listing to an array of {@link FileStatus}
    * entries. Tombstones are filtered out at this point. If the listing is null
    * an empty array is returned.
@@ -359,17 +335,22 @@
    * @param dirMeta  Directory listing from MetadataStore.  May be null.
    * @param isAuthoritative State of authoritative mode
    * @param timeProvider Time provider to use when updating entries
+   * @param toStatusItr function to convert array of file status to
+   *                    RemoteIterator.
    * @return Final result of directory listing.
    * @throws IOException if metadata store update failed
    */
-  public static S3AFileStatus[] dirListingUnion(MetadataStore ms, Path path,
-      List<S3AFileStatus> backingStatuses, DirListingMetadata dirMeta,
-      boolean isAuthoritative, ITtlTimeProvider timeProvider)
-      throws IOException {
+  public static RemoteIterator<S3AFileStatus> dirListingUnion(
+          MetadataStore ms, Path path,
+          RemoteIterator<S3AFileStatus> backingStatuses,
+          DirListingMetadata dirMeta, boolean isAuthoritative,
+          ITtlTimeProvider timeProvider,
+          Function<S3AFileStatus[], RemoteIterator<S3AFileStatus>> toStatusItr)
+          throws IOException {
 
     // Fast-path for NullMetadataStore
     if (isNullMetadataStore(ms)) {
-      return backingStatuses.toArray(new S3AFileStatus[backingStatuses.size()]);
+      return backingStatuses;
     }
 
     assertQualified(path);
@@ -410,7 +391,7 @@
     }
     IOUtils.cleanupWithLogger(LOG, operationState);
 
-    return dirMetaToStatuses(dirMeta);
+    return toStatusItr.apply(dirMetaToStatuses(dirMeta));
   }
 
   /**
@@ -429,7 +410,7 @@
   private static void authoritativeUnion(
       final MetadataStore ms,
       final Path path,
-      final List<S3AFileStatus> backingStatuses,
+      final RemoteIterator<S3AFileStatus> backingStatuses,
       final DirListingMetadata dirMeta,
       final ITtlTimeProvider timeProvider,
       final BulkOperationState operationState) throws IOException {
@@ -440,7 +421,8 @@
     Set<Path> deleted = dirMeta.listTombstones();
     final Map<Path, PathMetadata> dirMetaMap = dirMeta.getListing().stream()
         .collect(Collectors.toMap(pm -> pm.getFileStatus().getPath(), pm -> pm));
-    for (S3AFileStatus s : backingStatuses) {
+    while (backingStatuses.hasNext()) {
+      S3AFileStatus s = backingStatuses.next();
       final Path statusPath = s.getPath();
       if (deleted.contains(statusPath)) {
         continue;
@@ -493,16 +475,17 @@
   private static void nonAuthoritativeUnion(
       final MetadataStore ms,
       final Path path,
-      final List<S3AFileStatus> backingStatuses,
+      final RemoteIterator<S3AFileStatus> backingStatuses,
       final DirListingMetadata dirMeta,
       final ITtlTimeProvider timeProvider,
       final BulkOperationState operationState) throws IOException {
-    List<PathMetadata> entriesToAdd = new ArrayList<>(backingStatuses.size());
+    List<PathMetadata> entriesToAdd = new ArrayList<>();
     Set<Path> deleted = dirMeta.listTombstones();
 
     final Map<Path, PathMetadata> dirMetaMap = dirMeta.getListing().stream()
         .collect(Collectors.toMap(pm -> pm.getFileStatus().getPath(), pm -> pm));
-    for (S3AFileStatus s : backingStatuses) {
+    while (backingStatuses.hasNext()) {
+      S3AFileStatus s = backingStatuses.next();
       final Path statusPath = s.getPath();
       if (deleted.contains(statusPath)) {
         continue;
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java
index 46e6f5f..941e701 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java
@@ -177,6 +177,49 @@
   }
 
   @Test
+  public void testCostOfListStatusOnFile() throws Throwable {
+    describe("Performing listStatus() on a file");
+    Path file = path(getMethodName() + ".txt");
+    S3AFileSystem fs = getFileSystem();
+    touch(fs, file);
+    verifyMetrics(() ->
+            fs.listStatus(file),
+            whenRaw(LIST_STATUS_LIST_OP
+                    .plus(GET_FILE_STATUS_ON_FILE)),
+            whenAuthoritative(LIST_STATUS_LIST_OP),
+            whenNonauth(LIST_STATUS_LIST_OP));
+  }
+
+  @Test
+  public void testCostOfListStatusOnEmptyDir() throws Throwable {
+    describe("Performing listStatus() on an empty dir");
+    Path dir = path(getMethodName());
+    S3AFileSystem fs = getFileSystem();
+    fs.mkdirs(dir);
+    verifyMetrics(() ->
+            fs.listStatus(dir),
+            whenRaw(LIST_STATUS_LIST_OP
+            .plus(GET_FILE_STATUS_ON_EMPTY_DIR)),
+            whenAuthoritative(NO_IO),
+            whenNonauth(LIST_STATUS_LIST_OP));
+  }
+
+  @Test
+  public void testCostOfListStatusOnNonEmptyDir() throws Throwable {
+    describe("Performing listStatus() on a non empty dir");
+    Path dir = path(getMethodName());
+    S3AFileSystem fs = getFileSystem();
+    fs.mkdirs(dir);
+    Path file = new Path(dir, "file.txt");
+    touch(fs, file);
+    verifyMetrics(() ->
+            fs.listStatus(dir),
+            whenRaw(LIST_STATUS_LIST_OP),
+            whenAuthoritative(NO_IO),
+            whenNonauth(LIST_STATUS_LIST_OP));
+  }
+
+  @Test
   public void testCostOfGetFileStatusOnFile() throws Throwable {
     describe("performing getFileStatus on a file");
     Path simpleFile = file(methodPath());
@@ -406,8 +449,7 @@
     fs.globStatus(basePath.suffix("/*"));
     // 2 head + 1 list from getFileStatus on path,
     // plus 1 list to match the glob pattern
-    verifyRaw(GET_FILE_STATUS_ON_DIR
-        .plus(LIST_OPERATION),
+    verifyRaw(LIST_STATUS_LIST_OP,
         () -> fs.globStatus(basePath.suffix("/*")));
   }
 
@@ -426,8 +468,7 @@
     // unguarded: 2 head + 1 list from getFileStatus on path,
     // plus 1 list to match the glob pattern
     // no additional operations from symlink resolution
-    verifyRaw(GET_FILE_STATUS_ON_DIR
-        .plus(LIST_OPERATION),
+    verifyRaw(LIST_STATUS_LIST_OP,
         () -> fs.globStatus(basePath.suffix("/*")));
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index f225800..f057449 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -35,18 +35,39 @@
 import org.apache.hadoop.fs.s3a.auth.MarshalledCredentials;
 import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 
+import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
+import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
+import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
+import org.apache.hadoop.fs.s3a.impl.OperationCallbacks;
 import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
+import org.apache.hadoop.fs.s3a.impl.StoreContext;
+import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
+import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
+import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
+import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
 import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
 import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreCapabilities;
+import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
+import org.apache.hadoop.fs.s3a.s3guard.RenameTracker;
+import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
 import org.apache.hadoop.fs.s3native.S3xLoginHelper;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.service.ServiceOperations;
+import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import com.amazonaws.AmazonClientException;
 import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsResult;
+import com.amazonaws.services.s3.model.MultiObjectDeleteException;
+import com.amazonaws.services.s3.transfer.model.CopyResult;
+import javax.annotation.Nullable;
 import org.hamcrest.core.Is;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -61,12 +82,15 @@
 import java.net.URISyntaxException;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -895,6 +919,49 @@
   }
 
   /**
+   * Create mock implementation of store context.
+   * @param multiDelete
+   * @param store
+   * @param accessors
+   * @return
+   * @throws URISyntaxException
+   * @throws IOException
+   */
+  public static StoreContext createMockStoreContext(
+          boolean multiDelete,
+          OperationTrackingStore store,
+          ContextAccessors accessors)
+          throws URISyntaxException, IOException {
+    URI name = new URI("s3a://bucket");
+    Configuration conf = new Configuration();
+    return new StoreContextBuilder().setFsURI(name)
+        .setBucket("bucket")
+        .setConfiguration(conf)
+        .setUsername("alice")
+        .setOwner(UserGroupInformation.getCurrentUser())
+        .setExecutor(BlockingThreadPoolExecutorService.newInstance(
+            4,
+            4,
+            10, TimeUnit.SECONDS,
+            "s3a-transfer-shared"))
+        .setExecutorCapacity(DEFAULT_EXECUTOR_CAPACITY)
+        .setInvoker(
+            new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL, Invoker.LOG_EVENT))
+        .setInstrumentation(new S3AInstrumentation(name))
+        .setStorageStatistics(new S3AStorageStatistics())
+        .setInputPolicy(S3AInputPolicy.Normal)
+        .setChangeDetectionPolicy(
+            ChangeDetectionPolicy.createPolicy(ChangeDetectionPolicy.Mode.None,
+                ChangeDetectionPolicy.Source.ETag, false))
+        .setMultiObjectDeleteEnabled(multiDelete)
+        .setMetadataStore(store)
+        .setUseListV1(false)
+        .setContextAccessors(accessors)
+        .setTimeProvider(new S3Guard.TtlTimeProvider(conf))
+        .build();
+  }
+
+  /**
    * Helper class to do diffs of metrics.
    */
   public static final class MetricDiff {
@@ -1472,4 +1539,293 @@
         needEmptyDirectoryFlag,
         probes);
   }
+
+  public static class MinimalOperationCallbacks
+          implements OperationCallbacks {
+    @Override
+    public S3ObjectAttributes createObjectAttributes(
+            Path path,
+            String eTag,
+            String versionId,
+            long len) {
+      return null;
+    }
+
+    @Override
+    public S3ObjectAttributes createObjectAttributes(
+            S3AFileStatus fileStatus) {
+      return null;
+    }
+
+    @Override
+    public S3AReadOpContext createReadContext(
+            FileStatus fileStatus) {
+      return null;
+    }
+
+    @Override
+    public void finishRename(
+            Path sourceRenamed,
+            Path destCreated)
+            throws IOException {
+
+    }
+
+    @Override
+    public void deleteObjectAtPath(
+            Path path,
+            String key,
+            boolean isFile,
+            BulkOperationState operationState)
+            throws IOException {
+
+    }
+
+    @Override
+    public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
+            Path path,
+            S3AFileStatus status,
+            boolean collectTombstones,
+            boolean includeSelf)
+            throws IOException {
+      return null;
+    }
+
+    @Override
+    public CopyResult copyFile(
+            String srcKey,
+            String destKey,
+            S3ObjectAttributes srcAttributes,
+            S3AReadOpContext readContext)
+            throws IOException {
+      return null;
+    }
+
+    @Override
+    public DeleteObjectsResult removeKeys(
+            List<DeleteObjectsRequest.KeyVersion> keysToDelete,
+            boolean deleteFakeDir,
+            List<Path> undeletedObjectsOnFailure,
+            BulkOperationState operationState,
+            boolean quiet)
+            throws MultiObjectDeleteException, AmazonClientException,
+            IOException {
+      return null;
+    }
+
+    @Override
+    public boolean allowAuthoritative(Path p) {
+      return false;
+    }
+
+    @Override
+    public RemoteIterator<S3AFileStatus> listObjects(
+            Path path,
+            String key)
+            throws IOException {
+      return null;
+    }
+  }
+
+  /**
+   * MetadataStore which tracks what is deleted and added.
+   */
+  public static class OperationTrackingStore implements MetadataStore {
+
+    private final List<Path> deleted = new ArrayList<>();
+
+    private final List<Path> created = new ArrayList<>();
+
+    @Override
+    public void initialize(final FileSystem fs,
+        ITtlTimeProvider ttlTimeProvider) {
+    }
+
+    @Override
+    public void initialize(final Configuration conf,
+        ITtlTimeProvider ttlTimeProvider) {
+    }
+
+    @Override
+    public void forgetMetadata(final Path path) {
+    }
+
+    @Override
+    public PathMetadata get(final Path path) {
+      return null;
+    }
+
+    @Override
+    public PathMetadata get(final Path path,
+        final boolean wantEmptyDirectoryFlag) {
+      return null;
+    }
+
+    @Override
+    public DirListingMetadata listChildren(final Path path) {
+      return null;
+    }
+
+    @Override
+    public void put(final PathMetadata meta) {
+      put(meta, null);
+    }
+
+    @Override
+    public void put(final PathMetadata meta,
+        final BulkOperationState operationState) {
+      created.add(meta.getFileStatus().getPath());
+    }
+
+    @Override
+    public void put(final Collection<? extends PathMetadata> metas,
+        final BulkOperationState operationState) {
+      metas.stream().forEach(meta -> put(meta, null));
+    }
+
+    @Override
+    public void put(final DirListingMetadata meta,
+        final List<Path> unchangedEntries,
+        final BulkOperationState operationState) {
+      created.add(meta.getPath());
+    }
+
+    @Override
+    public void destroy() {
+    }
+
+    @Override
+    public void delete(final Path path,
+        final BulkOperationState operationState) {
+      deleted.add(path);
+    }
+
+    @Override
+    public void deletePaths(final Collection<Path> paths,
+        @Nullable final BulkOperationState operationState)
+            throws IOException {
+      deleted.addAll(paths);
+    }
+
+    @Override
+    public void deleteSubtree(final Path path,
+        final BulkOperationState operationState) {
+
+    }
+
+    @Override
+    public void move(@Nullable final Collection<Path> pathsToDelete,
+        @Nullable final Collection<PathMetadata> pathsToCreate,
+        @Nullable final BulkOperationState operationState) {
+    }
+
+    @Override
+    public void prune(final PruneMode pruneMode, final long cutoff) {
+    }
+
+    @Override
+    public long prune(final PruneMode pruneMode,
+        final long cutoff,
+        final String keyPrefix) {
+      return 0;
+    }
+
+    @Override
+    public BulkOperationState initiateBulkWrite(
+        final BulkOperationState.OperationType operation,
+        final Path dest) {
+      return null;
+    }
+
+    @Override
+    public void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) {
+    }
+
+    @Override
+    public Map<String, String> getDiagnostics() {
+      return null;
+    }
+
+    @Override
+    public void updateParameters(final Map<String, String> parameters) {
+    }
+
+    @Override
+    public void close() {
+    }
+
+    public List<Path> getDeleted() {
+      return deleted;
+    }
+
+    public List<Path> getCreated() {
+      return created;
+    }
+
+    @Override
+    public RenameTracker initiateRenameOperation(
+        final StoreContext storeContext,
+        final Path source,
+        final S3AFileStatus sourceStatus,
+        final Path dest) {
+      throw new UnsupportedOperationException("unsupported");
+    }
+
+    @Override
+    public void addAncestors(final Path qualifiedPath,
+        @Nullable final BulkOperationState operationState) {
+
+    }
+  }
+
+  public static class MinimalListingOperationCallbacks
+          implements ListingOperationCallbacks {
+    @Override
+    public CompletableFuture<S3ListResult> listObjectsAsync(
+            S3ListRequest request)
+            throws IOException {
+      return null;
+    }
+
+    @Override
+    public CompletableFuture<S3ListResult> continueListObjectsAsync(
+            S3ListRequest request,
+            S3ListResult prevResult)
+            throws IOException {
+      return null;
+    }
+
+    @Override
+    public S3ALocatedFileStatus toLocatedFileStatus(
+            S3AFileStatus status) throws IOException {
+      return null;
+    }
+
+    @Override
+    public S3ListRequest createListObjectsRequest(
+            String key,
+            String delimiter) {
+      return null;
+    }
+
+    @Override
+    public long getDefaultBlockSize(Path path) {
+      return 0;
+    }
+
+    @Override
+    public int getMaxKeys() {
+      return 0;
+    }
+
+    @Override
+    public ITtlTimeProvider getUpdatedTtlTimeProvider() {
+      return null;
+    }
+
+    @Override
+    public boolean allowAuthoritative(Path p) {
+      return false;
+    }
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java
index 0729f2a..cdf7927 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java
@@ -18,26 +18,15 @@
 
 package org.apache.hadoop.fs.s3a.impl;
 
-import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import com.amazonaws.AmazonClientException;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsResult;
 import com.amazonaws.services.s3.model.MultiObjectDeleteException;
-import com.amazonaws.services.s3.transfer.model.CopyResult;
 import com.google.common.collect.Lists;
 import org.assertj.core.api.Assertions;
 import org.junit.Before;
@@ -45,32 +34,8 @@
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.Triple;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.s3a.Constants;
-import org.apache.hadoop.fs.s3a.Invoker;
-import org.apache.hadoop.fs.s3a.S3AFileStatus;
-import org.apache.hadoop.fs.s3a.S3AInputPolicy;
-import org.apache.hadoop.fs.s3a.S3AInstrumentation;
-import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
-import org.apache.hadoop.fs.s3a.S3AReadOpContext;
-import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
-import org.apache.hadoop.fs.s3a.S3ListRequest;
-import org.apache.hadoop.fs.s3a.S3ListResult;
-import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
-import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
-import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
-import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
-import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
-import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
-import org.apache.hadoop.fs.s3a.s3guard.RenameTracker;
-import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
 
 import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.ACCESS_DENIED;
 import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.removeUndeletedPaths;
@@ -93,8 +58,8 @@
 
   @Before
   public void setUp() throws Exception {
-    context = createMockStoreContext(true,
-        new OperationTrackingStore());
+    context = S3ATestUtils.createMockStoreContext(true,
+        new S3ATestUtils.OperationTrackingStore(), CONTEXT_ACCESSORS);
   }
 
   @Test
@@ -187,9 +152,10 @@
     final List<Path> deleteAllowed = Lists.newArrayList(pathA, pathAC);
     MultiObjectDeleteException ex = createDeleteException(ACCESS_DENIED,
         deleteForbidden);
-    OperationTrackingStore store
-        = new OperationTrackingStore();
-    StoreContext storeContext = createMockStoreContext(true, store);
+    S3ATestUtils.OperationTrackingStore store
+        = new S3ATestUtils.OperationTrackingStore();
+    StoreContext storeContext = S3ATestUtils
+            .createMockStoreContext(true, store, CONTEXT_ACCESSORS);
     MultiObjectDeleteSupport deleteSupport
         = new MultiObjectDeleteSupport(storeContext, null);
     Triple<List<Path>, List<Path>, List<Pair<Path, IOException>>>
@@ -210,174 +176,6 @@
   }
 
 
-  private StoreContext createMockStoreContext(boolean multiDelete,
-      OperationTrackingStore store) throws URISyntaxException, IOException {
-    URI name = new URI("s3a://bucket");
-    Configuration conf = new Configuration();
-    return new StoreContextBuilder().setFsURI(name)
-        .setBucket("bucket")
-        .setConfiguration(conf)
-        .setUsername("alice")
-        .setOwner(UserGroupInformation.getCurrentUser())
-        .setExecutor(BlockingThreadPoolExecutorService.newInstance(
-            4,
-            4,
-            10, TimeUnit.SECONDS,
-            "s3a-transfer-shared"))
-        .setExecutorCapacity(Constants.DEFAULT_EXECUTOR_CAPACITY)
-        .setInvoker(
-            new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL, Invoker.LOG_EVENT))
-        .setInstrumentation(new S3AInstrumentation(name))
-        .setStorageStatistics(new S3AStorageStatistics())
-        .setInputPolicy(S3AInputPolicy.Normal)
-        .setChangeDetectionPolicy(
-            ChangeDetectionPolicy.createPolicy(ChangeDetectionPolicy.Mode.None,
-                ChangeDetectionPolicy.Source.ETag, false))
-        .setMultiObjectDeleteEnabled(multiDelete)
-        .setMetadataStore(store)
-        .setUseListV1(false)
-        .setContextAccessors(CONTEXT_ACCESSORS)
-        .setTimeProvider(new S3Guard.TtlTimeProvider(conf))
-        .build();
-  }
-
-  private static class MinimalListingOperationCallbacks
-          implements ListingOperationCallbacks {
-    @Override
-    public CompletableFuture<S3ListResult> listObjectsAsync(
-            S3ListRequest request)
-            throws IOException {
-      return null;
-    }
-
-    @Override
-    public CompletableFuture<S3ListResult> continueListObjectsAsync(
-            S3ListRequest request,
-            S3ListResult prevResult)
-            throws IOException {
-      return null;
-    }
-
-    @Override
-    public S3ALocatedFileStatus toLocatedFileStatus(
-            S3AFileStatus status) throws IOException {
-      return null;
-    }
-
-    @Override
-    public S3ListRequest createListObjectsRequest(
-            String key,
-            String delimiter) {
-      return null;
-    }
-
-    @Override
-    public long getDefaultBlockSize(Path path) {
-      return 0;
-    }
-
-    @Override
-    public int getMaxKeys() {
-      return 0;
-    }
-
-    @Override
-    public ITtlTimeProvider getUpdatedTtlTimeProvider() {
-      return null;
-    }
-
-    @Override
-    public boolean allowAuthoritative(Path p) {
-      return false;
-    }
-  }
-  private static class MinimalOperationCallbacks
-          implements OperationCallbacks {
-    @Override
-    public S3ObjectAttributes createObjectAttributes(
-            Path path,
-            String eTag,
-            String versionId,
-            long len) {
-      return null;
-    }
-
-    @Override
-    public S3ObjectAttributes createObjectAttributes(
-            S3AFileStatus fileStatus) {
-      return null;
-    }
-
-    @Override
-    public S3AReadOpContext createReadContext(
-            FileStatus fileStatus) {
-      return null;
-    }
-
-    @Override
-    public void finishRename(
-            Path sourceRenamed,
-            Path destCreated)
-            throws IOException {
-
-    }
-
-    @Override
-    public void deleteObjectAtPath(
-            Path path,
-            String key,
-            boolean isFile,
-            BulkOperationState operationState)
-            throws IOException {
-
-    }
-
-    @Override
-    public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
-            Path path,
-            S3AFileStatus status,
-            boolean collectTombstones,
-            boolean includeSelf)
-            throws IOException {
-      return null;
-    }
-
-    @Override
-    public CopyResult copyFile(
-            String srcKey,
-            String destKey,
-            S3ObjectAttributes srcAttributes,
-            S3AReadOpContext readContext)
-            throws IOException {
-      return null;
-    }
-
-    @Override
-    public DeleteObjectsResult removeKeys(
-            List<DeleteObjectsRequest.KeyVersion> keysToDelete,
-            boolean deleteFakeDir,
-            List<Path> undeletedObjectsOnFailure,
-            BulkOperationState operationState,
-            boolean quiet)
-            throws MultiObjectDeleteException, AmazonClientException,
-            IOException {
-      return null;
-    }
-
-    @Override
-    public boolean allowAuthoritative(Path p) {
-      return false;
-    }
-
-    @Override
-    public RemoteIterator<S3AFileStatus> listObjects(
-            Path path,
-            String key)
-            throws IOException {
-      return null;
-    }
-  }
-
   private static class MinimalContextAccessor implements ContextAccessors {
 
     @Override
@@ -406,155 +204,5 @@
       return path;
     }
   }
-  /**
-   * MetadataStore which tracks what is deleted and added.
-   */
-  private static class OperationTrackingStore implements MetadataStore {
-
-    private final List<Path> deleted = new ArrayList<>();
-
-    private final List<Path> created = new ArrayList<>();
-
-    @Override
-    public void initialize(final FileSystem fs,
-        ITtlTimeProvider ttlTimeProvider) {
-    }
-
-    @Override
-    public void initialize(final Configuration conf,
-        ITtlTimeProvider ttlTimeProvider) {
-    }
-
-    @Override
-    public void forgetMetadata(final Path path) {
-    }
-
-    @Override
-    public PathMetadata get(final Path path) {
-      return null;
-    }
-
-    @Override
-    public PathMetadata get(final Path path,
-        final boolean wantEmptyDirectoryFlag) {
-      return null;
-    }
-
-    @Override
-    public DirListingMetadata listChildren(final Path path) {
-      return null;
-    }
-
-    @Override
-    public void put(final PathMetadata meta) {
-      put(meta, null);
-    }
-
-    @Override
-    public void put(final PathMetadata meta,
-        final BulkOperationState operationState) {
-      created.add(meta.getFileStatus().getPath());
-    }
-
-    @Override
-    public void put(final Collection<? extends PathMetadata> metas,
-        final BulkOperationState operationState) {
-      metas.stream().forEach(meta -> put(meta, null));
-    }
-
-    @Override
-    public void put(final DirListingMetadata meta,
-        final List<Path> unchangedEntries,
-        final BulkOperationState operationState) {
-      created.add(meta.getPath());
-    }
-
-    @Override
-    public void destroy() {
-    }
-
-    @Override
-    public void delete(final Path path,
-        final BulkOperationState operationState) {
-      deleted.add(path);
-    }
-
-    @Override
-    public void deletePaths(final Collection<Path> paths,
-        @Nullable final BulkOperationState operationState)
-            throws IOException {
-      deleted.addAll(paths);
-    }
-
-    @Override
-    public void deleteSubtree(final Path path,
-        final BulkOperationState operationState) {
-
-    }
-
-    @Override
-    public void move(@Nullable final Collection<Path> pathsToDelete,
-        @Nullable final Collection<PathMetadata> pathsToCreate,
-        @Nullable final BulkOperationState operationState) {
-    }
-
-    @Override
-    public void prune(final PruneMode pruneMode, final long cutoff) {
-    }
-
-    @Override
-    public long prune(final PruneMode pruneMode,
-        final long cutoff,
-        final String keyPrefix) {
-      return 0;
-    }
-
-    @Override
-    public BulkOperationState initiateBulkWrite(
-        final BulkOperationState.OperationType operation,
-        final Path dest) {
-      return null;
-    }
-
-    @Override
-    public void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) {
-    }
-
-    @Override
-    public Map<String, String> getDiagnostics() {
-      return null;
-    }
-
-    @Override
-    public void updateParameters(final Map<String, String> parameters) {
-    }
-
-    @Override
-    public void close() {
-    }
-
-    public List<Path> getDeleted() {
-      return deleted;
-    }
-
-    public List<Path> getCreated() {
-      return created;
-    }
-
-    @Override
-    public RenameTracker initiateRenameOperation(
-        final StoreContext storeContext,
-        final Path source,
-        final S3AFileStatus sourceStatus,
-        final Path dest) {
-      throw new UnsupportedOperationException("unsupported");
-    }
-
-    @Override
-    public void addAncestors(final Path qualifiedPath,
-        @Nullable final BulkOperationState operationState) {
-
-    }
-  }
 
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCost.java
index 46a6b71..54b6866 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCost.java
@@ -107,9 +107,10 @@
       new OperationCost(0, 1);
 
   /** listFiles always does a LIST. */
-  public static final OperationCost LIST_FILES_LIST_OP =
-      new OperationCost(0, 1);
+  public static final OperationCost LIST_FILES_LIST_OP = LIST_OPERATION;
 
+  /** listStatus always does a LIST. */
+  public static final OperationCost LIST_STATUS_LIST_OP = LIST_OPERATION;
   /**
    * Metadata cost of a copy operation, as used during rename.
    * This happens even if the store is guarded.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java
index 672f3a9..eaa363b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java
@@ -23,6 +23,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.UUID;
@@ -39,7 +40,9 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AUtils;
 import org.apache.hadoop.fs.s3a.Tristate;
 import org.apache.hadoop.service.launcher.LauncherExitCodes;
 import org.apache.hadoop.test.LambdaTestUtils;
@@ -47,6 +50,8 @@
 
 import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_METADATA_TTL;
 import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL;
+import static org.apache.hadoop.fs.s3a.Listing.toProvidedFileStatusIterator;
+import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.dirMetaToStatuses;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -79,7 +84,6 @@
     ms.initialize(conf, new S3Guard.TtlTimeProvider(conf));
     timeProvider = new S3Guard.TtlTimeProvider(
         DEFAULT_METADATASTORE_METADATA_TTL);
-
   }
 
   @After
@@ -108,9 +112,14 @@
     List<S3AFileStatus> s3Listing = Arrays.asList(
         s1Status,
         s2Status);
-
-    FileStatus[] result = S3Guard.dirListingUnion(ms, DIR_PATH, s3Listing,
-        dirMeta, false, timeProvider);
+    RemoteIterator<S3AFileStatus> storeItr = toProvidedFileStatusIterator(
+            s3Listing.toArray(new S3AFileStatus[0]));
+    RemoteIterator<S3AFileStatus> resultItr = S3Guard.dirListingUnion(
+            ms, DIR_PATH, storeItr, dirMeta, false,
+            timeProvider, s3AFileStatuses ->
+                    toProvidedFileStatusIterator(dirMetaToStatuses(dirMeta)));
+    S3AFileStatus[] result = S3AUtils.iteratorToStatuses(
+            resultItr, new HashSet<>());
 
     assertEquals("listing length", 4, result.length);
     assertContainsPaths(result, MS_FILE_1, MS_FILE_2, S3_FILE_3, S3_DIR_4);
@@ -124,9 +133,18 @@
     S3AFileStatus f1Status2 = new S3AFileStatus(
         200, System.currentTimeMillis(), new Path(MS_FILE_1),
         1, null, "tag2", "ver2");
-    FileStatus[] result2 = S3Guard.dirListingUnion(ms, DIR_PATH,
-        Arrays.asList(f1Status2),
-        dirMeta, false, timeProvider);
+    S3AFileStatus[] f1Statuses = new S3AFileStatus[1];
+    f1Statuses[0] = f1Status2;
+    RemoteIterator<S3AFileStatus> itr = toProvidedFileStatusIterator(
+            f1Statuses);
+    FileStatus[] result2 = S3AUtils.iteratorToStatuses(
+            S3Guard.dirListingUnion(
+                    ms, DIR_PATH, itr, dirMeta,
+                    false, timeProvider,
+              s3AFileStatuses ->
+                      toProvidedFileStatusIterator(
+                              dirMetaToStatuses(dirMeta))),
+            new HashSet<>());
     // the listing returns the new status
     Assertions.assertThat(find(result2, MS_FILE_1))
         .describedAs("Entry in listing results for %s", MS_FILE_1)
@@ -159,9 +177,18 @@
 
     ITtlTimeProvider timeProvider = new S3Guard.TtlTimeProvider(
         DEFAULT_METADATASTORE_METADATA_TTL);
-    FileStatus[] result = S3Guard.dirListingUnion(ms, DIR_PATH, s3Listing,
-        dirMeta, true, timeProvider);
 
+    RemoteIterator<S3AFileStatus> storeItr = toProvidedFileStatusIterator(
+            s3Listing.toArray(new S3AFileStatus[0]));
+    RemoteIterator<S3AFileStatus> resultItr = S3Guard
+            .dirListingUnion(ms, DIR_PATH, storeItr, dirMeta,
+                    true, timeProvider,
+              s3AFileStatuses ->
+                      toProvidedFileStatusIterator(
+                              dirMetaToStatuses(dirMeta)));
+
+    S3AFileStatus[] result = S3AUtils.iteratorToStatuses(
+            resultItr, new HashSet<>());
     assertEquals("listing length", 4, result.length);
     assertContainsPaths(result, MS_FILE_1, MS_FILE_2, S3_FILE_3, S3_DIR_4);
 
@@ -181,13 +208,21 @@
     S3AFileStatus s1Status2 = new S3AFileStatus(
         200, System.currentTimeMillis(), new Path(S3_FILE_3),
         1, null, "tag2", "ver2");
+    S3AFileStatus[] f1Statuses = new S3AFileStatus[1];
+    f1Statuses[0] = s1Status2;
+    RemoteIterator<S3AFileStatus> itr =
+            toProvidedFileStatusIterator(f1Statuses);
+    FileStatus[] result2 = S3AUtils.iteratorToStatuses(
+            S3Guard.dirListingUnion(ms, DIR_PATH, itr, dirMeta,
+                    true, timeProvider,
+              s3AFileStatuses ->
+                      toProvidedFileStatusIterator(
+                              dirMetaToStatuses(dirMeta))),
+            new HashSet<>());
 
     // but the result of the listing contains the old entry
     // because auth mode doesn't pick up changes in S3 which
     // didn't go through s3guard
-    FileStatus[] result2 = S3Guard.dirListingUnion(ms, DIR_PATH,
-        Arrays.asList(s1Status2),
-        dirMeta2, true, timeProvider);
     Assertions.assertThat(find(result2, S3_FILE_3))
         .describedAs("Entry in listing results for %s", S3_FILE_3)
         .isSameAs(file3Meta.getFileStatus());