// blob: 585243b31a752c6672d01d804e5c3ae1db69287d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ant.s3;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.tools.ant.Project;
import org.apache.tools.ant.types.Resource;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.DeleteMarkerEntry;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.GetObjectTaggingRequest;
import software.amazon.awssdk.services.s3.model.GetObjectTaggingResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.ObjectVersion;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.model.S3Request;
import software.amazon.awssdk.services.s3.model.Tag;
import software.amazon.awssdk.services.s3.model.Tagging;
/**
* Amazon S3 object {@link Resource} implementation.
*/
public class ObjectResource extends Resource implements ProjectUtils {
/**
 * Immutable pair of flags attached to resources that were created from a
 * delete marker or an object version.
 */
private static class VersionInfo {
    /** {@code true} when the entry is a delete marker. */
    final boolean deleteMarker;
    /** {@code true} when the entry was reported as the latest one. */
    final boolean latest;

    VersionInfo(boolean isDeleteMarker, boolean isLatest) {
        latest = isLatest;
        deleteMarker = isDeleteMarker;
    }
}
/**
 * Cleanup action that is permitted to throw.
 */
@FunctionalInterface
private interface Finalizer {
    /** Action that does nothing; the starting point of a chain. */
    Finalizer NOP = () -> {
    };

    /**
     * Perform the cleanup.
     *
     * @throws Exception
     *             if the cleanup fails
     */
    void run() throws Exception;

    /**
     * Chain a follow-up action behind this one. The follow-up is not run if
     * this action throws.
     *
     * @param next
     *            action to run afterwards
     * @return combined {@link Finalizer}
     */
    default Finalizer then(Finalizer next) {
        final Finalizer first = this;
        return () -> {
            first.run();
            next.run();
        };
    }
}
/**
 * Build a {@link Finalizer} that closes {@code closeable} if it is still
 * reachable through a weak reference at the time the finalizer runs.
 * Failures to close are ignored.
 *
 * @param closeable
 *            to close later
 * @return {@link Finalizer}
 */
private static Finalizer closeWeak(Closeable closeable) {
    final Reference<Closeable> weak = new WeakReference<>(closeable);
    return () -> {
        final Closeable target = weak.get();
        if (target == null) {
            // referent already collected; nothing left to close
            return;
        }
        try {
            target.close();
        } catch (IOException ignored) {
            // best effort only: a failed close is deliberately not reported
        }
    };
}
// Client used for every S3 call made by this resource (see s3()).
private S3Client s3;
// Bucket and key identifying the object.
private String bucket;
private String key;
// Optional suppliers handed in by the listing constructors; while one of
// these is null the matching getter consults head() instead.
private volatile LongSupplier size;
private volatile Supplier<Instant> lastModified;
private volatile Supplier<String> versionId;
// Locally configured content type; getContentType() consults head() when null.
private String contentType;
// Filled on first read by getMetadata()/getTagging() unless assigned through
// a setter or the nested elements beforehand.
private volatile Map<String, String> metadata;
private volatile Map<String, String> tagging;
// Cached answer of isExists(); null until first computed.
private volatile Boolean exists;
// Chain of cleanup actions run from finalize(); extended by getInputStream().
private volatile Finalizer finalizer = Finalizer.NOP;
// Cached HEAD response; stays null while no HEAD request has succeeded.
private volatile HeadObjectResponse head;
// Present only for instances built from a delete marker or an object version.
private Optional<VersionInfo> versionInfo = Optional.empty();
/**
* Create a new {@link ObjectResource} that knows only its Ant
* {@link Project}. This constructor assigns no bucket, key or client.
*
* @param project
* Ant {@link Project}
*/
public ObjectResource(Project project) {
setProject(project);
}
/**
* Create an {@link ObjectResource} from a listing. The key is taken from
* {@code summary} immediately; size and last-modified time are read from it
* on demand. No version ID supplier is provided.
*
* @param project
* Ant {@link Project}
* @param s3
* {@link S3Client}
* @param bucket
* of object
* @param summary
* object info
*/
ObjectResource(Project project, S3Client s3, String bucket, S3Object summary) {
this(project, s3, bucket, summary.key(), summary::size, summary::lastModified, null, Precision.object);
}
/**
* Create an {@link ObjectResource} representing a delete marker.
*
* @param project
* Ant {@link Project}
* @param s3
* {@link S3Client}
* @param bucket
* of object
* @param deleteMarker
* info
*/
ObjectResource(Project project, S3Client s3, String bucket, DeleteMarkerEntry deleteMarker) {
this(project, s3, bucket, deleteMarker.key(), () -> UNKNOWN_SIZE, deleteMarker::lastModified,
deleteMarker::versionId, Precision.version);
versionInfo = Optional.ofNullable(new VersionInfo(true, deleteMarker.isLatest().booleanValue()));
}
/**
* Create an {@link ObjectResource} representing an object version.
*
* @param project
* Ant {@link Project}
* @param s3
* {@link S3Client}
* @param bucket
* of object
* @param version
* info
*/
ObjectResource(Project project, S3Client s3, String bucket, ObjectVersion version) {
this(project, s3, bucket, version.key(), version::size, version::lastModified, version::versionId,
Precision.version);
versionInfo = Optional.of(new VersionInfo(false, version.isLatest().booleanValue()));
}
/**
* Create an {@link ObjectResource} whose size, last-modified time and
* version ID are provided by suppliers rather than looked up from S3.
*
* @param project
* Ant {@link Project}
* @param s3
* {@link S3Client}
* @param bucket
* of object
* @param key
* of object
* @param size
* of object
* @param lastModified
* of object
* @param versionId
* of object
* @param precision
* of object; not read by this constructor, since
* {@link #getPrecision()} is derived from the version info alone
*/
ObjectResource(Project project, S3Client s3, String bucket, String key, LongSupplier size,
Supplier<Instant> lastModified, Supplier<String> versionId, Precision precision) {
this(project, s3, bucket, key);
this.size = size;
this.lastModified = lastModified;
this.versionId = versionId;
}
/**
 * Create an {@link ObjectResource} representing a prefix. The prefix is used
 * as the key and the resource is flagged as a directory.
 *
 * @param project
 *            Ant {@link Project}
 * @param s3
 *            {@link S3Client}
 * @param bucket
 *            of prefix
 * @param prefix
 *            {@link String}
 * @return {@link ObjectResource}
 */
static ObjectResource ofPrefix(Project project, S3Client s3, String bucket, String prefix) {
    final ObjectResource directory = new ObjectResource(project, s3, bucket, prefix);
    // the public setDirectory() always throws, so use the private back door
    directory._setDirectory(true);
    return directory;
}
/**
* Create a new {@link ObjectResource} with complete identifying info.
* Project, bucket and key go through their setters; the client is stored
* directly.
*
* @param project
* Ant {@link Project}
* @param s3
* {@link S3Client}
* @param bucket
* of object
* @param key
* of object
*/
ObjectResource(Project project, S3Client s3, String bucket, String key) {
setProject(project);
setBucket(bucket);
setKey(key);
this.s3 = s3;
}
/**
 * Get the bucket of the S3 object, delegating to the referenced resource
 * when this instance is a reference.
 *
 * @return {@link String}
 */
public String getBucket() {
    if (isReference()) {
        return getRef().getBucket();
    }
    return bucket;
}
/**
* Set the bucket of the S3 object.
*
* @param bucket
* of object
*/
public void setBucket(String bucket) {
// inherited guard; presumably rejects attribute changes on a reference (confirm against Ant's DataType)
checkAttributesAllowed();
this.bucket = bucket;
}
/**
 * Get the S3 object key within its bucket, delegating to the referenced
 * resource when this instance is a reference.
 *
 * @return {@link String}
 */
public String getKey() {
    return isReference() ? getRef().getKey() : key;
}
/**
 * Set the key of the S3 object within its bucket. Like
 * {@link #setBucket(String)}, this is an attribute setter and is therefore
 * guarded by {@code checkAttributesAllowed()}, so that a key (or a name, via
 * {@link #setName(String)}) cannot be assigned to a resource that is a
 * reference.
 *
 * @param key
 *            of object
 */
public void setKey(String key) {
    checkAttributesAllowed();
    this.key = key;
}
/**
* Set the name of the S3 object, which for our purposes is equivalent to
* calling {@link #setKey(String)}; the call is forwarded there unchanged.
*
* @param name
* {@code key}
*/
@Override
public void setName(String name) {
setKey(name);
}
/**
 * Get the name of the S3 object. This is the key, suffixed with
 * {@code @<versionId>} when {@link #getPrecision()} returns
 * {@link Precision#version} and a non-blank version ID is available.
 *
 * @return {@link String}
 */
@Override
public String getName() {
    final String objectKey = getKey();
    if (getPrecision() != Precision.version) {
        return objectKey;
    }
    final String version = getVersionId();
    if (StringUtils.isNotBlank(version)) {
        return String.format("%s@%s", objectKey, version);
    }
    return objectKey;
}
/**
* {@inheritDoc}
* <p>
* Not supported for S3 objects: the directory flag is only assigned
* internally, for resources created to represent a prefix.
*
* @throws UnsupportedOperationException
* always
*/
@Override
public void setDirectory(boolean directory) {
throw new UnsupportedOperationException();
}
/**
 * Get the precision of this object, which impacts only its name. The
 * precision is {@link Precision#version} exactly when version info is
 * present, otherwise {@link Precision#object}.
 *
 * @return {@link Precision}
 */
public Precision getPrecision() {
    if (versionInfo.isPresent()) {
        return Precision.version;
    }
    return Precision.object;
}
/**
 * Learn whether this object represents a delete marker. Objects without
 * version info are never delete markers.
 *
 * @return {@code boolean}
 */
public boolean isDeleteMarker() {
    return versionInfo.isPresent() && versionInfo.get().deleteMarker;
}
/**
 * Learn whether this object is the latest revision per bucket + key (always
 * {@code true} for objects with {@link Precision#object}).
 *
 * @return {@code boolean}
 */
public boolean isLatest() {
    // no version info means plain object precision, which counts as latest
    return versionInfo.map(info -> info.latest).orElse(Boolean.TRUE);
}
/**
 * Get the content type of the S3 object. A reference delegates to the
 * resource it refers to, as the other getters do. Otherwise a locally
 * configured content type wins; failing that, the value is read from the
 * object's HEAD response.
 *
 * @return {@link String}, or {@code null} if no content type is configured
 *         and none could be read from S3
 */
public String getContentType() {
    if (isReference()) {
        // without this a reference would query S3 with its own, unset client
        return getRef().getContentType();
    }
    if (contentType != null) {
        return contentType;
    }
    return head().map(HeadObjectResponse::contentType).orElse(null);
}
/**
 * Set the content type of the S3 object. Like {@link #setBucket(String)},
 * this is an attribute setter and is therefore guarded by
 * {@code checkAttributesAllowed()}, so that it cannot be applied to a
 * resource that is a reference.
 *
 * @param contentType
 *            of object
 */
public void setContentType(String contentType) {
    checkAttributesAllowed();
    this.contentType = contentType;
}
/**
* Create the nested {@code metadata} element. May be used only while no
* metadata map has been assigned yet.
*
* @return {@link InlineProperties}
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public InlineProperties createMetadata() {
checkChildrenAllowed();
// presumably raises a build exception when metadata was already set (Exceptions is a project helper; confirm)
Exceptions.raiseUnless(metadata == null, buildException(), "metadata already specified");
final InlineProperties result = new InlineProperties(getProject());
// the element's own properties object becomes the metadata map via a raw cast,
// so entries added to the element later should be visible through getMetadata()
metadata = (Map) result.properties;
return result;
}
/**
 * Get the {@link Map} of metadata. A reference delegates to the resource it
 * refers to. If no metadata has been assigned, it is loaded once from the
 * object's HEAD response (an empty map results if the object does not
 * exist) and cached.
 * <p>
 * The map is fully populated before it is stored in the field, so other
 * threads never observe a partially filled map, and a failed lookup does
 * not leave an empty map cached.
 *
 * @return {@link Map}
 */
public Map<String, String> getMetadata() {
    if (isReference()) {
        return getRef().getMetadata();
    }
    Map<String, String> result = metadata;
    if (result == null) {
        synchronized (this) {
            result = metadata;
            if (result == null) {
                final Map<String, String> loaded = new LinkedHashMap<>();
                head().map(HeadObjectResponse::metadata).ifPresent(loaded::putAll);
                // publish only once complete
                metadata = loaded;
                result = loaded;
            }
        }
    }
    return result;
}
/**
* Set the metadata. The given map is stored as-is (no copy is made). Not
* permitted on a reference.
*
* @param metadata
* of object
*/
public void setMetadata(Map<String, String> metadata) {
// presumably throws UnsupportedOperationException("setMetadata") on a reference (project helper; confirm)
Exceptions.raiseIf(isReference(), UnsupportedOperationException::new, "setMetadata");
this.metadata = metadata;
}
/**
* Create the nested {@code tagging} element. May be used only while no
* tagging map has been assigned yet.
*
* @return {@link InlineProperties}
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public InlineProperties createTagging() {
checkChildrenAllowed();
// presumably raises a build exception when tagging was already set (Exceptions is a project helper; confirm)
Exceptions.raiseUnless(tagging == null, buildException(), "tagging already specified");
final InlineProperties result = new InlineProperties(getProject());
// the element's own properties object becomes the tagging map via a raw cast,
// so entries added to the element later should be visible through getTagging()
tagging = (Map) result.properties;
return result;
}
/**
 * Get the tagging of the S3 object. A reference delegates to the resource it
 * refers to. If no tagging has been assigned, it is loaded once from S3 (an
 * empty map results if the object does not exist) and cached.
 * <p>
 * The map is fully populated before it is stored in the field, so other
 * threads never observe a partially filled map, and a failed lookup does
 * not leave an empty map cached.
 *
 * @return {@link Map}
 */
public Map<String, String> getTagging() {
    if (isReference()) {
        return getRef().getTagging();
    }
    Map<String, String> result = tagging;
    if (result == null) {
        synchronized (this) {
            result = tagging;
            if (result == null) {
                final Map<String, String> loaded = new LinkedHashMap<>();
                readTagging().ifPresent(r -> r.tagSet().forEach(t -> loaded.put(t.key(), t.value())));
                // publish only once complete
                tagging = loaded;
                result = loaded;
            }
        }
    }
    return result;
}
/**
* Set the tagging for the S3 object. The given map is stored as-is (no copy
* is made).
*
* @param tagging
* of object
* @throws UnsupportedOperationException
* if this resource is a reference
*/
public void setTagging(Map<String, String> tagging) {
if (isReference()) {
throw new UnsupportedOperationException();
}
this.tagging = tagging;
}
/**
* Add the configured {@link Client} element. Only one client may be
* supplied per resource; the {@link S3Client} it yields is stored.
*
* @param s3
* {@link Client}, must not be {@code null}
*/
public void addConfigured(Client s3) {
checkChildrenAllowed();
// presumably raises a build exception when a client is already present (project helper; confirm)
Exceptions.raiseUnless(this.s3 == null, buildException(),
() -> String.format("%s already specified", componentName(Client.class)));
this.s3 = Objects.requireNonNull(s3).get();
}
/**
* Set the {@link Client} by reference. The reference is looked up in the
* project and handed to {@link #addConfigured(Client)}.
*
* @param refid
* of {@link Client}
*/
public void setClientRefid(String refid) {
checkAttributesAllowed();
Exceptions.raiseIf(StringUtils.isBlank(refid), buildException(), "@clientrefid must not be null/empty/blank");
// NOTE(review): an unknown refid presumably yields null here, which addConfigured rejects via requireNonNull
addConfigured(getProject().<Client> getReference(refid));
}
/**
 * {@inheritDoc}
 * <p>
 * A reference delegates to the resource it refers to. Otherwise the answer
 * is whether a HEAD response could be obtained; it is computed once and
 * cached.
 */
@Override
public boolean isExists() {
    if (isReference()) {
        return getRef().isExists();
    }
    Boolean known = exists;
    if (known == null) {
        synchronized (this) {
            known = exists;
            if (known == null) {
                known = Boolean.valueOf(head().isPresent());
                exists = known;
            }
        }
    }
    return known.booleanValue();
}
/**
 * {@inheritDoc}
 * <p>
 * A reference delegates to the resource it refers to. Otherwise a size
 * supplier, if one was provided, is used; failing that, the content length
 * of the HEAD response, or {@code UNKNOWN_SIZE} if there is none.
 */
@Override
public long getSize() {
    if (isReference()) {
        return getRef().getSize();
    }
    final LongSupplier supplied = size;
    if (supplied != null) {
        return supplied.getAsLong();
    }
    return head().map(HeadObjectResponse::contentLength).orElse(UNKNOWN_SIZE);
}
/**
 * {@inheritDoc}
 * <p>
 * A reference delegates to the resource it refers to. Otherwise a
 * last-modified supplier, if one was provided, is used; failing that, the
 * HEAD response, or {@code UNKNOWN_DATETIME} if there is none. The value is
 * expressed in epoch milliseconds.
 */
@Override
public long getLastModified() {
    if (isReference()) {
        return getRef().getLastModified();
    }
    final Supplier<Instant> supplied = lastModified;
    if (supplied != null) {
        return supplied.get().toEpochMilli();
    }
    return head().map(r -> r.lastModified().toEpochMilli()).orElse(UNKNOWN_DATETIME);
}
/**
 * Get the version ID, if available. A reference delegates to the resource
 * it refers to. Otherwise a version ID supplier, if one was provided, is
 * used; failing that, the HEAD response.
 *
 * @return {@link String}, possibly {@code null}
 */
public String getVersionId() {
    if (isReference()) {
        return getRef().getVersionId();
    }
    final Supplier<String> supplied = versionId;
    if (supplied != null) {
        return supplied.get();
    }
    return head().map(HeadObjectResponse::versionId).orElse(null);
}
/**
* {@inheritDoc}
* <p>
* A reference delegates to the resource it refers to. Otherwise a new
* stream over the object content is requested from S3 on every call.
*/
@Override
public InputStream getInputStream() throws IOException {
if (isReference()) {
return getRef().getInputStream();
}
final ResponseInputStream<GetObjectResponse> result = object();
// register a weakly-referenced close of the stream to run from finalize()
// NOTE(review): this read-then-write of the volatile field is not atomic — confirm single-threaded use
finalizer = finalizer.then(closeWeak(result));
return result;
}
/**
 * {@inheritDoc}
 * <p>
 * A reference delegates to the resource it refers to. Otherwise the
 * returned stream buffers everything written to it in memory and uploads
 * the buffer to S3 when it is closed. The put request is prepared when this
 * method is called, so an unsupported precision is reported here and not on
 * close.
 * <p>
 * Closing the stream more than once uploads only once, in line with the
 * {@link Closeable#close()} contract.
 */
@Override
public OutputStream getOutputStream() throws IOException {
    if (isReference()) {
        return getRef().getOutputStream();
    }
    final Consumer<PutObjectRequest.Builder> put = put();
    return new ByteArrayOutputStream() {
        private boolean closed;

        @Override
        public void close() throws IOException {
            if (closed) {
                // repeated close must have no effect, in particular no second upload
                return;
            }
            // mark closed before uploading so that a failed upload is not silently retried
            closed = true;
            try {
                super.close();
            } finally {
                s3().putObject(put, RequestBody.fromBytes(toByteArray()));
            }
        }
    };
}
/**
 * Put a {@link File} as the content of this S3 object, using the configured
 * client. A reference delegates to the resource it refers to.
 *
 * @param file
 *            source
 */
public void put(File file) {
    if (isReference()) {
        getRef().put(file);
        return;
    }
    put(s3(), file);
}
/**
 * Put a {@link File} as the content of this S3 object, specifying the
 * {@link S3Client} client to use. A reference delegates to the resource it
 * refers to and does nothing else.
 *
 * @param s3
 *            {@link S3Client} to use
 * @param file
 *            source
 */
public void put(S3Client s3, File file) {
    if (isReference()) {
        getRef().put(s3, file);
        // delegation is complete; falling through would upload a second time
        return;
    }
    s3.putObject(put(), Objects.requireNonNull(file, "file").toPath());
}
/**
 * Delete this object from its bucket using its configured client. A
 * reference delegates to the resource it refers to and does nothing else.
 */
public void delete() {
    if (isReference()) {
        getRef().delete();
        // delegation is complete; falling through would require a client of
        // our own and issue a second delete
        return;
    }
    delete(s3());
}
/**
 * Delete this object from its bucket using the specified {@link S3Client}.
 * A reference delegates to the resource it refers to and does nothing else.
 * For objects with {@link Precision#version} the version ID, when
 * available, is included in the request.
 *
 * @param s3
 *            {@link S3Client} to use
 */
public void delete(S3Client s3) {
    if (isReference()) {
        getRef().delete(s3);
        // delegation is complete; falling through would issue a second delete
        return;
    }
    s3.deleteObject(request(DeleteObjectRequest.Builder::bucket, DeleteObjectRequest.Builder::key,
        DeleteObjectRequest.Builder::versionId));
}
/**
 * {@inheritDoc}
 * <p>
 * Renders as {@code s3://<bucket>/<name>}; a reference renders as the
 * resource it refers to.
 */
@Override
public String toString() {
    if (isReference()) {
        return getRef().toString();
    }
    return String.format("s3://%s/%s", getBucket(), getName());
}
/**
* {@inheritDoc}
* <p>
* Equality is left entirely to the superclass; see
* {@link #fullyEquals(Object)} for a comparison that also takes content
* type, metadata, tagging and version ID into account.
*/
@Override
public boolean equals(Object other) {
return super.equals(other);
}
/**
 * {@inheritDoc}
 * <p>
 * Derived from this class, the bucket and the key; a reference uses the
 * hash code of the resource it refers to.
 */
@Override
public int hashCode() {
    if (isReference()) {
        return getRef().hashCode();
    }
    return Objects.hash(ObjectResource.class, getBucket(), getKey());
}
/**
 * Learn whether this {@link ObjectResource} is "fully equal" to the
 * specified {@link Object}. This means that locally-configurable S3 object
 * data (content type, metadata, tagging) and the version ID are equal, in
 * addition to {@link Object#equals(Object)} equality. A content type or
 * version ID that is {@code null} on both sides counts as equal.
 *
 * @param other
 *            object to compare for equality
 * @return {@code boolean}
 */
public boolean fullyEquals(Object other) {
    if (!super.equals(other)) {
        return false;
    }
    return ((Resource) other).asOptional(ObjectResource.class)
        // the content type may legitimately be null, so compare null-safely
        .filter(s3o -> Objects.equals(getContentType(), s3o.getContentType())
            && getMetadata().equals(s3o.getMetadata())
            && getTagging().equals(s3o.getTagging())
            && StringUtils.equals(getVersionId(), s3o.getVersionId()))
        .isPresent();
}
/**
* {@inheritDoc}
* <p>
* Runs the accumulated cleanup chain (closing input streams handed out by
* {@link #getInputStream()} that are still reachable) before delegating to
* the superclass.
*/
@Override
protected void finalize() throws Throwable {
// NOTE(review): if the chain throws, super.finalize() is skipped — confirm this is acceptable
finalizer.run();
super.finalize();
}
/**
* {@inheritDoc}
* <p>
* Narrows the referenced object to {@link ObjectResource}.
*/
@Override
protected ObjectResource getRef() {
return getCheckedRef(ObjectResource.class);
}
/**
* Request the object content from S3, addressing bucket and key and, for
* objects with {@link Precision#version}, the version ID when available.
*
* @return open {@link ResponseInputStream}; closing it is the caller's job
*/
private ResponseInputStream<GetObjectResponse> object() {
return s3().getObject(request(GetObjectRequest.Builder::bucket, GetObjectRequest.Builder::key,
GetObjectRequest.Builder::versionId));
}
/**
* Prepare the configuration of a put request for this object: bucket and
* key, plus content type, metadata and tagging where those are available.
* Content type, metadata and tagging are resolved when this method is
* called, not when the returned consumer is applied; resolving them goes
* through the getters and may therefore query S3.
*
* @return {@link Consumer} that configures a {@link PutObjectRequest.Builder}
*/
private Consumer<PutObjectRequest.Builder> put() {
final Precision precision = getPrecision();
// presumably throws UnsupportedOperationException with the formatted message (project helper; confirm)
Exceptions.raiseIf(precision == Precision.version, UnsupportedOperationException::new,
"Put not supported for objects with %s %s", precision, Precision.class.getSimpleName());
final Optional<String> _contentType = Optional.ofNullable(getContentType());
// empty maps are treated as absent so that they are not sent at all
final Optional<Map<String, String>> _metadata = Optional.of(getMetadata()).filter(m -> !m.isEmpty());
// convert the non-empty tag map into the SDK's Tagging structure
final Optional<Tagging> _tagging = Optional
.of(getTagging()).filter(m -> !m.isEmpty()).map(m -> m.entrySet().stream()
.map(e -> Tag.builder().key(e.getKey()).value(e.getValue()).build()).collect(Collectors.toList()))
.map(Tagging.builder()::tagSet).map(Tagging.Builder::build);
return request(PutObjectRequest.Builder::bucket, PutObjectRequest.Builder::key).andThen(b -> {
_contentType.ifPresent(b::contentType);
_metadata.ifPresent(b::metadata);
_tagging.ifPresent(b::tagging);
});
}
/**
* Build a request configurer that sets bucket and key on a request builder.
* Bucket and key are read, and passed through {@code require}, only when
* the returned consumer is applied.
*
* @param <B>
* request builder type
* @param setBucket
* bucket setter of {@code B}
* @param setKey
* key setter of {@code B}
* @return {@link Consumer}
*/
private <B extends S3Request.Builder> Consumer<B> request(BiConsumer<B, String> setBucket,
BiConsumer<B, String> setKey) {
return b -> {
// require(...) is an inherited/project helper; presumably fails when the value is missing (confirm)
setBucket.accept(b, require(getBucket(), "@bucket"));
setKey.accept(b, require(getKey(), "@key"));
};
}
/**
* Build a request configurer that sets bucket and key and, for objects with
* {@link Precision#version}, also the version ID when one is available.
* The precision is checked when this method is called; the version ID is
* read when the returned consumer is applied.
*
* @param <B>
* request builder type
* @param setBucket
* bucket setter of {@code B}
* @param setKey
* key setter of {@code B}
* @param setVersionId
* version ID setter of {@code B}
* @return {@link Consumer}
*/
private <B extends S3Request.Builder> Consumer<B> request(BiConsumer<B, String> setBucket,
BiConsumer<B, String> setKey, BiConsumer<B, String> setVersionId) {
Consumer<B> result = request(setBucket, setKey);
if (getPrecision() == Precision.version) {
result = result.andThen(b -> {
final String versionId = getVersionId();
// a null version ID is simply left off the request
if (versionId != null) {
setVersionId.accept(b, versionId);
}
});
}
return result;
}
/**
* Read the object's tagging from S3.
*
* @return the response, or an empty {@link Optional} when S3 reports that
* the key does not exist
*/
private Optional<GetObjectTaggingResponse> readTagging() {
try {
return Optional.of(s3().getObjectTagging(request(GetObjectTaggingRequest.Builder::bucket,
GetObjectTaggingRequest.Builder::key, GetObjectTaggingRequest.Builder::versionId)));
} catch (NoSuchKeyException e) {
// a missing object is an expected outcome, not an error
return Optional.empty();
}
}
/**
* Get the HEAD response for this object, issuing the request on first use
* and caching a successful response in the volatile {@code head} field
* (checked again under the instance lock before the request is made).
*
* @return the response, or an empty {@link Optional} when S3 reports that
* the key does not exist
*/
private Optional<HeadObjectResponse> head() {
if (head == null) {
synchronized (this) {
if (head == null) {
try {
Consumer<HeadObjectRequest.Builder> request =
request(HeadObjectRequest.Builder::bucket, HeadObjectRequest.Builder::key);
// the version ID comes straight from the supplier here, never from
// getVersionId(), which could call back into head()
if (getPrecision() == Precision.version && versionId != null) {
request = request.andThen(b -> b.versionId(versionId.get()));
}
head = s3().headObject(request);
} catch (NoSuchKeyException e) {
// nothing is cached in this case, so a later call issues the request again
return Optional.empty();
}
}
}
}
return Optional.ofNullable(head);
}
/**
* Get the client to use for S3 calls.
*
* @return {@link S3Client}
*/
private S3Client s3() {
// requireComponent is an inherited/project helper; presumably fails when no client is configured (confirm)
return requireComponent(s3, Client.class);
}
/**
* Set the directory flag through the superclass, bypassing this class's
* {@link #setDirectory(boolean)} override, which always throws.
*
* @param directory
* flag value
*/
private void _setDirectory(boolean directory) {
super.setDirectory(directory);
}
}