HADOOP-17023. Tune S3AFileSystem.listStatus() (#2257)
S3AFileSystem.listStatus() is optimized for invocations
where the path supplied is a non-empty directory.
The number of S3 requests is significantly reduced, saving
time and money, and reducing the risk of S3 throttling.
Contributed by Mukund Thakur.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
index 16413a7..20ed288 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
@@ -24,6 +24,7 @@
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.tuple.Triple;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -103,6 +104,19 @@
}
/**
+ * Create a FileStatus iterator against a provided array of file statuses.
+ * @param fileStatuses array of file statuses.
+ * @return the file status iterator.
+ */
+ @VisibleForTesting
+ public static ProvidedFileStatusIterator toProvidedFileStatusIterator(
+ S3AFileStatus[] fileStatuses) {
+ return new ProvidedFileStatusIterator(fileStatuses,
+ ACCEPT_ALL,
+ Listing.ACCEPT_ALL_BUT_S3N);
+ }
+
+ /**
* Create a FileStatus iterator against a path, with a given list object
* request.
*
@@ -250,7 +264,7 @@
if (!forceNonAuthoritativeMS &&
allowAuthoritative &&
metadataStoreListFilesIterator.isRecursivelyAuthoritative()) {
- S3AFileStatus[] statuses = S3Guard.iteratorToStatuses(
+ S3AFileStatus[] statuses = S3AUtils.iteratorToStatuses(
metadataStoreListFilesIterator, tombstones);
cachedFilesIterator = createProvidedFileStatusIterator(
statuses, ACCEPT_ALL, acceptor);
@@ -329,6 +343,56 @@
tombstones);
}
+ /**
+ * Calculate list of file statuses assuming path
+ * to be a non-empty directory.
+ * @param path input path.
+ * @return Triple of (file status iterator, directory listing metadata,
+ * authoritative-listing flag).
+ * @throws IOException Any IO problems.
+ */
+ public Triple<RemoteIterator<S3AFileStatus>, DirListingMetadata, Boolean>
+ getFileStatusesAssumingNonEmptyDir(Path path)
+ throws IOException {
+ String key = pathToKey(path);
+ List<S3AFileStatus> result;
+ if (!key.isEmpty()) {
+ key = key + '/';
+ }
+
+ boolean allowAuthoritative = listingOperationCallbacks
+ .allowAuthoritative(path);
+ DirListingMetadata dirMeta =
+ S3Guard.listChildrenWithTtl(
+ getStoreContext().getMetadataStore(),
+ path,
+ listingOperationCallbacks.getUpdatedTtlTimeProvider(),
+ allowAuthoritative);
+ // In auth mode return directly with auth flag.
+ if (allowAuthoritative && dirMeta != null && dirMeta.isAuthoritative()) {
+ ProvidedFileStatusIterator mfsItr = createProvidedFileStatusIterator(
+ S3Guard.dirMetaToStatuses(dirMeta),
+ ACCEPT_ALL,
+ Listing.ACCEPT_ALL_BUT_S3N);
+ return Triple.of(mfsItr,
+ dirMeta, Boolean.TRUE);
+ }
+
+ S3ListRequest request = createListObjectsRequest(key, "/");
+ LOG.debug("listStatus: doing listObjects for directory {}", key);
+
+ FileStatusListingIterator filesItr = createFileStatusListingIterator(
+ path,
+ request,
+ ACCEPT_ALL,
+ new Listing.AcceptAllButSelfAndS3nDirs(path));
+
+ // return the results obtained from s3.
+ return Triple.of(
+ filesItr,
+ dirMeta,
+ Boolean.FALSE);
+ }
+
public S3ListRequest createListObjectsRequest(String key, String delimiter) {
return listingOperationCallbacks.createListObjectsRequest(key, delimiter);
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 63c80bd..86f2a88 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -35,6 +35,7 @@
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -185,6 +186,7 @@
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404;
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion;
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.logDnsLookup;
+import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.dirMetaToStatuses;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
/**
@@ -2652,7 +2654,9 @@
*/
public FileStatus[] listStatus(Path f) throws FileNotFoundException,
IOException {
- return once("listStatus", f.toString(), () -> innerListStatus(f));
+ return once("listStatus",
+ f.toString(),
+ () -> iteratorToStatuses(innerListStatus(f), new HashSet<>()));
}
/**
@@ -2665,51 +2669,52 @@
* @throws IOException due to an IO problem.
* @throws AmazonClientException on failures inside the AWS SDK
*/
- private S3AFileStatus[] innerListStatus(Path f) throws FileNotFoundException,
- IOException, AmazonClientException {
+ private RemoteIterator<S3AFileStatus> innerListStatus(Path f)
+ throws FileNotFoundException,
+ IOException, AmazonClientException {
Path path = qualify(f);
- String key = pathToKey(path);
LOG.debug("List status for path: {}", path);
entryPoint(INVOCATION_LIST_STATUS);
- List<S3AFileStatus> result;
- final S3AFileStatus fileStatus = innerGetFileStatus(path, false,
- StatusProbeEnum.ALL);
+ Triple<RemoteIterator<S3AFileStatus>, DirListingMetadata, Boolean>
+ statusesAssumingNonEmptyDir = listing
+ .getFileStatusesAssumingNonEmptyDir(path);
- if (fileStatus.isDirectory()) {
- if (!key.isEmpty()) {
- key = key + '/';
+ if (!statusesAssumingNonEmptyDir.getLeft().hasNext() &&
+ statusesAssumingNonEmptyDir.getRight()) {
+ // We are sure that this is an empty directory in auth mode.
+ return statusesAssumingNonEmptyDir.getLeft();
+ } else if (!statusesAssumingNonEmptyDir.getLeft().hasNext()) {
+ // We may have an empty dir, a file, or nothing at all.
+ // So we call innerGetFileStatus to get the status; this may throw
+ // FileNotFoundException if we have nothing.
+ // So we are guaranteed to have either a dir marker or a file.
+ final S3AFileStatus fileStatus = innerGetFileStatus(path, false,
+ StatusProbeEnum.ALL);
+ // If it is a file return directly.
+ if (fileStatus.isFile()) {
+ LOG.debug("Adding: rd (not a dir): {}", path);
+ S3AFileStatus[] stats = new S3AFileStatus[1];
+ stats[0] = fileStatus;
+ return listing.createProvidedFileStatusIterator(
+ stats,
+ ACCEPT_ALL,
+ Listing.ACCEPT_ALL_BUT_S3N);
}
-
- boolean allowAuthoritative = allowAuthoritative(f);
- DirListingMetadata dirMeta =
- S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider,
- allowAuthoritative);
- if (allowAuthoritative && dirMeta != null && dirMeta.isAuthoritative()) {
- return S3Guard.dirMetaToStatuses(dirMeta);
- }
-
- S3ListRequest request = createListObjectsRequest(key, "/");
- LOG.debug("listStatus: doing listObjects for directory {}", key);
-
- Listing.FileStatusListingIterator files =
- listing.createFileStatusListingIterator(path,
- request,
- ACCEPT_ALL,
- new Listing.AcceptAllButSelfAndS3nDirs(path));
- result = new ArrayList<>(files.getBatchSize());
- while (files.hasNext()) {
- result.add(files.next());
- }
- // merge the results. This will update the store as needed
- return S3Guard.dirListingUnion(metadataStore, path, result, dirMeta,
- allowAuthoritative, ttlTimeProvider);
- } else {
- LOG.debug("Adding: rd (not a dir): {}", path);
- S3AFileStatus[] stats = new S3AFileStatus[1];
- stats[0]= fileStatus;
- return stats;
}
+ // Here we have a directory which may or may not be empty.
+ // So we update the metastore and return.
+ return S3Guard.dirListingUnion(
+ metadataStore,
+ path,
+ statusesAssumingNonEmptyDir.getLeft(),
+ statusesAssumingNonEmptyDir.getMiddle(),
+ allowAuthoritative(path),
+ ttlTimeProvider, p ->
+ listing.createProvidedFileStatusIterator(
+ dirMetaToStatuses(statusesAssumingNonEmptyDir.getMiddle()),
+ ACCEPT_ALL,
+ Listing.ACCEPT_ALL_BUT_S3N));
}
/**
@@ -4497,7 +4502,7 @@
: null;
final RemoteIterator<S3AFileStatus> cachedFileStatusIterator =
listing.createProvidedFileStatusIterator(
- S3Guard.dirMetaToStatuses(meta), filter, acceptor);
+ dirMetaToStatuses(meta), filter, acceptor);
return (allowAuthoritative && meta != null
&& meta.isAuthoritative())
? listing.createLocatedFileStatusIterator(
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index 1d39950..3e9115c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -42,6 +42,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
@@ -1417,6 +1418,30 @@
}
/**
+ * Convert the data of an iterator of {@link S3AFileStatus} to
+ * an array. Given tombstones are filtered out. If the iterator
+ * does not return any item, an empty array is returned.
+ * @param iterator a non-null iterator
+ * @param tombstones paths of deleted entries to filter out of the result
+ * @return a possibly-empty array of file status entries
+ * @throws IOException failure reading from the iterator
+ */
+ public static S3AFileStatus[] iteratorToStatuses(
+ RemoteIterator<S3AFileStatus> iterator, Set<Path> tombstones)
+ throws IOException {
+ List<FileStatus> statuses = new ArrayList<>();
+
+ while (iterator.hasNext()) {
+ S3AFileStatus status = iterator.next();
+ if (!tombstones.contains(status.getPath())) {
+ statuses.add(status);
+ }
+ }
+
+ return statuses.toArray(new S3AFileStatus[0]);
+ }
+
+ /**
* An interface for use in lambda-expressions working with
* directory tree listings.
*/
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
index ae5c293..78cedc2 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
@@ -296,30 +296,6 @@
}
/**
- * Convert the data of an iterator of {@link S3AFileStatus} to
- * an array. Given tombstones are filtered out. If the iterator
- * does return any item, an empty array is returned.
- * @param iterator a non-null iterator
- * @param tombstones
- * @return a possibly-empty array of file status entries
- * @throws IOException
- */
- public static S3AFileStatus[] iteratorToStatuses(
- RemoteIterator<S3AFileStatus> iterator, Set<Path> tombstones)
- throws IOException {
- List<FileStatus> statuses = new ArrayList<>();
-
- while (iterator.hasNext()) {
- S3AFileStatus status = iterator.next();
- if (!tombstones.contains(status.getPath())) {
- statuses.add(status);
- }
- }
-
- return statuses.toArray(new S3AFileStatus[0]);
- }
-
- /**
* Convert the data of a directory listing to an array of {@link FileStatus}
* entries. Tombstones are filtered out at this point. If the listing is null
* an empty array is returned.
@@ -359,17 +335,22 @@
* @param dirMeta Directory listing from MetadataStore. May be null.
* @param isAuthoritative State of authoritative mode
* @param timeProvider Time provider to use when updating entries
+ * @param toStatusItr function to convert array of file status to
+ * RemoteIterator.
* @return Final result of directory listing.
* @throws IOException if metadata store update failed
*/
- public static S3AFileStatus[] dirListingUnion(MetadataStore ms, Path path,
- List<S3AFileStatus> backingStatuses, DirListingMetadata dirMeta,
- boolean isAuthoritative, ITtlTimeProvider timeProvider)
- throws IOException {
+ public static RemoteIterator<S3AFileStatus> dirListingUnion(
+ MetadataStore ms, Path path,
+ RemoteIterator<S3AFileStatus> backingStatuses,
+ DirListingMetadata dirMeta, boolean isAuthoritative,
+ ITtlTimeProvider timeProvider,
+ Function<S3AFileStatus[], RemoteIterator<S3AFileStatus>> toStatusItr)
+ throws IOException {
// Fast-path for NullMetadataStore
if (isNullMetadataStore(ms)) {
- return backingStatuses.toArray(new S3AFileStatus[backingStatuses.size()]);
+ return backingStatuses;
}
assertQualified(path);
@@ -410,7 +391,7 @@
}
IOUtils.cleanupWithLogger(LOG, operationState);
- return dirMetaToStatuses(dirMeta);
+ return toStatusItr.apply(dirMetaToStatuses(dirMeta));
}
/**
@@ -429,7 +410,7 @@
private static void authoritativeUnion(
final MetadataStore ms,
final Path path,
- final List<S3AFileStatus> backingStatuses,
+ final RemoteIterator<S3AFileStatus> backingStatuses,
final DirListingMetadata dirMeta,
final ITtlTimeProvider timeProvider,
final BulkOperationState operationState) throws IOException {
@@ -440,7 +421,8 @@
Set<Path> deleted = dirMeta.listTombstones();
final Map<Path, PathMetadata> dirMetaMap = dirMeta.getListing().stream()
.collect(Collectors.toMap(pm -> pm.getFileStatus().getPath(), pm -> pm));
- for (S3AFileStatus s : backingStatuses) {
+ while (backingStatuses.hasNext()) {
+ S3AFileStatus s = backingStatuses.next();
final Path statusPath = s.getPath();
if (deleted.contains(statusPath)) {
continue;
@@ -493,16 +475,17 @@
private static void nonAuthoritativeUnion(
final MetadataStore ms,
final Path path,
- final List<S3AFileStatus> backingStatuses,
+ final RemoteIterator<S3AFileStatus> backingStatuses,
final DirListingMetadata dirMeta,
final ITtlTimeProvider timeProvider,
final BulkOperationState operationState) throws IOException {
- List<PathMetadata> entriesToAdd = new ArrayList<>(backingStatuses.size());
+ List<PathMetadata> entriesToAdd = new ArrayList<>();
Set<Path> deleted = dirMeta.listTombstones();
final Map<Path, PathMetadata> dirMetaMap = dirMeta.getListing().stream()
.collect(Collectors.toMap(pm -> pm.getFileStatus().getPath(), pm -> pm));
- for (S3AFileStatus s : backingStatuses) {
+ while (backingStatuses.hasNext()) {
+ S3AFileStatus s = backingStatuses.next();
final Path statusPath = s.getPath();
if (deleted.contains(statusPath)) {
continue;
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java
index 46e6f5f..941e701 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java
@@ -177,6 +177,49 @@
}
@Test
+ public void testCostOfListStatusOnFile() throws Throwable {
+ describe("Performing listStatus() on a file");
+ Path file = path(getMethodName() + ".txt");
+ S3AFileSystem fs = getFileSystem();
+ touch(fs, file);
+ verifyMetrics(() ->
+ fs.listStatus(file),
+ whenRaw(LIST_STATUS_LIST_OP
+ .plus(GET_FILE_STATUS_ON_FILE)),
+ whenAuthoritative(LIST_STATUS_LIST_OP),
+ whenNonauth(LIST_STATUS_LIST_OP));
+ }
+
+ @Test
+ public void testCostOfListStatusOnEmptyDir() throws Throwable {
+ describe("Performing listStatus() on an empty dir");
+ Path dir = path(getMethodName());
+ S3AFileSystem fs = getFileSystem();
+ fs.mkdirs(dir);
+ verifyMetrics(() ->
+ fs.listStatus(dir),
+ whenRaw(LIST_STATUS_LIST_OP
+ .plus(GET_FILE_STATUS_ON_EMPTY_DIR)),
+ whenAuthoritative(NO_IO),
+ whenNonauth(LIST_STATUS_LIST_OP));
+ }
+
+ @Test
+ public void testCostOfListStatusOnNonEmptyDir() throws Throwable {
+ describe("Performing listStatus() on a non empty dir");
+ Path dir = path(getMethodName());
+ S3AFileSystem fs = getFileSystem();
+ fs.mkdirs(dir);
+ Path file = new Path(dir, "file.txt");
+ touch(fs, file);
+ verifyMetrics(() ->
+ fs.listStatus(dir),
+ whenRaw(LIST_STATUS_LIST_OP),
+ whenAuthoritative(NO_IO),
+ whenNonauth(LIST_STATUS_LIST_OP));
+ }
+
+ @Test
public void testCostOfGetFileStatusOnFile() throws Throwable {
describe("performing getFileStatus on a file");
Path simpleFile = file(methodPath());
@@ -406,8 +449,7 @@
fs.globStatus(basePath.suffix("/*"));
// 2 head + 1 list from getFileStatus on path,
// plus 1 list to match the glob pattern
- verifyRaw(GET_FILE_STATUS_ON_DIR
- .plus(LIST_OPERATION),
+ verifyRaw(LIST_STATUS_LIST_OP,
() -> fs.globStatus(basePath.suffix("/*")));
}
@@ -426,8 +468,7 @@
// unguarded: 2 head + 1 list from getFileStatus on path,
// plus 1 list to match the glob pattern
// no additional operations from symlink resolution
- verifyRaw(GET_FILE_STATUS_ON_DIR
- .plus(LIST_OPERATION),
+ verifyRaw(LIST_STATUS_LIST_OP,
() -> fs.globStatus(basePath.suffix("/*")));
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index f225800..f057449 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -35,18 +35,39 @@
import org.apache.hadoop.fs.s3a.auth.MarshalledCredentials;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
+import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
+import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
+import org.apache.hadoop.fs.s3a.impl.OperationCallbacks;
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
+import org.apache.hadoop.fs.s3a.impl.StoreContext;
+import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
+import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
+import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
+import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreCapabilities;
+import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
+import org.apache.hadoop.fs.s3a.s3guard.RenameTracker;
+import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.service.ServiceOperations;
+import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.ReflectionUtils;
+import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsResult;
+import com.amazonaws.services.s3.model.MultiObjectDeleteException;
+import com.amazonaws.services.s3.transfer.model.CopyResult;
+import javax.annotation.Nullable;
import org.hamcrest.core.Is;
import org.junit.Assert;
import org.junit.Assume;
@@ -61,12 +82,15 @@
import java.net.URISyntaxException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -895,6 +919,49 @@
}
/**
+ * Create mock implementation of store context.
+ * @param multiDelete whether multi-object (bulk) delete is enabled
+ * @param store metadata store to bind into the context
+ * @param accessors context accessor callbacks
+ * @return a store context usable in tests
+ * @throws URISyntaxException should not occur for the fixed test URI
+ * @throws IOException failure to resolve the current user
+ */
+ public static StoreContext createMockStoreContext(
+ boolean multiDelete,
+ OperationTrackingStore store,
+ ContextAccessors accessors)
+ throws URISyntaxException, IOException {
+ URI name = new URI("s3a://bucket");
+ Configuration conf = new Configuration();
+ return new StoreContextBuilder().setFsURI(name)
+ .setBucket("bucket")
+ .setConfiguration(conf)
+ .setUsername("alice")
+ .setOwner(UserGroupInformation.getCurrentUser())
+ .setExecutor(BlockingThreadPoolExecutorService.newInstance(
+ 4,
+ 4,
+ 10, TimeUnit.SECONDS,
+ "s3a-transfer-shared"))
+ .setExecutorCapacity(DEFAULT_EXECUTOR_CAPACITY)
+ .setInvoker(
+ new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL, Invoker.LOG_EVENT))
+ .setInstrumentation(new S3AInstrumentation(name))
+ .setStorageStatistics(new S3AStorageStatistics())
+ .setInputPolicy(S3AInputPolicy.Normal)
+ .setChangeDetectionPolicy(
+ ChangeDetectionPolicy.createPolicy(ChangeDetectionPolicy.Mode.None,
+ ChangeDetectionPolicy.Source.ETag, false))
+ .setMultiObjectDeleteEnabled(multiDelete)
+ .setMetadataStore(store)
+ .setUseListV1(false)
+ .setContextAccessors(accessors)
+ .setTimeProvider(new S3Guard.TtlTimeProvider(conf))
+ .build();
+ }
+
+ /**
* Helper class to do diffs of metrics.
*/
public static final class MetricDiff {
@@ -1472,4 +1539,293 @@
needEmptyDirectoryFlag,
probes);
}
+
+ public static class MinimalOperationCallbacks
+ implements OperationCallbacks {
+ @Override
+ public S3ObjectAttributes createObjectAttributes(
+ Path path,
+ String eTag,
+ String versionId,
+ long len) {
+ return null;
+ }
+
+ @Override
+ public S3ObjectAttributes createObjectAttributes(
+ S3AFileStatus fileStatus) {
+ return null;
+ }
+
+ @Override
+ public S3AReadOpContext createReadContext(
+ FileStatus fileStatus) {
+ return null;
+ }
+
+ @Override
+ public void finishRename(
+ Path sourceRenamed,
+ Path destCreated)
+ throws IOException {
+
+ }
+
+ @Override
+ public void deleteObjectAtPath(
+ Path path,
+ String key,
+ boolean isFile,
+ BulkOperationState operationState)
+ throws IOException {
+
+ }
+
+ @Override
+ public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
+ Path path,
+ S3AFileStatus status,
+ boolean collectTombstones,
+ boolean includeSelf)
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public CopyResult copyFile(
+ String srcKey,
+ String destKey,
+ S3ObjectAttributes srcAttributes,
+ S3AReadOpContext readContext)
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public DeleteObjectsResult removeKeys(
+ List<DeleteObjectsRequest.KeyVersion> keysToDelete,
+ boolean deleteFakeDir,
+ List<Path> undeletedObjectsOnFailure,
+ BulkOperationState operationState,
+ boolean quiet)
+ throws MultiObjectDeleteException, AmazonClientException,
+ IOException {
+ return null;
+ }
+
+ @Override
+ public boolean allowAuthoritative(Path p) {
+ return false;
+ }
+
+ @Override
+ public RemoteIterator<S3AFileStatus> listObjects(
+ Path path,
+ String key)
+ throws IOException {
+ return null;
+ }
+ }
+
+ /**
+ * MetadataStore which tracks what is deleted and added.
+ */
+ public static class OperationTrackingStore implements MetadataStore {
+
+ private final List<Path> deleted = new ArrayList<>();
+
+ private final List<Path> created = new ArrayList<>();
+
+ @Override
+ public void initialize(final FileSystem fs,
+ ITtlTimeProvider ttlTimeProvider) {
+ }
+
+ @Override
+ public void initialize(final Configuration conf,
+ ITtlTimeProvider ttlTimeProvider) {
+ }
+
+ @Override
+ public void forgetMetadata(final Path path) {
+ }
+
+ @Override
+ public PathMetadata get(final Path path) {
+ return null;
+ }
+
+ @Override
+ public PathMetadata get(final Path path,
+ final boolean wantEmptyDirectoryFlag) {
+ return null;
+ }
+
+ @Override
+ public DirListingMetadata listChildren(final Path path) {
+ return null;
+ }
+
+ @Override
+ public void put(final PathMetadata meta) {
+ put(meta, null);
+ }
+
+ @Override
+ public void put(final PathMetadata meta,
+ final BulkOperationState operationState) {
+ created.add(meta.getFileStatus().getPath());
+ }
+
+ @Override
+ public void put(final Collection<? extends PathMetadata> metas,
+ final BulkOperationState operationState) {
+ metas.stream().forEach(meta -> put(meta, null));
+ }
+
+ @Override
+ public void put(final DirListingMetadata meta,
+ final List<Path> unchangedEntries,
+ final BulkOperationState operationState) {
+ created.add(meta.getPath());
+ }
+
+ @Override
+ public void destroy() {
+ }
+
+ @Override
+ public void delete(final Path path,
+ final BulkOperationState operationState) {
+ deleted.add(path);
+ }
+
+ @Override
+ public void deletePaths(final Collection<Path> paths,
+ @Nullable final BulkOperationState operationState)
+ throws IOException {
+ deleted.addAll(paths);
+ }
+
+ @Override
+ public void deleteSubtree(final Path path,
+ final BulkOperationState operationState) {
+
+ }
+
+ @Override
+ public void move(@Nullable final Collection<Path> pathsToDelete,
+ @Nullable final Collection<PathMetadata> pathsToCreate,
+ @Nullable final BulkOperationState operationState) {
+ }
+
+ @Override
+ public void prune(final PruneMode pruneMode, final long cutoff) {
+ }
+
+ @Override
+ public long prune(final PruneMode pruneMode,
+ final long cutoff,
+ final String keyPrefix) {
+ return 0;
+ }
+
+ @Override
+ public BulkOperationState initiateBulkWrite(
+ final BulkOperationState.OperationType operation,
+ final Path dest) {
+ return null;
+ }
+
+ @Override
+ public void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) {
+ }
+
+ @Override
+ public Map<String, String> getDiagnostics() {
+ return null;
+ }
+
+ @Override
+ public void updateParameters(final Map<String, String> parameters) {
+ }
+
+ @Override
+ public void close() {
+ }
+
+ public List<Path> getDeleted() {
+ return deleted;
+ }
+
+ public List<Path> getCreated() {
+ return created;
+ }
+
+ @Override
+ public RenameTracker initiateRenameOperation(
+ final StoreContext storeContext,
+ final Path source,
+ final S3AFileStatus sourceStatus,
+ final Path dest) {
+ throw new UnsupportedOperationException("unsupported");
+ }
+
+ @Override
+ public void addAncestors(final Path qualifiedPath,
+ @Nullable final BulkOperationState operationState) {
+
+ }
+ }
+
+ public static class MinimalListingOperationCallbacks
+ implements ListingOperationCallbacks {
+ @Override
+ public CompletableFuture<S3ListResult> listObjectsAsync(
+ S3ListRequest request)
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<S3ListResult> continueListObjectsAsync(
+ S3ListRequest request,
+ S3ListResult prevResult)
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public S3ALocatedFileStatus toLocatedFileStatus(
+ S3AFileStatus status) throws IOException {
+ return null;
+ }
+
+ @Override
+ public S3ListRequest createListObjectsRequest(
+ String key,
+ String delimiter) {
+ return null;
+ }
+
+ @Override
+ public long getDefaultBlockSize(Path path) {
+ return 0;
+ }
+
+ @Override
+ public int getMaxKeys() {
+ return 0;
+ }
+
+ @Override
+ public ITtlTimeProvider getUpdatedTtlTimeProvider() {
+ return null;
+ }
+
+ @Override
+ public boolean allowAuthoritative(Path p) {
+ return false;
+ }
+ }
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java
index 0729f2a..cdf7927 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java
@@ -18,26 +18,15 @@
package org.apache.hadoop.fs.s3a.impl;
-import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsResult;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
-import com.amazonaws.services.s3.transfer.model.CopyResult;
import com.google.common.collect.Lists;
import org.assertj.core.api.Assertions;
import org.junit.Before;
@@ -45,32 +34,8 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.s3a.Constants;
-import org.apache.hadoop.fs.s3a.Invoker;
-import org.apache.hadoop.fs.s3a.S3AFileStatus;
-import org.apache.hadoop.fs.s3a.S3AInputPolicy;
-import org.apache.hadoop.fs.s3a.S3AInstrumentation;
-import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
-import org.apache.hadoop.fs.s3a.S3AReadOpContext;
-import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
-import org.apache.hadoop.fs.s3a.S3ListRequest;
-import org.apache.hadoop.fs.s3a.S3ListResult;
-import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
-import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
-import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
-import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
-import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
-import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
-import org.apache.hadoop.fs.s3a.s3guard.RenameTracker;
-import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.ACCESS_DENIED;
import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.removeUndeletedPaths;
@@ -93,8 +58,8 @@
@Before
public void setUp() throws Exception {
- context = createMockStoreContext(true,
- new OperationTrackingStore());
+ context = S3ATestUtils.createMockStoreContext(true,
+ new S3ATestUtils.OperationTrackingStore(), CONTEXT_ACCESSORS);
}
@Test
@@ -187,9 +152,10 @@
final List<Path> deleteAllowed = Lists.newArrayList(pathA, pathAC);
MultiObjectDeleteException ex = createDeleteException(ACCESS_DENIED,
deleteForbidden);
- OperationTrackingStore store
- = new OperationTrackingStore();
- StoreContext storeContext = createMockStoreContext(true, store);
+ S3ATestUtils.OperationTrackingStore store
+ = new S3ATestUtils.OperationTrackingStore();
+ StoreContext storeContext = S3ATestUtils
+ .createMockStoreContext(true, store, CONTEXT_ACCESSORS);
MultiObjectDeleteSupport deleteSupport
= new MultiObjectDeleteSupport(storeContext, null);
Triple<List<Path>, List<Path>, List<Pair<Path, IOException>>>
@@ -210,174 +176,6 @@
}
- private StoreContext createMockStoreContext(boolean multiDelete,
- OperationTrackingStore store) throws URISyntaxException, IOException {
- URI name = new URI("s3a://bucket");
- Configuration conf = new Configuration();
- return new StoreContextBuilder().setFsURI(name)
- .setBucket("bucket")
- .setConfiguration(conf)
- .setUsername("alice")
- .setOwner(UserGroupInformation.getCurrentUser())
- .setExecutor(BlockingThreadPoolExecutorService.newInstance(
- 4,
- 4,
- 10, TimeUnit.SECONDS,
- "s3a-transfer-shared"))
- .setExecutorCapacity(Constants.DEFAULT_EXECUTOR_CAPACITY)
- .setInvoker(
- new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL, Invoker.LOG_EVENT))
- .setInstrumentation(new S3AInstrumentation(name))
- .setStorageStatistics(new S3AStorageStatistics())
- .setInputPolicy(S3AInputPolicy.Normal)
- .setChangeDetectionPolicy(
- ChangeDetectionPolicy.createPolicy(ChangeDetectionPolicy.Mode.None,
- ChangeDetectionPolicy.Source.ETag, false))
- .setMultiObjectDeleteEnabled(multiDelete)
- .setMetadataStore(store)
- .setUseListV1(false)
- .setContextAccessors(CONTEXT_ACCESSORS)
- .setTimeProvider(new S3Guard.TtlTimeProvider(conf))
- .build();
- }
-
- private static class MinimalListingOperationCallbacks
- implements ListingOperationCallbacks {
- @Override
- public CompletableFuture<S3ListResult> listObjectsAsync(
- S3ListRequest request)
- throws IOException {
- return null;
- }
-
- @Override
- public CompletableFuture<S3ListResult> continueListObjectsAsync(
- S3ListRequest request,
- S3ListResult prevResult)
- throws IOException {
- return null;
- }
-
- @Override
- public S3ALocatedFileStatus toLocatedFileStatus(
- S3AFileStatus status) throws IOException {
- return null;
- }
-
- @Override
- public S3ListRequest createListObjectsRequest(
- String key,
- String delimiter) {
- return null;
- }
-
- @Override
- public long getDefaultBlockSize(Path path) {
- return 0;
- }
-
- @Override
- public int getMaxKeys() {
- return 0;
- }
-
- @Override
- public ITtlTimeProvider getUpdatedTtlTimeProvider() {
- return null;
- }
-
- @Override
- public boolean allowAuthoritative(Path p) {
- return false;
- }
- }
- private static class MinimalOperationCallbacks
- implements OperationCallbacks {
- @Override
- public S3ObjectAttributes createObjectAttributes(
- Path path,
- String eTag,
- String versionId,
- long len) {
- return null;
- }
-
- @Override
- public S3ObjectAttributes createObjectAttributes(
- S3AFileStatus fileStatus) {
- return null;
- }
-
- @Override
- public S3AReadOpContext createReadContext(
- FileStatus fileStatus) {
- return null;
- }
-
- @Override
- public void finishRename(
- Path sourceRenamed,
- Path destCreated)
- throws IOException {
-
- }
-
- @Override
- public void deleteObjectAtPath(
- Path path,
- String key,
- boolean isFile,
- BulkOperationState operationState)
- throws IOException {
-
- }
-
- @Override
- public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
- Path path,
- S3AFileStatus status,
- boolean collectTombstones,
- boolean includeSelf)
- throws IOException {
- return null;
- }
-
- @Override
- public CopyResult copyFile(
- String srcKey,
- String destKey,
- S3ObjectAttributes srcAttributes,
- S3AReadOpContext readContext)
- throws IOException {
- return null;
- }
-
- @Override
- public DeleteObjectsResult removeKeys(
- List<DeleteObjectsRequest.KeyVersion> keysToDelete,
- boolean deleteFakeDir,
- List<Path> undeletedObjectsOnFailure,
- BulkOperationState operationState,
- boolean quiet)
- throws MultiObjectDeleteException, AmazonClientException,
- IOException {
- return null;
- }
-
- @Override
- public boolean allowAuthoritative(Path p) {
- return false;
- }
-
- @Override
- public RemoteIterator<S3AFileStatus> listObjects(
- Path path,
- String key)
- throws IOException {
- return null;
- }
- }
-
private static class MinimalContextAccessor implements ContextAccessors {
@Override
@@ -406,155 +204,5 @@
return path;
}
}
- /**
- * MetadataStore which tracks what is deleted and added.
- */
- private static class OperationTrackingStore implements MetadataStore {
-
- private final List<Path> deleted = new ArrayList<>();
-
- private final List<Path> created = new ArrayList<>();
-
- @Override
- public void initialize(final FileSystem fs,
- ITtlTimeProvider ttlTimeProvider) {
- }
-
- @Override
- public void initialize(final Configuration conf,
- ITtlTimeProvider ttlTimeProvider) {
- }
-
- @Override
- public void forgetMetadata(final Path path) {
- }
-
- @Override
- public PathMetadata get(final Path path) {
- return null;
- }
-
- @Override
- public PathMetadata get(final Path path,
- final boolean wantEmptyDirectoryFlag) {
- return null;
- }
-
- @Override
- public DirListingMetadata listChildren(final Path path) {
- return null;
- }
-
- @Override
- public void put(final PathMetadata meta) {
- put(meta, null);
- }
-
- @Override
- public void put(final PathMetadata meta,
- final BulkOperationState operationState) {
- created.add(meta.getFileStatus().getPath());
- }
-
- @Override
- public void put(final Collection<? extends PathMetadata> metas,
- final BulkOperationState operationState) {
- metas.stream().forEach(meta -> put(meta, null));
- }
-
- @Override
- public void put(final DirListingMetadata meta,
- final List<Path> unchangedEntries,
- final BulkOperationState operationState) {
- created.add(meta.getPath());
- }
-
- @Override
- public void destroy() {
- }
-
- @Override
- public void delete(final Path path,
- final BulkOperationState operationState) {
- deleted.add(path);
- }
-
- @Override
- public void deletePaths(final Collection<Path> paths,
- @Nullable final BulkOperationState operationState)
- throws IOException {
- deleted.addAll(paths);
- }
-
- @Override
- public void deleteSubtree(final Path path,
- final BulkOperationState operationState) {
-
- }
-
- @Override
- public void move(@Nullable final Collection<Path> pathsToDelete,
- @Nullable final Collection<PathMetadata> pathsToCreate,
- @Nullable final BulkOperationState operationState) {
- }
-
- @Override
- public void prune(final PruneMode pruneMode, final long cutoff) {
- }
-
- @Override
- public long prune(final PruneMode pruneMode,
- final long cutoff,
- final String keyPrefix) {
- return 0;
- }
-
- @Override
- public BulkOperationState initiateBulkWrite(
- final BulkOperationState.OperationType operation,
- final Path dest) {
- return null;
- }
-
- @Override
- public void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) {
- }
-
- @Override
- public Map<String, String> getDiagnostics() {
- return null;
- }
-
- @Override
- public void updateParameters(final Map<String, String> parameters) {
- }
-
- @Override
- public void close() {
- }
-
- public List<Path> getDeleted() {
- return deleted;
- }
-
- public List<Path> getCreated() {
- return created;
- }
-
- @Override
- public RenameTracker initiateRenameOperation(
- final StoreContext storeContext,
- final Path source,
- final S3AFileStatus sourceStatus,
- final Path dest) {
- throw new UnsupportedOperationException("unsupported");
- }
-
- @Override
- public void addAncestors(final Path qualifiedPath,
- @Nullable final BulkOperationState operationState) {
-
- }
- }
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCost.java
index 46a6b71..54b6866 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCost.java
@@ -107,9 +107,10 @@
new OperationCost(0, 1);
/** listFiles always does a LIST. */
- public static final OperationCost LIST_FILES_LIST_OP =
- new OperationCost(0, 1);
+ public static final OperationCost LIST_FILES_LIST_OP = LIST_OPERATION;
+ /** listStatus always does a LIST. */
+ public static final OperationCost LIST_STATUS_LIST_OP = LIST_OPERATION;
/**
* Metadata cost of a copy operation, as used during rename.
* This happens even if the store is guarded.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java
index 672f3a9..eaa363b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java
@@ -23,6 +23,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
@@ -39,7 +40,9 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.Tristate;
import org.apache.hadoop.service.launcher.LauncherExitCodes;
import org.apache.hadoop.test.LambdaTestUtils;
@@ -47,6 +50,8 @@
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_METADATA_TTL;
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL;
+import static org.apache.hadoop.fs.s3a.Listing.toProvidedFileStatusIterator;
+import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.dirMetaToStatuses;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -79,7 +84,6 @@
ms.initialize(conf, new S3Guard.TtlTimeProvider(conf));
timeProvider = new S3Guard.TtlTimeProvider(
DEFAULT_METADATASTORE_METADATA_TTL);
-
}
@After
@@ -108,9 +112,14 @@
List<S3AFileStatus> s3Listing = Arrays.asList(
s1Status,
s2Status);
-
- FileStatus[] result = S3Guard.dirListingUnion(ms, DIR_PATH, s3Listing,
- dirMeta, false, timeProvider);
+ RemoteIterator<S3AFileStatus> storeItr = toProvidedFileStatusIterator(
+ s3Listing.toArray(new S3AFileStatus[0]));
+ RemoteIterator<S3AFileStatus> resultItr = S3Guard.dirListingUnion(
+ ms, DIR_PATH, storeItr, dirMeta, false,
+ timeProvider, s3AFileStatuses ->
+ toProvidedFileStatusIterator(dirMetaToStatuses(dirMeta)));
+ S3AFileStatus[] result = S3AUtils.iteratorToStatuses(
+ resultItr, new HashSet<>());
assertEquals("listing length", 4, result.length);
assertContainsPaths(result, MS_FILE_1, MS_FILE_2, S3_FILE_3, S3_DIR_4);
@@ -124,9 +133,18 @@
S3AFileStatus f1Status2 = new S3AFileStatus(
200, System.currentTimeMillis(), new Path(MS_FILE_1),
1, null, "tag2", "ver2");
- FileStatus[] result2 = S3Guard.dirListingUnion(ms, DIR_PATH,
- Arrays.asList(f1Status2),
- dirMeta, false, timeProvider);
+ S3AFileStatus[] f1Statuses = new S3AFileStatus[1];
+ f1Statuses[0] = f1Status2;
+ RemoteIterator<S3AFileStatus> itr = toProvidedFileStatusIterator(
+ f1Statuses);
+ FileStatus[] result2 = S3AUtils.iteratorToStatuses(
+ S3Guard.dirListingUnion(
+ ms, DIR_PATH, itr, dirMeta,
+ false, timeProvider,
+ s3AFileStatuses ->
+ toProvidedFileStatusIterator(
+ dirMetaToStatuses(dirMeta))),
+ new HashSet<>());
// the listing returns the new status
Assertions.assertThat(find(result2, MS_FILE_1))
.describedAs("Entry in listing results for %s", MS_FILE_1)
@@ -159,9 +177,18 @@
ITtlTimeProvider timeProvider = new S3Guard.TtlTimeProvider(
DEFAULT_METADATASTORE_METADATA_TTL);
- FileStatus[] result = S3Guard.dirListingUnion(ms, DIR_PATH, s3Listing,
- dirMeta, true, timeProvider);
+ RemoteIterator<S3AFileStatus> storeItr = toProvidedFileStatusIterator(
+ s3Listing.toArray(new S3AFileStatus[0]));
+ RemoteIterator<S3AFileStatus> resultItr = S3Guard
+ .dirListingUnion(ms, DIR_PATH, storeItr, dirMeta,
+ true, timeProvider,
+ s3AFileStatuses ->
+ toProvidedFileStatusIterator(
+ dirMetaToStatuses(dirMeta)));
+
+ S3AFileStatus[] result = S3AUtils.iteratorToStatuses(
+ resultItr, new HashSet<>());
assertEquals("listing length", 4, result.length);
assertContainsPaths(result, MS_FILE_1, MS_FILE_2, S3_FILE_3, S3_DIR_4);
@@ -181,13 +208,21 @@
S3AFileStatus s1Status2 = new S3AFileStatus(
200, System.currentTimeMillis(), new Path(S3_FILE_3),
1, null, "tag2", "ver2");
+ S3AFileStatus[] f1Statuses = new S3AFileStatus[1];
+ f1Statuses[0] = s1Status2;
+ RemoteIterator<S3AFileStatus> itr =
+ toProvidedFileStatusIterator(f1Statuses);
+ FileStatus[] result2 = S3AUtils.iteratorToStatuses(
+ S3Guard.dirListingUnion(ms, DIR_PATH, itr, dirMeta,
+ true, timeProvider,
+ s3AFileStatuses ->
+ toProvidedFileStatusIterator(
+ dirMetaToStatuses(dirMeta))),
+ new HashSet<>());
// but the result of the listing contains the old entry
// because auth mode doesn't pick up changes in S3 which
// didn't go through s3guard
- FileStatus[] result2 = S3Guard.dirListingUnion(ms, DIR_PATH,
- Arrays.asList(s1Status2),
- dirMeta2, true, timeProvider);
Assertions.assertThat(find(result2, S3_FILE_3))
.describedAs("Entry in listing results for %s", S3_FILE_3)
.isSameAs(file3Meta.getFileStatus());