blob: 20ed288bdfec532a455eb2d405c02ed1ca27ad1b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import javax.annotation.Nullable;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.impl.AbstractStoreOperation;
import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreListFilesIterator;
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import static org.apache.hadoop.fs.impl.FutureIOSupport.awaitFuture;
import static org.apache.hadoop.fs.s3a.Constants.S3N_FOLDER_SUFFIX;
import static org.apache.hadoop.fs.s3a.S3AUtils.ACCEPT_ALL;
import static org.apache.hadoop.fs.s3a.S3AUtils.createFileStatus;
import static org.apache.hadoop.fs.s3a.S3AUtils.maybeAddTrailingSlash;
import static org.apache.hadoop.fs.s3a.S3AUtils.objectRepresentsDirectory;
import static org.apache.hadoop.fs.s3a.S3AUtils.stringify;
import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
import static org.apache.hadoop.fs.s3a.auth.RoleModel.pathToKey;
/**
* Place for the S3A listing classes; keeps all the small classes under control.
*/
@InterfaceAudience.Private
public class Listing extends AbstractStoreOperation {
/** Logger: reuses the one declared by {@link S3AFileSystem}. */
private static final Logger LOG = S3AFileSystem.LOG;

/** Shared acceptor which only rejects S3N {@code _$folder$} entries. */
static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N = new AcceptAllButS3nDirs();

/** Callbacks through which list requests and status conversions go. */
private final ListingOperationCallbacks listingOperationCallbacks;

/**
 * Bind a listing helper to its callbacks and store context.
 * @param listingOperationCallbacks callbacks used for listing operations
 * @param storeContext store context passed to the superclass
 */
public Listing(ListingOperationCallbacks listingOperationCallbacks,
    StoreContext storeContext) {
  super(storeContext);
  this.listingOperationCallbacks = listingOperationCallbacks;
}
/**
 * Build an iterator over an in-memory array of statuses. Only the entries
 * accepted by both the path filter and the status acceptor are kept.
 * No remote calls are made.
 *
 * @param fileStatuses statuses to iterate over
 * @param filter path filter applied to each status's path
 * @param acceptor status-level acceptor
 * @return an iterator over the accepted statuses
 */
ProvidedFileStatusIterator createProvidedFileStatusIterator(
    S3AFileStatus[] fileStatuses,
    PathFilter filter,
    FileStatusAcceptor acceptor) {
  final ProvidedFileStatusIterator providedIterator =
      new ProvidedFileStatusIterator(fileStatuses, filter, acceptor);
  return providedIterator;
}
/**
 * Wrap an array of statuses as an iterator. Every path is accepted;
 * only S3N {@code _$folder$} entries are dropped.
 * @param fileStatuses array of file status.
 * @return the file status iterator.
 */
@VisibleForTesting
public static ProvidedFileStatusIterator toProvidedFileStatusIterator(
    S3AFileStatus[] fileStatuses) {
  final PathFilter everyPath = ACCEPT_ALL;
  final FileStatusAcceptor skipS3nMarkers = Listing.ACCEPT_ALL_BUT_S3N;
  return new ProvidedFileStatusIterator(fileStatuses, everyPath,
      skipS3nMarkers);
}
/**
 * Create a FileStatus iterator over an S3 listing of a path, with no
 * provided (cached) statuses merged in.
 *
 * @param listPath path of the listing
 * @param request initial request to make
 * @param filter the filter on which paths to accept
 * @param acceptor the class/predicate to decide which entries to accept
 * in the listing based on the full file status.
 * @return the iterator
 * @throws IOException IO Problems
 */
public FileStatusListingIterator createFileStatusListingIterator(
    Path listPath,
    S3ListRequest request,
    PathFilter filter,
    Listing.FileStatusAcceptor acceptor) throws IOException {
  // a typed null selects the five-argument overload: nothing to merge
  final RemoteIterator<S3AFileStatus> noProvidedStatus = null;
  return createFileStatusListingIterator(
      listPath, request, filter, acceptor, noProvidedStatus);
}
/**
 * Create a FileStatus iterator over an S3 listing of a path. Optionally,
 * statuses from another source can be merged into the results.
 *
 * @param listPath path of the listing
 * @param request initial request to make
 * @param filter the filter on which paths to accept
 * @param acceptor the class/predicate to decide which entries to accept
 * in the listing based on the full file status.
 * @param providedStatus the provided list of file status, which may contain
 * items that are not listed from source; may be null.
 * @return the iterator
 * @throws IOException IO Problems
 */
@Retries.RetryRaw
public FileStatusListingIterator createFileStatusListingIterator(
    Path listPath,
    S3ListRequest request,
    PathFilter filter,
    Listing.FileStatusAcceptor acceptor,
    RemoteIterator<S3AFileStatus> providedStatus) throws IOException {
  final ObjectListingIterator objectListing =
      createObjectListingIterator(listPath, request);
  return new FileStatusListingIterator(objectListing, filter, acceptor,
      providedStatus);
}
/**
 * Start an object listing of a path using the given initial request.
 * @param listPath path being listed
 * @param request the first list request to issue
 * @return an iterator over the raw listing pages
 * @throws IOException IO Problems
 */
@Retries.RetryRaw
public ObjectListingIterator createObjectListingIterator(
    final Path listPath,
    final S3ListRequest request) throws IOException {
  final ObjectListingIterator pages =
      new ObjectListingIterator(listPath, request);
  return pages;
}
/**
 * Adapt a status iterator into one which yields located statuses.
 * @param statusIterator source of remote status entries
 * @return an iterator converting each entry on demand
 */
@VisibleForTesting
public LocatedFileStatusIterator createLocatedFileStatusIterator(
    RemoteIterator<S3AFileStatus> statusIterator) {
  final LocatedFileStatusIterator located =
      new LocatedFileStatusIterator(statusIterator);
  return located;
}
/**
 * Create a located status iterator that wraps another and skips any entry
 * whose path is in the given set of recently deleted paths.
 * @param iterator an iterator over the remote located status entries.
 * @param tombstones paths to filter out; null means "filter nothing".
 * @return a new remote iterator.
 */
@VisibleForTesting
TombstoneReconcilingIterator createTombstoneReconcilingIterator(
    RemoteIterator<S3ALocatedFileStatus> iterator, Set<Path> tombstones) {
  final TombstoneReconcilingIterator reconciled =
      new TombstoneReconcilingIterator(iterator, tombstones);
  return reconciled;
}
/**
* List files under a path assuming the path to be a directory.
*
* The metadata store is consulted first. For a recursive listing, its
* entries are returned without any S3 LIST call when three conditions hold:
* authoritative access is allowed for the path, the store's listing is
* recursively authoritative, and {@code forceNonAuthoritativeMS} is false.
* For a non-recursive listing, the same short cut is taken when
* authoritative access is allowed and the directory listing is
* authoritative.
* Otherwise an S3 LIST is issued and the cached entries are merged into
* it. When {@code collectTombstones} is true, paths the metadata store
* records as tombstones are filtered from the merged result.
*
* @param path input path.
* @param recursive recursive listing? Non-recursive listings use the "/"
* delimiter.
* @param acceptor file status filter
* @param collectTombstones should tombstones be collected from S3Guard?
* @param forceNonAuthoritativeMS forces metadata store to act like non
* authoritative. This is useful when
* listFiles output is used by import tool.
* Only consulted for recursive listings.
* @return an iterator over listing.
* @throws FileNotFoundException for a recursive listing, if the metadata
* store records the path itself as deleted.
* @throws IOException any exception.
*/
public RemoteIterator<S3ALocatedFileStatus> getListFilesAssumingDir(
Path path,
boolean recursive, Listing.FileStatusAcceptor acceptor,
boolean collectTombstones,
boolean forceNonAuthoritativeMS) throws IOException {
String key = maybeAddTrailingSlash(pathToKey(path));
// recursive listings pass no delimiter; directory listings use "/"
String delimiter = recursive ? null : "/";
if (recursive) {
LOG.debug("Recursive list of all entries under {}", key);
} else {
LOG.debug("Requesting all entries under {} with delimiter '{}'",
key, delimiter);
}
// entries from the metadata store: either returned directly (authoritative
// case) or merged into the S3 listing
final RemoteIterator<S3AFileStatus> cachedFilesIterator;
// paths the metadata store records as deleted; may be null
final Set<Path> tombstones;
boolean allowAuthoritative = listingOperationCallbacks
.allowAuthoritative(path);
if (recursive) {
// look up the listed path itself in the metadata store
final PathMetadata pm = getStoreContext()
.getMetadataStore()
.get(path, true);
if (pm != null) {
if (pm.isDeleted()) {
// report the tombstone's modification time as the deletion time
OffsetDateTime deletedAt = OffsetDateTime
.ofInstant(Instant.ofEpochMilli(
pm.getFileStatus().getModificationTime()),
ZoneOffset.UTC);
throw new FileNotFoundException("Path " + path + " is recorded as " +
"deleted by S3Guard at " + deletedAt);
}
}
// iterate the metadata store entries under the path; pm may be null here
MetadataStoreListFilesIterator metadataStoreListFilesIterator =
new MetadataStoreListFilesIterator(
getStoreContext().getMetadataStore(),
pm,
allowAuthoritative);
tombstones = metadataStoreListFilesIterator.listTombstones();
// if all of the below is true
// - authoritative access is allowed for this metadatastore
// for this directory,
// - all the directory listings are authoritative on the client
// - the caller does not force non-authoritative access
// return the listing without any further s3 access
if (!forceNonAuthoritativeMS &&
allowAuthoritative &&
metadataStoreListFilesIterator.isRecursivelyAuthoritative()) {
S3AFileStatus[] statuses = S3AUtils.iteratorToStatuses(
metadataStoreListFilesIterator, tombstones);
cachedFilesIterator = createProvidedFileStatusIterator(
statuses, ACCEPT_ALL, acceptor);
return createLocatedFileStatusIterator(cachedFilesIterator);
}
cachedFilesIterator = metadataStoreListFilesIterator;
} else {
// single-directory listing from the metadata store (may be null)
DirListingMetadata meta =
S3Guard.listChildrenWithTtl(
getStoreContext().getMetadataStore(),
path,
listingOperationCallbacks.getUpdatedTtlTimeProvider(),
allowAuthoritative);
if (meta != null) {
tombstones = meta.listTombstones();
} else {
tombstones = null;
}
cachedFilesIterator = createProvidedFileStatusIterator(
S3Guard.dirMetaToStatuses(meta), ACCEPT_ALL, acceptor);
if (allowAuthoritative && meta != null && meta.isAuthoritative()) {
// metadata listing is authoritative, so return it directly
return createLocatedFileStatusIterator(cachedFilesIterator);
}
}
// Not authoritative: list S3, merge in the cached entries, and (only when
// requested) drop entries whose paths are tombstoned.
return createTombstoneReconcilingIterator(
createLocatedFileStatusIterator(
createFileStatusListingIterator(path,
listingOperationCallbacks
.createListObjectsRequest(key, delimiter),
ACCEPT_ALL,
acceptor,
cachedFilesIterator)),
collectTombstones ? tombstones : null);
}
/**
 * List the immediate children of a directory as located statuses.
 * If the metadata store holds an authoritative listing and authoritative
 * access is allowed for the directory, that listing is returned directly.
 * Otherwise S3 is listed with the "/" delimiter, the cached entries are
 * merged in, and paths recorded as tombstones are filtered out.
 * @param dir directory to check.
 * @param filter a path filter.
 * @return an iterator that traverses statuses of the given dir.
 * @throws IOException in case of failure.
 */
public RemoteIterator<S3ALocatedFileStatus> getLocatedFileStatusIteratorForDir(
    Path dir, PathFilter filter) throws IOException {
  final String prefix = maybeAddTrailingSlash(pathToKey(dir));
  final Listing.FileStatusAcceptor notSelfNorS3n =
      new Listing.AcceptAllButSelfAndS3nDirs(dir);
  final boolean authAllowed =
      listingOperationCallbacks.allowAuthoritative(dir);
  final DirListingMetadata dirListing = S3Guard.listChildrenWithTtl(
      getStoreContext().getMetadataStore(),
      dir,
      listingOperationCallbacks.getUpdatedTtlTimeProvider(),
      authAllowed);

  final Set<Path> deletedPaths;
  if (dirListing == null) {
    deletedPaths = null;
  } else {
    deletedPaths = dirListing.listTombstones();
  }

  final RemoteIterator<S3AFileStatus> cachedStatuses =
      createProvidedFileStatusIterator(
          S3Guard.dirMetaToStatuses(dirListing), filter, notSelfNorS3n);

  if (authAllowed && dirListing != null && dirListing.isAuthoritative()) {
    // the cached listing is complete: no S3 call needed
    return createLocatedFileStatusIterator(cachedStatuses);
  }

  final FileStatusListingIterator merged = createFileStatusListingIterator(
      dir,
      listingOperationCallbacks.createListObjectsRequest(prefix, "/"),
      filter,
      notSelfNorS3n,
      cachedStatuses);
  return createTombstoneReconcilingIterator(
      createLocatedFileStatusIterator(merged), deletedPaths);
}
/**
 * Calculate list of file statuses assuming path
 * to be a non-empty directory.
 *
 * If authoritative access is allowed for the path and the metadata store
 * holds an authoritative listing for it, that listing is returned with no
 * S3 call. Otherwise an iterator over an S3 LIST (with the "/" delimiter)
 * is returned.
 *
 * @param path input path.
 * @return Triple of: the status iterator; the metadata store listing, which
 * may be null; and {@code TRUE} iff the iterator came from the
 * authoritative metadata store listing.
 * @throws IOException Any IO problems.
 */
public Triple<RemoteIterator<S3AFileStatus>, DirListingMetadata, Boolean>
    getFileStatusesAssumingNonEmptyDir(Path path)
    throws IOException {
  // Same trailing-slash handling as the other listing entry points in
  // this class.
  final String key = maybeAddTrailingSlash(pathToKey(path));
  final boolean allowAuthoritative = listingOperationCallbacks
      .allowAuthoritative(path);
  final DirListingMetadata dirMeta =
      S3Guard.listChildrenWithTtl(
          getStoreContext().getMetadataStore(),
          path,
          listingOperationCallbacks.getUpdatedTtlTimeProvider(),
          allowAuthoritative);
  // In auth mode return directly with auth flag.
  if (allowAuthoritative && dirMeta != null && dirMeta.isAuthoritative()) {
    ProvidedFileStatusIterator mfsItr = createProvidedFileStatusIterator(
        S3Guard.dirMetaToStatuses(dirMeta),
        ACCEPT_ALL,
        Listing.ACCEPT_ALL_BUT_S3N);
    return Triple.of(mfsItr, dirMeta, Boolean.TRUE);
  }
  S3ListRequest request = createListObjectsRequest(key, "/");
  LOG.debug("listStatus: doing listObjects for directory {}", key);
  FileStatusListingIterator filesItr = createFileStatusListingIterator(
      path,
      request,
      ACCEPT_ALL,
      new Listing.AcceptAllButSelfAndS3nDirs(path));
  // return the results obtained from s3.
  return Triple.of(filesItr, dirMeta, Boolean.FALSE);
}
/**
 * Create a list request through the listing callbacks.
 * @param key key prefix to list under
 * @param delimiter delimiter; this class passes "/" for directory
 * listings and null for recursive ones
 * @return the request
 */
public S3ListRequest createListObjectsRequest(String key, String delimiter) {
  final S3ListRequest listRequest =
      listingOperationCallbacks.createListObjectsRequest(key, delimiter);
  return listRequest;
}
/**
 * Predicate which decides whether a listing entry or status is accepted
 * as a valid file or directory. It has one overload each for object
 * summaries, common prefixes and already-built file statuses.
 */
interface FileStatusAcceptor {

  /**
   * Decide whether an object summary should produce a status entry.
   * @param keyPath qualified path to the entry
   * @param summary summary entry
   * @return true if the entry is accepted (i.e. that a status entry
   * should be generated).
   */
  boolean accept(Path keyPath, S3ObjectSummary summary);

  /**
   * Decide whether a common prefix should produce a status entry.
   * @param keyPath qualified path to the entry
   * @param commonPrefix the prefix
   * @return true if the entry is accepted (i.e. that a status entry
   * should be generated).
   */
  boolean accept(Path keyPath, String commonPrefix);

  /**
   * Decide whether an existing file status should be kept.
   * @param status file status containing file path information
   * @return true if the status is accepted else false
   */
  boolean accept(FileStatus status);
}
/**
 * A remote iterator over at most one {@code LocatedFileStatus} value.
 *
 * A null status yields an empty iterator. This iterator is used to handle
 * {@link S3AFileSystem#listStatus(Path)} calls where the path handed in
 * refers to a file, not a directory: this is the iterator returned.
 */
static final class SingleStatusRemoteIterator
    implements RemoteIterator<S3ALocatedFileStatus> {

  /** The pending status; cleared once it has been handed out. */
  private S3ALocatedFileStatus status;

  /**
   * Constructor.
   * @param status the single value; null produces an empty iterator.
   */
  SingleStatusRemoteIterator(S3ALocatedFileStatus status) {
    this.status = status;
  }

  /**
   * {@inheritDoc}
   * @return true only while the status has not yet been returned and was
   * non-null to begin with.
   * @throws IOException never
   */
  @Override
  public boolean hasNext() throws IOException {
    return status != null;
  }

  /**
   * {@inheritDoc}
   * @return the status passed to the constructor, if it has not already
   * been retrieved.
   * @throws IOException never
   * @throws NoSuchElementException if the status was already returned or
   * was null.
   */
  @Override
  public S3ALocatedFileStatus next() throws IOException {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    final S3ALocatedFileStatus single = status;
    status = null;
    return single;
  }
}
/**
 * Presents a provided, non-null array of file statuses as a remote
 * iterator.
 *
 * Filtering happens eagerly in the constructor; {@link #next} then walks
 * the filtered copy. The whole filtered list is held in memory, so very
 * large inputs are expensive.
 *
 * There is no remote data to fetch.
 */
static class ProvidedFileStatusIterator
    implements RemoteIterator<S3AFileStatus> {
  private final ArrayList<S3AFileStatus> filteredStatusList;
  private int index = 0;

  ProvidedFileStatusIterator(S3AFileStatus[] fileStatuses, PathFilter filter,
      FileStatusAcceptor acceptor) {
    Preconditions.checkArgument(fileStatuses != null, "Null status list!");
    filteredStatusList = new ArrayList<>(fileStatuses.length);
    for (int i = 0; i < fileStatuses.length; i++) {
      final S3AFileStatus candidate = fileStatuses[i];
      final boolean keep = filter.accept(candidate.getPath())
          && acceptor.accept(candidate);
      if (keep) {
        filteredStatusList.add(candidate);
      }
    }
    filteredStatusList.trimToSize();
  }

  @Override
  public boolean hasNext() throws IOException {
    return filteredStatusList.size() > index;
  }

  @Override
  public S3AFileStatus next() throws IOException {
    if (hasNext()) {
      return filteredStatusList.get(index++);
    }
    throw new NoSuchElementException();
  }
}
/**
* Wraps up object listing into a remote iterator which will ask for more
* listing data if needed.
*
* This is a complex operation, especially the process to determine
* if there are more entries remaining. If there are no more results
* remaining in the (filtered) results of the current listing request, then
* another request is made <i>and those results filtered</i> before the
* iterator can declare that there is more data available.
*
* The need to filter the results precludes the iterator from simply
* declaring that if the {@link ObjectListingIterator#hasNext()}
* is true then there are more results. Instead the next batch of results must
* be retrieved and filtered.
*
* What does this mean? It means that remote requests to retrieve new
* batches of object listings are made in the {@link #hasNext()} call;
* the {@link #next()} call simply returns the filtered results of the last
* listing processed. However, do note that {@link #next()} performs the
* same source check as {@link #hasNext()} (via {@code sourceHasNext()})
* during its operation. This is critical to ensure that a listing obtained
* through a sequence of {@link #next()} calls will complete with the same
* set of results as a classic {@code while (it.hasNext())} loop.
*
* An optional set of "provided" statuses can be merged into the listing.
* When S3 lists a path which is also in the provided set, the provided
* status is returned instead of the S3 one. Provided statuses whose paths
* S3 never listed are returned after the S3 listing is exhausted.
*
* Thread safety: None.
*/
class FileStatusListingIterator
implements RemoteIterator<S3AFileStatus> {
/** Source of objects. */
private final ObjectListingIterator source;
/** Filter of paths from API call. */
private final PathFilter filter;
/** Filter of entries from file status. */
private final FileStatusAcceptor acceptor;
/** Number of entries in the current (filtered) batch. */
private int batchSize;
/** Iterator over the current set of results. */
private ListIterator<S3AFileStatus> statusBatchIterator;
/**
* Provided statuses which passed the filter and acceptor, keyed by path.
* An entry is removed when S3 lists the same path; whatever remains is
* returned once the S3 listing is exhausted.
*/
private final Map<Path, S3AFileStatus> providedStatus;
/**
* Iterator over the remaining provided statuses; null until the S3
* listing has been exhausted.
*/
private Iterator<S3AFileStatus> providedStatusIterator;
/**
* Create an iterator over file status entries.
* The provided-status iterator, if any, is fully drained here, and the
* first batch is built from the source.
* @param source the listing iterator from a listObjects call.
* @param filter the filter on which paths to accept
* @param acceptor the class/predicate to decide which entries to accept
* in the listing based on the full file status.
* @param providedStatus the provided list of file status, which may contain
* items that are not listed from source; may be null.
* @throws IOException IO Problems
*/
@Retries.RetryTranslated
FileStatusListingIterator(ObjectListingIterator source,
PathFilter filter,
FileStatusAcceptor acceptor,
@Nullable RemoteIterator<S3AFileStatus> providedStatus)
throws IOException {
this.source = source;
this.filter = filter;
this.acceptor = acceptor;
this.providedStatus = new HashMap<>();
// drain the provided statuses, keeping only the accepted ones
for (; providedStatus != null && providedStatus.hasNext();) {
final S3AFileStatus status = providedStatus.next();
Path path = status.getPath();
if (filter.accept(path) && acceptor.accept(status)) {
this.providedStatus.put(path, status);
}
}
// build the first set of results. For the initial batch the source does
// not issue a new request of its own, but it does wait for the initial
// asynchronous LIST to complete and, if that result is truncated, starts
// fetching the following page asynchronously.
requestNextBatch();
}
/**
* Report whether or not there is new data available.
* If there is data in the local filtered list, return true.
* Else: request more data until that condition is met, or there
* is no more remote listing data.
* Lastly, return true if the {@code providedStatusIterator}
* has left items.
* @return true if a call to {@link #next()} will succeed.
* @throws IOException if a listing request fails.
*/
@Override
@Retries.RetryTranslated
public boolean hasNext() throws IOException {
return sourceHasNext() || providedStatusIterator.hasNext();
}
/**
* Check for more entries from the S3 listing, fetching further batches
* if the current one is exhausted. When the S3 listing is exhausted,
* this switches to iterating the remaining provided statuses.
* @return true if the current batch has another entry.
* @throws IOException if a listing request fails.
*/
@Retries.RetryTranslated
private boolean sourceHasNext() throws IOException {
if (statusBatchIterator.hasNext() || requestNextBatch()) {
return true;
} else {
// turn to file status that are only in provided list
if (providedStatusIterator == null) {
LOG.debug("Start iterating the provided status.");
providedStatusIterator = providedStatus.values().iterator();
}
return false;
}
}
/**
* Return the next status. Entries from the S3 listing come first; if a
* provided status exists for the same path, the provided one is returned
* instead. After that, the provided statuses not seen in the S3 listing
* are returned.
* @return the next status.
* @throws IOException if a listing request fails.
* @throws NoSuchElementException if there are no more entries.
*/
@Override
@Retries.RetryTranslated
public S3AFileStatus next() throws IOException {
final S3AFileStatus status;
if (sourceHasNext()) {
status = statusBatchIterator.next();
// We remove from provided map the file status listed by S3 so that
// this does not return duplicate items.
// The provided status is returned as it is assumed to have the better
// metadata (i.e. the eTag and versionId from S3Guard)
S3AFileStatus provided = providedStatus.remove(status.getPath());
if (provided != null) {
LOG.debug(
"Removed and returned the status from provided file status {}",
status);
return provided;
}
} else {
// sourceHasNext() returning false guarantees the iterator is set
if (providedStatusIterator.hasNext()) {
status = providedStatusIterator.next();
LOG.debug("Returning provided file status {}", status);
} else {
throw new NoSuchElementException();
}
}
return status;
}
/**
* Try to retrieve another batch.
* Note that for the initial batch,
* {@link ObjectListingIterator} does not issue a new request;
* it waits for and returns the result of the request started when it
* was constructed.
* Batches in which every entry is filtered out are skipped.
*
* @return true if a new non-empty batch was created.
* @throws IOException IO problems
*/
@Retries.RetryTranslated
private boolean requestNextBatch() throws IOException {
// look for more object listing batches being available
while (source.hasNext()) {
// if available, retrieve it and build the next status
if (buildNextStatusBatch(source.next())) {
// this batch successfully generated entries matching the filters/
// acceptors; declare that the request was successful
return true;
} else {
LOG.debug("All entries in batch were filtered...continuing");
}
}
// if this code is reached, it means that all remaining
// object lists have been retrieved, and there are no new entries
// to return.
return false;
}
/**
* Build the next status batch from a listing.
* Object summaries become statuses via {@code createFileStatus};
* common prefixes always become directory statuses.
* This replaces {@code statusBatchIterator} and {@code batchSize}.
* @param objects the next object listing
* @return true if this added any entries after filtering
*/
private boolean buildNextStatusBatch(S3ListResult objects) {
// counters for debug logs
int added = 0, ignored = 0;
// list to fill in with results. Initial size will be list maximum.
List<S3AFileStatus> stats = new ArrayList<>(
objects.getObjectSummaries().size() +
objects.getCommonPrefixes().size());
// objects
for (S3ObjectSummary summary : objects.getObjectSummaries()) {
String key = summary.getKey();
Path keyPath = getStoreContext().getContextAccessors().keyToPath(key);
if (LOG.isDebugEnabled()) {
LOG.debug("{}: {}", keyPath, stringify(summary));
}
// Skip over keys that are ourselves and old S3N _$folder$ files
if (acceptor.accept(keyPath, summary) && filter.accept(keyPath)) {
S3AFileStatus status = createFileStatus(keyPath, summary,
listingOperationCallbacks.getDefaultBlockSize(keyPath),
getStoreContext().getUsername(),
summary.getETag(), null);
LOG.debug("Adding: {}", status);
stats.add(status);
added++;
} else {
LOG.debug("Ignoring: {}", keyPath);
ignored++;
}
}
// prefixes: always directories
for (String prefix : objects.getCommonPrefixes()) {
Path keyPath = getStoreContext()
.getContextAccessors()
.keyToPath(prefix);
if (acceptor.accept(keyPath, prefix) && filter.accept(keyPath)) {
S3AFileStatus status = new S3AFileStatus(Tristate.FALSE, keyPath,
getStoreContext().getUsername());
LOG.debug("Adding directory: {}", status);
added++;
stats.add(status);
} else {
LOG.debug("Ignoring directory: {}", keyPath);
ignored++;
}
}
// finish up
batchSize = stats.size();
statusBatchIterator = stats.listIterator();
boolean hasNext = statusBatchIterator.hasNext();
LOG.debug("Added {} entries; ignored {}; hasNext={}; hasMoreObjects={}",
added, ignored, hasNext, objects.isTruncated());
return hasNext;
}
/**
* Get the number of entries in the current batch.
* @return a number, possibly zero.
*/
public int getBatchSize() {
return batchSize;
}
}
/**
 * Wraps up AWS `ListObjects` requests in a remote iterator
 * which will ask for more listing data if needed.
 *
 * That is:
 *
 * 1. The constructor starts the initial request asynchronously; the first
 * invocation of {@link #next()} waits for and returns its results.
 *
 * 2. Whenever {@link #next()} obtains a truncated result, it also starts
 * an asynchronous request for the following batch through
 * {@code ListingOperationCallbacks.continueListObjectsAsync()}. The
 * subsequent {@link #next()} call waits for and returns that batch.
 *
 * 3. The {@link #hasNext()} predicate returns true for the initial call,
 * where {@link #next()} will return the initial results. It declares
 * that it has future results iff the last returned result was truncated.
 *
 * Failures of the asynchronous requests surface when {@link #next()}
 * waits for them.
 *
 * Thread safety: none.
 */
class ObjectListingIterator implements RemoteIterator<S3ListResult> {

  /** The path listed. */
  private final Path listPath;

  /** The most recent listing results; null until the first next(). */
  private S3ListResult objects;

  /** The most recent listing request. */
  private S3ListRequest request;

  /** Indicator that this is the first listing. */
  private boolean firstListing = true;

  /**
   * Count of how many listings have been requested
   * (including initial result).
   */
  private int listingCount = 1;

  /**
   * Maximum keys in a request.
   */
  private int maxKeys;

  /**
   * Future to store current batch listing result.
   */
  private CompletableFuture<S3ListResult> s3ListResultFuture;

  /**
   * Result of previous batch.
   */
  private S3ListResult objectsPrev;

  /**
   * Constructor: starts the initial list request asynchronously.
   * Its outcome, success or failure, is observed by the first
   * {@link #next()} call.
   * @param listPath path of the listing
   * @param request initial request to make
   * @throws IOException if starting the listing raises one.
   */
  @Retries.RetryRaw
  ObjectListingIterator(
      Path listPath,
      S3ListRequest request) throws IOException {
    this.listPath = listPath;
    this.maxKeys = listingOperationCallbacks.getMaxKeys();
    this.s3ListResultFuture = listingOperationCallbacks
        .listObjectsAsync(request);
    this.request = request;
    this.objectsPrev = null;
  }

  /**
   * Declare that the iterator has data if this is either the initial
   * iteration or a later one where the last listing obtained was
   * incomplete.
   * @throws IOException never: there is no IO in this operation.
   */
  @Override
  public boolean hasNext() throws IOException {
    return firstListing ||
        (objectsPrev != null && objectsPrev.isTruncated());
  }

  /**
   * Ask for the next listing.
   * Each call waits for the result of the outstanding asynchronous
   * request. If that result is truncated, the request for the following
   * batch is started before returning. Hence calls may block or fail.
   * @return the next object listing.
   * @throws IOException if a query made of S3 fails.
   * @throws NoSuchElementException if there is no more data to list.
   */
  @Override
  @Retries.RetryTranslated
  public S3ListResult next() throws IOException {
    try {
      if (firstListing) {
        // clear the firstListing flag for future calls.
        firstListing = false;
        // Wait for the initial request started in the constructor.
        objects = awaitFuture(s3ListResultFuture);
        fetchNextBatchAsyncIfPresent();
      } else {
        if (objectsPrev != null && !objectsPrev.isTruncated()) {
          // nothing more to request: fail.
          throw new NoSuchElementException("No more results in listing of "
              + listPath);
        }
        // Wait for the continuation started by the previous call.
        objects = awaitFuture(s3ListResultFuture);
        // Requesting next batch of results.
        fetchNextBatchAsyncIfPresent();
        listingCount++;
        LOG.debug("New listing status: {}", this);
      }
    } catch (AmazonClientException e) {
      // Translate SDK exceptions for the initial listing as well as for
      // continuations, as the RetryTranslated annotation declares.
      throw translateException("listObjects()", listPath, e);
    }
    // Storing the current result to be used by hasNext() call.
    objectsPrev = objects;
    return objectsPrev;
  }

  /**
   * If the latest listing was truncated, start the request for the next
   * batch asynchronously.
   * @throws IOException if starting the request raises one.
   */
  private void fetchNextBatchAsyncIfPresent() throws IOException {
    if (objects.isTruncated()) {
      LOG.debug("[{}], Requesting next {} objects under {}",
          listingCount, maxKeys, listPath);
      s3ListResultFuture = listingOperationCallbacks
          .continueListObjectsAsync(request, objects);
    }
  }

  /**
   * Describe the listing. Safe to call before the first result has been
   * retrieved; in that case the truncation state is reported as unknown.
   * @return a string for logging.
   */
  @Override
  public String toString() {
    final String truncated = objects == null
        ? "unknown"
        : Boolean.toString(objects.isTruncated());
    return "Object listing iterator against " + listPath
        + "; listing count " + listingCount
        + "; isTruncated=" + truncated;
  }

  /**
   * Get the path listed.
   * @return the path used in this listing.
   */
  public Path getListPath() {
    return listPath;
  }

  /**
   * Get the count of listing requests.
   * @return the counter of requests made (including the initial lookup).
   */
  public int getListingCount() {
    return listingCount;
  }
}
/**
 * Accept only file entries.
 *
 * The following are rejected:
 * <ul>
 *   <li>the base path itself;</li>
 *   <li>S3N {@code _$folder$} marker objects;</li>
 *   <li>objects which {@code objectRepresentsDirectory} classifies as
 *   directories;</li>
 *   <li>every common prefix;</li>
 *   <li>any status which is null or not a file.</li>
 * </ul>
 */
static class AcceptFilesOnly implements FileStatusAcceptor {

  /** Base path of the listing; never accepted. */
  private final Path qualifiedPath;

  /**
   * Constructor.
   * @param qualifiedPath an already-qualified base path.
   */
  public AcceptFilesOnly(Path qualifiedPath) {
    this.qualifiedPath = qualifiedPath;
  }

  /**
   * Reject a summary entry in three cases: the key path is the qualified
   * path, the key ends with {@code "_$folder$"}, or the object is
   * classified as a directory.
   * @param keyPath key path of the entry
   * @param summary summary entry
   * @return true if the entry is accepted (i.e. that a status entry
   * should be generated).
   */
  @Override
  public boolean accept(Path keyPath, S3ObjectSummary summary) {
    return !keyPath.equals(qualifiedPath)
        && !summary.getKey().endsWith(S3N_FOLDER_SUFFIX)
        && !objectRepresentsDirectory(summary.getKey(), summary.getSize());
  }

  /**
   * Accept no directory paths.
   * @param keyPath qualified path to the entry
   * @param prefix common prefix in listing.
   * @return false, always.
   */
  @Override
  public boolean accept(Path keyPath, String prefix) {
    return false;
  }

  /**
   * Accept only non-null statuses which are files.
   * @param status status to check
   * @return true iff the status is non-null and a file
   */
  @Override
  public boolean accept(FileStatus status) {
    return (status != null) && status.isFile();
  }
}
/**
 * Adapts a remote iterator of {@link FileStatus} instances into one of
 * {@link LocatedFileStatus} instances. Each entry is converted lazily
 * through the listing callbacks as it is retrieved.
 */
class LocatedFileStatusIterator
    implements RemoteIterator<S3ALocatedFileStatus> {

  /** Source of the statuses to convert. */
  private final RemoteIterator<S3AFileStatus> statusIterator;

  /**
   * Constructor.
   * @param statusIterator an iterator over the remote status entries
   */
  LocatedFileStatusIterator(RemoteIterator<S3AFileStatus> statusIterator) {
    this.statusIterator = statusIterator;
  }

  @Override
  public boolean hasNext() throws IOException {
    return statusIterator.hasNext();
  }

  @Override
  public S3ALocatedFileStatus next() throws IOException {
    final S3AFileStatus raw = statusIterator.next();
    return listingOperationCallbacks.toLocatedFileStatus(raw);
  }
}
/**
* Wraps another iterator and filters out files that appear in the provided
* set of tombstones. Will read ahead in the iterator when necessary to
* ensure that emptiness is detected early enough if only deleted objects
* remain in the source iterator.
*/
static class TombstoneReconcilingIterator implements
RemoteIterator<S3ALocatedFileStatus> {
private S3ALocatedFileStatus next = null;
private final RemoteIterator<S3ALocatedFileStatus> iterator;
private final Set<Path> tombstones;
/**
* @param iterator Source iterator to filter
* @param tombstones set of tombstone markers to filter out of results
*/
TombstoneReconcilingIterator(RemoteIterator<S3ALocatedFileStatus>
iterator, Set<Path> tombstones) {
this.iterator = iterator;
if (tombstones != null) {
this.tombstones = tombstones;
} else {
this.tombstones = Collections.emptySet();
}
}
private boolean fetch() throws IOException {
while (next == null && iterator.hasNext()) {
S3ALocatedFileStatus candidate = iterator.next();
if (!tombstones.contains(candidate.getPath())) {
next = candidate;
return true;
}
}
return false;
}
public boolean hasNext() throws IOException {
if (next != null) {
return true;
}
return fetch();
}
public S3ALocatedFileStatus next() throws IOException {
if (hasNext()) {
S3ALocatedFileStatus result = next;
next = null;
fetch();
return result;
}
throw new NoSuchElementException();
}
}
/**
* Accept all entries except those which map to S3N pseudo directory markers.
*/
static class AcceptAllButS3nDirs implements FileStatusAcceptor {
public boolean accept(Path keyPath, S3ObjectSummary summary) {
return !summary.getKey().endsWith(S3N_FOLDER_SUFFIX);
}
public boolean accept(Path keyPath, String prefix) {
return !keyPath.toString().endsWith(S3N_FOLDER_SUFFIX);
}
public boolean accept(FileStatus status) {
return !status.getPath().toString().endsWith(S3N_FOLDER_SUFFIX);
}
}
/**
 * Accept all entries except the base path and those which map to S3N
 * pseudo directory markers.
 */
public static class AcceptAllButSelfAndS3nDirs implements FileStatusAcceptor {

  /** Base path; entries equal to it are rejected. */
  private final Path qualifiedPath;

  /**
   * Constructor.
   * @param qualifiedPath an already-qualified path.
   */
  public AcceptAllButSelfAndS3nDirs(Path qualifiedPath) {
    this.qualifiedPath = qualifiedPath;
  }

  /**
   * Reject a summary entry in two cases: the key path is the qualified
   * path, or the key ends with {@code "_$folder$"}.
   * @param keyPath key path of the entry
   * @param summary summary entry
   * @return true if the entry is accepted (i.e. that a status entry
   * should be generated).
   */
  @Override
  public boolean accept(Path keyPath, S3ObjectSummary summary) {
    if (keyPath.equals(qualifiedPath)) {
      return false;
    }
    return !summary.getKey().endsWith(S3N_FOLDER_SUFFIX);
  }

  /**
   * Accept every prefix except the one for the base path itself.
   * @param keyPath qualified path to the entry
   * @param prefix common prefix in listing.
   * @return true if the entry is accepted (i.e. that a status entry
   * should be generated).
   */
  @Override
  public boolean accept(Path keyPath, String prefix) {
    final boolean isSelf = keyPath.equals(qualifiedPath);
    return !isSelf;
  }

  /**
   * Accept any non-null status other than the one for the base path.
   * @param status status to check
   * @return true if accepted
   */
  @Override
  public boolean accept(FileStatus status) {
    if (status == null) {
      return false;
    }
    return !status.getPath().equals(qualifiedPath);
  }
}
}