blob: 659312a7a1fa11e78d98a61b4ff3ff5a79a500b7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ant.s3;
import java.io.File;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.tools.ant.Project;
import org.apache.tools.ant.types.PatternSet;
import org.apache.tools.ant.types.selectors.SelectorUtils;
import org.apache.tools.ant.types.selectors.TokenizedPath;
import org.apache.tools.ant.types.selectors.TokenizedPattern;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CommonPrefix;
import software.amazon.awssdk.services.s3.model.DeleteMarkerEntry;
import software.amazon.awssdk.services.s3.model.ListObjectVersionsResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.ObjectVersion;
import software.amazon.awssdk.services.s3.model.S3Object;
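/**
* {@link AmazonS3} finder: walks the listing of a bucket on demand and supplies, one per {@link #get()} call,
* each object (or object version) accepted by the configured include/exclude patterns. An empty
* {@link Optional} signals that nothing further remains to be found.
*/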
class S3Finder implements Supplier<Optional<ObjectResource>> {
/**
 * Strategy that turns one listing entry into an {@link ObjectResource}. It is only ever supplied as a
 * method reference, hence the explicit functional-interface contract.
 *
 * @param <T> type of the listing entry the resource is built from
 */
@FunctionalInterface
interface ObjectFactory<T> {
    /**
     * Build the resource for {@code source}.
     *
     * @param project owning Ant project
     * @param s3 client the resource should use
     * @param bucket name of the bucket that was listed
     * @param source listing entry to wrap
     * @return the resulting {@link ObjectResource}
     */
    ObjectResource resourceFrom(Project project, S3Client s3, String bucket, T source);
}
/**
 * Uniform view over one listed entity (plain object, object version, delete marker or prefix object).
 * The attributes are read lazily through suppliers bound to the underlying SDK value, so the same frame
 * logic can filter, sort and materialize any of them.
 *
 * @param <T> type of the value handed to the {@link ObjectFactory}
 */
static class Atom<T> {
    /**
     * Ordering used for version listings: by key, then by the "latest" flag ({@code false} sorts before
     * {@code true}), then by last-modified instant. Declared {@code final} so the shared constant cannot
     * be reassigned.
     */
    static final Comparator<Atom<?>> COMPARATOR = Comparator.<Atom<?>, String> comparing(Atom::key)
        .thenComparing(Atom::latest).thenComparing(Atom::lastModified);

    /**
     * Atom for an object that is to be reported as a prefix: the source handed to the factory is the
     * object's key and the resource is created through {@code ObjectResource::ofPrefix}.
     */
    static Atom<String> asDir(S3Object o) {
        return new Atom<String>(o.key(), o::key, () -> null, () -> true, o::lastModified, ObjectResource::ofPrefix);
    }

    /** Atom for a plain (unversioned) object; it has no version id and always counts as latest. */
    static Atom<S3Object> of(S3Object o) {
        return new Atom<S3Object>(o, o::key, () -> null, () -> true, o::lastModified, ObjectResource::new);
    }

    /** Atom for a delete marker taken from a versions listing. */
    static Atom<DeleteMarkerEntry> of(DeleteMarkerEntry d) {
        return new Atom<DeleteMarkerEntry>(d, d::key, d::versionId, d::isLatest, d::lastModified,
            ObjectResource::new);
    }

    /** Atom for an object version taken from a versions listing. */
    static Atom<ObjectVersion> of(ObjectVersion v) {
        return new Atom<ObjectVersion>(v, v::key, v::versionId, v::isLatest, v::lastModified, ObjectResource::new);
    }

    /** Value passed to {@link #factory} when the resource is created. */
    final T source;
    final Supplier<String> key;
    /** Supplies the version id; yields {@code null} for atoms created from plain objects. */
    final Supplier<String> versionId;
    final BooleanSupplier latest;
    final Supplier<Instant> lastModified;
    final ObjectFactory<T> factory;

    Atom(T source, Supplier<String> key, Supplier<String> versionId, BooleanSupplier latest,
        Supplier<Instant> lastModified, ObjectFactory<T> factory) {
        this.source = source;
        this.key = key;
        this.versionId = versionId;
        this.latest = latest;
        this.lastModified = lastModified;
        this.factory = factory;
    }

    /** @return the object key, read from the underlying entry on each call */
    String key() {
        return key.get();
    }

    /** @return whether the underlying entry is flagged as the latest version */
    boolean latest() {
        return latest.getAsBoolean();
    }

    /** @return the last-modified instant of the underlying entry */
    Instant lastModified() {
        return lastModified.get();
    }

    /**
     * Materialize this atom as an {@link ObjectResource} by delegating to the configured factory.
     *
     * @param project owning Ant project
     * @param s3 client the resource should use
     * @param bucket name of the bucket that was listed
     * @return the resource built from {@link #source}
     */
    ObjectResource object(Project project, S3Client s3, String bucket) {
        return factory.resourceFrom(project, s3, bucket, source);
    }
}
/**
* One page of a listing for a single prefix, together with iterators over the common prefixes that still
* have to be descended into and over the page's contents that pass the include/exclude filters.
*
* @param <RESPONSE> listing response type wrapped by the frame
* @param <SELF> concrete frame type, produced by {@link #push()} and {@link #next()}
*/
static abstract class BaseFrame<RESPONSE, SELF extends BaseFrame<RESPONSE, SELF>> {
final S3Finder finder;
final RESPONSE listing;
final String prefix;
// the listing prefix expressed as a tokenized path (see S3Finder#path)
final TokenizedPath path;
// include patterns of the owning finder
final Set<TokenizedPattern> includes;
// greatest depth at which any include can still match; Integer.MAX_VALUE when unbounded
final int maxDepth;
// common prefixes of this page that are still to be pushed
final Iterator<CommonPrefix> prefixes;
// contents of this page that passed allow(), or nothing when this level cannot match at all
final Iterator<Atom<?>> contents;
// creates a frame of the concrete type around a further response
final BiFunction<S3Finder, RESPONSE, SELF> factory;
/**
* @param finder owning finder
* @param listing response backing this frame
* @param prefix supplies the prefix the listing was made for
* @param prefixes common prefixes reported by the listing
* @param contents atoms for the listing's contents, before pattern filtering
* @param factory wraps a further response in a frame of the concrete type
*/
protected BaseFrame(S3Finder finder, RESPONSE listing, Supplier<String> prefix, List<CommonPrefix> prefixes,
Stream<Atom<?>> contents, BiFunction<S3Finder, RESPONSE, SELF> factory) {
this.listing = listing;
this.prefix = prefix.get();
this.finder = finder;
path = finder.path(prefix.get());
includes = finder.patterns.getLeft();
// an include containing the deep-tree token is unbounded; with no includes at all the depth is unbounded too
maxDepth = includes.stream().mapToInt(
include -> include.containsPattern(SelectorUtils.DEEP_TREE_MATCH) ? Integer.MAX_VALUE : include.depth())
.max().orElse(Integer.MAX_VALUE);
if (includes.isEmpty()) {
// nothing to prune by: every common prefix is visited
this.prefixes = prefixes.iterator();
} else {
// descend only while some include reaches deeper than this level, and only into prefixes an include can start with
final int recurseDepth = path.depth() + (finder.includePrefixes ? 0 : 1);
this.prefixes = maxDepth > recurseDepth ? prefixes.stream().filter(this::allowPrefix).iterator()
: Collections.emptyIterator();
}
// cheap depth-based test for whether any include could match contents at this level,
// evaluated before the per-key filtering in allow()
final boolean canMatch = includes.isEmpty() || includes.stream().anyMatch(include -> {
if (include.containsPattern(SelectorUtils.DEEP_TREE_MATCH)) {
return path.depth() > include.rtrimWildcardTokens().depth();
}
if (path.depth() == include.depth() && finder.includePrefixes) {
return true;
}
return include.depth() - path.depth() == 1;
});
this.contents = canMatch ? contents.filter(this::allow).iterator() : Collections.emptyIterator();
this.factory = factory;
}
/**
* @return whether at least one include pattern reports a start-match for the given common prefix
*/
final boolean allowPrefix(CommonPrefix prefix) {
return includes.stream().anyMatch(p -> p.matchStartOf(finder.path(prefix.prefix()), finder.caseSensitive));
}
/**
* @return whether the atom's key is included (always so when there are no includes) and not matched by any
* exclude pattern
*/
final boolean allow(Atom<?> atom) {
final TokenizedPath path = finder.path(atom.key());
final boolean included = includes.isEmpty() || finder.matchesAny(includes, path);
return included && !finder.matchesAny(finder.patterns.getRight(), path);
}
/**
* Consume the next pending common prefix and list it.
*
* @return a frame over the first page of that prefix
*/
final SELF push() {
final String nextPrefix = prefixes.next().prefix();
OptionalInt maxKeys = OptionalInt.empty();
if (maxDepth - path.depth() == 1 && finder.includePrefixes) {
final TokenizedPath nextPath = finder.path(nextPrefix);
// only when every include ends in a wildcard-free token and matches the next prefix itself
if (includes.stream()
.allMatch(include -> include.depth() > 0
&& !SelectorUtils.hasWildcards(SelectorUtils.tokenizePath(include.getPattern()).lastElement())
&& include.matchPath(nextPath, finder.caseSensitive))) {
// looks like we're targeting the prefix; limit search appropriately:
maxKeys = OptionalInt.of(1);
}
}
return factory.apply(finder, push(nextPrefix, maxKeys));
}
/**
* @return a frame over the following page of this same prefix, or empty when there is none or none is needed
*/
final Optional<SELF> next() {
if (maxDepth == path.depth()) {
// only possible match was prefix, which we should have found in the first listing
return Optional.empty();
}
return nextResponse().map(r -> factory.apply(finder, r));
}
/**
* @return the response for the page following {@link #listing}, or empty when the listing is complete
*/
abstract Optional<RESPONSE> nextResponse();
/**
* List the given prefix from its beginning.
*
* @param prefix prefix to list
* @param maxKeys optional cap on the number of keys requested
* @return the listing response
*/
abstract RESPONSE push(String prefix, OptionalInt maxKeys);
@Override
public final String toString() {
return String.format("%s[%s]", getClass().getSimpleName(), prefix);
}
}
/**
 * Convert the contents of an objects listing into atoms. An object whose key equals the listing's own
 * prefix is reported as a prefix atom when {@code includePrefixes} is set and dropped otherwise; every
 * other object becomes a plain object atom.
 *
 * @param objects listing page
 * @param includePrefixes whether the object named exactly like the prefix is to be kept
 * @return atoms in listing order
 */
private static Stream<Atom<?>> atoms(ListObjectsV2Response objects, boolean includePrefixes) {
    return objects.contents().stream()
        // discard the prefix object up front unless prefixes were asked for
        .filter(entry -> includePrefixes || !entry.key().equals(objects.prefix()))
        .<Atom<?>> map(entry -> {
            if (entry.key().equals(objects.prefix())) {
                return Atom.asDir(entry);
            }
            return Atom.of(entry);
        });
}
/**
 * Frame over one page of an objects ({@code ListObjectsV2}) listing.
 */
static class ObjectsFrame extends BaseFrame<ListObjectsV2Response, ObjectsFrame> {
    /**
     * @param finder owning finder
     * @param objects listing page backing this frame
     */
    ObjectsFrame(S3Finder finder, ListObjectsV2Response objects) {
        super(finder, objects, objects::prefix, objects.commonPrefixes(), atoms(objects, finder.includePrefixes),
            ObjectsFrame::new);
    }

    /**
     * Fetch the page that follows this one for the same prefix.
     *
     * @return the next page when this listing is truncated, otherwise empty
     */
    @Override
    Optional<ListObjectsV2Response> nextResponse() {
        if (listing.isTruncated()) {
            return Optional.of(finder.listObjects(prefix, listing.nextContinuationToken(), OptionalInt.empty()));
        }
        return Optional.empty();
    }

    /**
     * Start listing a different (child) prefix.
     *
     * @param prefix prefix to list
     * @param maxKeys optional cap on the number of keys requested
     * @return first page of the listing for {@code prefix}
     */
    @Override
    ListObjectsV2Response push(String prefix, OptionalInt maxKeys) {
        // A continuation token belongs to the listing that produced it (this frame's prefix). The child
        // prefix is a new listing and must start without one, just as VersionsFrame starts without markers.
        return finder.listObjects(prefix, null, maxKeys);
    }
}
/**
 * Tell whether a truncated versions page stopped on the same key it was asked to resume from, i.e. its
 * next key marker equals the key marker of the request that produced it.
 *
 * @param versions listing page to inspect
 * @return {@code true} only for a truncated page whose next key marker equals its key marker
 */
private static boolean breaksKey(ListObjectVersionsResponse versions) {
    if (!versions.isTruncated()) {
        return false;
    }
    final String upcoming = versions.nextKeyMarker();
    return upcoming.equals(versions.keyMarker());
}
/**
 * Convert a versions listing page (delete markers followed by object versions) into sorted atoms.
 *
 * @param versions listing page
 * @param includePrefixes when {@code false}, entries whose key equals the listing prefix are dropped
 * @return atoms ordered by {@link Atom#COMPARATOR}
 */
private static Stream<Atom<?>> atoms(ListObjectVersionsResponse versions, boolean includePrefixes) {
    Stream<Atom<?>> stream =
        Stream.concat(versions.deleteMarkers().stream().map(Atom::of), versions.versions().stream().map(Atom::of));
    if (!includePrefixes) {
        stream = stream.filter(candidate -> !candidate.key().equals(versions.prefix()));
    }
    // attempt to facilitate version ordering by omitting last/next key, unless the page breaks a key:
    if (versions.isTruncated() && !breaksKey(versions)) {
        final String deferredKey = versions.nextKeyMarker();
        stream = stream.filter(candidate -> !deferredKey.equals(candidate.key()));
    }
    // plain object listings are believed to arrive sorted by key already; only version atoms get sorted:
    return stream.sorted(Atom.COMPARATOR);
}
/**
 * Frame over one page of an object-versions ({@code ListObjectVersions}) listing.
 */
static class VersionsFrame extends BaseFrame<ListObjectVersionsResponse, VersionsFrame> {
    /**
     * @param finder owning finder
     * @param versions listing page backing this frame
     */
    VersionsFrame(S3Finder finder, ListObjectVersionsResponse versions) {
        super(finder, versions, versions::prefix, versions.commonPrefixes(),
            atoms(versions, finder.includePrefixes), VersionsFrame::new);
    }

    /**
     * Fetch the page that follows this one for the same prefix. The version-id marker is only forwarded
     * when this page broke off within a key; otherwise listing resumes from the key marker alone.
     *
     * @return the next page when this listing is truncated, otherwise empty
     */
    @Override
    Optional<ListObjectVersionsResponse> nextResponse() {
        if (!listing.isTruncated()) {
            return Optional.empty();
        }
        final String resumeVersion = breaksKey(listing) ? listing.nextVersionIdMarker() : null;
        final ListObjectVersionsResponse following =
            finder.listVersions(prefix, listing.nextKeyMarker(), resumeVersion, OptionalInt.empty());
        return Optional.of(following);
    }

    /**
     * Start listing a different prefix from its beginning (no markers).
     *
     * @param prefix prefix to list
     * @param maxKeys optional cap on the number of keys requested
     * @return first page of the listing for {@code prefix}
     */
    @Override
    ListObjectVersionsResponse push(String prefix, OptionalInt maxKeys) {
        final String noKeyMarker = null;
        final String noVersionMarker = null;
        return finder.listVersions(prefix, noKeyMarker, noVersionMarker, maxKeys);
    }
}
/**
* Derive a single listing prefix shared by all include patterns.
* <p>
* Each include first has its trailing wildcard tokens stripped via {@code rtrimWildcardTokens()}; a result
* that still has tokens is rebuilt so that it ends with {@link File#separator}. The distinct candidates are
* then shortened token by token, starting at the depth of the shallowest candidate and going down, until
* at most one distinct pattern is left.
*
* @param includes tokenized include patterns
* @return the pattern string of the surviving candidate, or empty when there are no includes
*/
private static Optional<String> determinePrefix(Set<TokenizedPattern> includes) {
// candidates: includes without trailing wildcard tokens, de-duplicated by collecting into a set
Set<TokenizedPattern> patterns = includes.stream().map(TokenizedPattern::rtrimWildcardTokens)
.map(path -> path.depth() == 0 ? path.toPattern()
: new TokenizedPattern(StringUtils.appendIfMissing(path.toString(), File.separator)))
.collect(Collectors.toSet());
// truncate all candidates to a shrinking common depth; equal candidates merge in the set, ending the loop
for (int depth = patterns.stream().mapToInt(TokenizedPattern::depth).min().orElse(0); patterns
.size() > 1; depth--) {
// copy for use inside the lambda, which may only capture effectively final locals
final int d = depth;
patterns = patterns.stream().map(p -> {
while (p.depth() > d) {
p = p.withoutLastToken();
}
return p;
}).collect(Collectors.toSet());
}
return patterns.stream().findFirst().map(TokenizedPattern::getPattern);
}
// stack of listing frames; get() works on the top frame and pushes/pops as it descends and backs out
private final Deque<BaseFrame<?, ?>> staque = new ArrayDeque<>();
// Ant project: used for debug logging, for resolving the pattern set and when creating resources
private final Project project;
// client every listing request is issued through
private final S3Client s3;
// name of the bucket being searched
private final String bucket;
// delimiter sent with listing requests and translated to File.separator when keys are tokenized
private final String delimiter;
// whether pattern matching distinguishes case
private final boolean caseSensitive;
// tokenized include (left) and exclude (right) patterns
private final Pair<Set<TokenizedPattern>, Set<TokenizedPattern>> patterns;
// whether an object named exactly like its listing prefix is reported
private final boolean includePrefixes;
/**
 * Create a new {@link S3Finder} instance. Construction already issues the first listing request so that
 * the root frame is ready for {@link #get()}.
 *
 * @param project Ant project used for logging and for resolving {@code patterns}
 * @param s3 client through which listings are requested
 * @param bucket name of the bucket to search
 * @param precision whether objects or object versions are listed
 * @param delimiter key delimiter used for listing and for mapping keys to paths
 * @param patterns include/exclude patterns to apply
 * @param caseSensitive whether patterns are matched case-sensitively; a listing prefix is only derived
 *        from the include patterns when this is {@code true}
 * @param includePrefixes whether an object named exactly like its listing prefix is reported
 */
S3Finder(Project project, S3Client s3, String bucket, Precision precision, String delimiter, PatternSet patterns,
    boolean caseSensitive, boolean includePrefixes) {
    this.project = project;
    this.s3 = s3;
    this.bucket = bucket;
    this.delimiter = delimiter;
    // tokenizing relies on the project and delimiter assigned above
    this.patterns = tokenize(patterns);
    this.caseSensitive = caseSensitive;
    this.includePrefixes = includePrefixes;

    // the common prefix is only usable as a listing prefix when matching is case-sensitive
    final String rootPrefix = caseSensitive
        ? determinePrefix(this.patterns.getLeft()).map(p -> p.replace(File.separator, delimiter)).orElse(null)
        : null;
    final BaseFrame<?, ?> rootFrame = root(precision, rootPrefix);
    staque.push(rootFrame);
}
/**
 * Resolve the pattern set against the project and tokenize its include and exclude patterns.
 *
 * @param patterns pattern set to resolve
 * @return pair of (includes, excludes); either side is an empty set when no such patterns are defined
 */
private Pair<Set<TokenizedPattern>, Set<TokenizedPattern>> tokenize(PatternSet patterns) {
    final Function<String[], Set<TokenizedPattern>> toPatternSet = raw -> {
        if (raw == null) {
            return Collections.emptySet();
        }
        return Stream.of(raw).map(this::path).map(TokenizedPath::toPattern).collect(Collectors.toSet());
    };
    final Set<TokenizedPattern> includes = toPatternSet.apply(patterns.getIncludePatterns(project));
    final Set<TokenizedPattern> excludes = toPatternSet.apply(patterns.getExcludePatterns(project));
    return Pair.of(includes, excludes);
}
/**
 * Supply the next matching resource. Traversal is depth-first: pending common prefixes of the current
 * frame are descended into before the frame's own contents are handed out, and an exhausted frame is
 * replaced by the following page of its listing when there is one.
 *
 * @return the next matching {@link ObjectResource}, or empty once every frame is exhausted
 */
@Override
public synchronized Optional<ObjectResource> get() {
    BaseFrame<?, ?> current;
    while ((current = staque.peek()) != null) {
        // dive into sub-prefixes first, stacking a frame for each level
        while (current.prefixes.hasNext()) {
            final BaseFrame<?, ?> child = current.push();
            staque.push(child);
            current = child;
        }
        final Iterator<Atom<?>> pending = current.contents;
        if (pending.hasNext()) {
            return Optional.of(pending.next().object(project, s3, bucket));
        }
        // this page is spent: drop it and continue with its successor page, if any
        staque.pop();
        final Optional<? extends BaseFrame<?, ?>> continuation = current.next();
        if (continuation.isPresent()) {
            staque.push(continuation.get());
        }
    }
    return Optional.empty();
}
/**
 * Issue the initial listing and wrap it in the frame type matching the requested precision.
 *
 * @param precision whether to list objects or object versions
 * @param prefix listing prefix, may be {@code null}
 * @return the root frame
 * @throws IllegalStateException for a precision this finder does not know
 */
private BaseFrame<?, ?> root(Precision precision, String prefix) {
    final BaseFrame<?, ?> rootFrame;
    switch (precision) {
    case object:
        rootFrame = new ObjectsFrame(this, listObjects(prefix, null, OptionalInt.empty()));
        break;
    case version:
        rootFrame = new VersionsFrame(this, listVersions(prefix, null, null, OptionalInt.empty()));
        break;
    default:
        throw Exceptions.create(IllegalStateException::new, "Unknown %s %s", Precision.class.getSimpleName(),
            precision);
    }
    return rootFrame;
}
/**
 * Request one page of an objects listing, logging the request at debug level first.
 *
 * @param prefix key prefix to list, may be {@code null}
 * @param continuationToken token of the page to continue from, or {@code null} for the first page
 * @param maxKeys optional cap on the number of keys requested
 * @return the listing response
 */
ListObjectsV2Response listObjects(String prefix, String continuationToken, OptionalInt maxKeys) {
    final String message = String.format("listing %s objects '%s' '%s'", bucket, prefix, continuationToken);
    project.log(message, Project.MSG_DEBUG);
    return s3.listObjectsV2(request -> {
        request.bucket(bucket);
        request.delimiter(delimiter);
        request.prefix(prefix);
        request.continuationToken(continuationToken);
        if (maxKeys.isPresent()) {
            request.maxKeys(maxKeys.getAsInt());
        }
    });
}
/**
 * Request one page of an object-versions listing, logging the request at debug level first.
 *
 * @param prefix key prefix to list, may be {@code null}
 * @param keyMarker key marker to resume from, or {@code null}
 * @param versionMarker version-id marker to resume from, or {@code null}
 * @param maxKeys optional cap on the number of keys requested
 * @return the listing response
 */
ListObjectVersionsResponse listVersions(String prefix, String keyMarker, String versionMarker, OptionalInt maxKeys) {
    final String message =
        String.format("listing %s versions '%s' '%s' '%s'", bucket, prefix, keyMarker, versionMarker);
    project.log(message, Project.MSG_DEBUG);
    return s3.listObjectVersions(request -> {
        request.bucket(bucket);
        request.delimiter(delimiter);
        request.prefix(prefix);
        request.keyMarker(keyMarker);
        request.versionIdMarker(versionMarker);
        if (maxKeys.isPresent()) {
            request.maxKeys(maxKeys.getAsInt());
        }
    });
}
/**
 * Map a key, prefix or pattern string onto a tokenized path: one leading delimiter is removed and the
 * remaining delimiters are replaced by {@link File#separator}.
 *
 * @param s string to convert, may be {@code null}
 * @return the tokenized path, or {@code TokenizedPath.EMPTY_PATH} for {@code null}
 */
TokenizedPath path(String s) {
    if (s != null) {
        final String withoutLeadingDelimiter = StringUtils.removeStart(s, delimiter);
        return new TokenizedPath(withoutLeadingDelimiter.replace(delimiter, File.separator));
    }
    return TokenizedPath.EMPTY_PATH;
}
/**
 * Test a path against a collection of patterns, honouring this finder's case sensitivity.
 *
 * @param tokenizedPatterns patterns to try
 * @param path path to test
 * @return {@code true} as soon as one pattern matches, {@code false} if none does
 */
boolean matchesAny(Collection<TokenizedPattern> tokenizedPatterns, TokenizedPath path) {
    for (final TokenizedPattern candidate : tokenizedPatterns) {
        if (candidate.matchPath(path, caseSensitive)) {
            return true;
        }
    }
    return false;
}
}