MINIFI-366: Adds S3ConfigurationCache.
This closes #90.
Signed-off-by: Aldrin Piri <aldrin@apache.org>
diff --git a/minifi-c2/minifi-c2-assembly/NOTICE b/minifi-c2/minifi-c2-assembly/NOTICE
index ebdfa69..2cf824f 100644
--- a/minifi-c2/minifi-c2-assembly/NOTICE
+++ b/minifi-c2/minifi-c2-assembly/NOTICE
@@ -8,6 +8,9 @@
Apache Software License v2
===========================================
+This product includes the following work from the Apache Kafka project:
+S3OutputStream.java
+
The following binary components are provided under the Apache Software License v2
(ASLv2) Apache NiFi
@@ -67,6 +70,22 @@
Original source copyright:
Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
+ (ASLv2) AWS Java SDK
+ The following NOTICE information applies:
+ Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+ This product includes software developed by
+ Amazon Technologies, Inc (http://www.amazon.com/).
+
+ **********************
+ THIRD PARTY COMPONENTS
+ **********************
+ This software includes third party software subject to the following copyrights:
+ - XML parsing and utility functions from JetS3t - Copyright 2006-2009 James Murty.
+ - PKCS#1 PEM encoded private key parsing and utility functions from oauth.googlecode.com - Copyright 1998-2010 AOL Inc.
+
+ The licenses for these third party components are included in LICENSE.txt
+
(ASLv2) JsonPath
The following NOTICE information applies:
Copyright 2011 JsonPath authors
diff --git a/minifi-c2/minifi-c2-assembly/pom.xml b/minifi-c2/minifi-c2-assembly/pom.xml
index 32dca5a..aaede99 100644
--- a/minifi-c2/minifi-c2-assembly/pom.xml
+++ b/minifi-c2/minifi-c2-assembly/pom.xml
@@ -84,6 +84,11 @@
</dependency>
<dependency>
<groupId>org.apache.nifi.minifi</groupId>
+ <artifactId>minifi-c2-cache-s3</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi.minifi</groupId>
<artifactId>minifi-c2-provider-cache</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/minifi-c2/minifi-c2-assembly/src/main/resources/conf/minifi-c2-context.xml b/minifi-c2/minifi-c2-assembly/src/main/resources/conf/minifi-c2-context.xml
index bc23c97..22cc377 100644
--- a/minifi-c2/minifi-c2-assembly/src/main/resources/conf/minifi-c2-context.xml
+++ b/minifi-c2/minifi-c2-assembly/src/main/resources/conf/minifi-c2-context.xml
@@ -64,6 +64,35 @@
<value>\${class}.v\${version}</value>
</constructor-arg>
</bean>-->
+ <!--<bean class="org.apache.nifi.minifi.c2.provider.cache.CacheConfigurationProvider">
+ <constructor-arg>
+ <list>
+ <value>text/yml</value>
+ </list>
+ </constructor-arg>
+ <constructor-arg>
+ <bean class="org.apache.nifi.minifi.c2.cache.s3.S3ConfigurationCache">
+ <constructor-arg>
+ <value>bucket</value>
+ </constructor-arg>
+ <constructor-arg>
+ <value>prefix/</value>
+ </constructor-arg>
+ <constructor-arg>
+ <value>\${class}</value>
+ </constructor-arg>
+ <constructor-arg>
+ <value>access-key</value>
+ </constructor-arg>
+ <constructor-arg>
+ <value>secret-key</value>
+ </constructor-arg>
+ <constructor-arg>
+ <value>aws-region</value>
+ </constructor-arg>
+ </bean>
+ </constructor-arg>
+ </bean> -->
</list>
</constructor-arg>
<constructor-arg>
diff --git a/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/pom.xml b/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/pom.xml
new file mode 100644
index 0000000..dbe2d59
--- /dev/null
+++ b/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>minifi-c2-cache</artifactId>
+ <groupId>org.apache.nifi.minifi</groupId>
+ <version>0.2.1-SNAPSHOT</version>
+ </parent>
+ <artifactId>minifi-c2-cache-s3</artifactId>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi.minifi</groupId>
+ <artifactId>minifi-c2-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-s3</artifactId>
+ <version>${aws.sdk.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3CacheFileInfoImpl.java b/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3CacheFileInfoImpl.java
new file mode 100644
index 0000000..f93d1a8
--- /dev/null
+++ b/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3CacheFileInfoImpl.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.cache.s3;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.iterable.S3Objects;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.minifi.c2.api.ConfigurationProviderException;
+import org.apache.nifi.minifi.c2.api.cache.ConfigurationCacheFileInfo;
+import org.apache.nifi.minifi.c2.api.cache.WriteableConfiguration;
+import org.apache.nifi.minifi.c2.api.util.Pair;
+
+public class S3CacheFileInfoImpl implements ConfigurationCacheFileInfo {
+
+ private final AmazonS3 s3;
+ private final String bucket;
+ private final String prefix;
+ private final String expectedFilename;
+
+ /**
+ * Creates a new S3 cache file info.
+ * @param s3 The {@link AmazonS3 client}.
+ * @param bucket The S3 bucket.
+ * @param prefix The S3 object prefix.
+ */
+ public S3CacheFileInfoImpl(AmazonS3 s3, String bucket, String prefix,
+ String expectedFilename) {
+
+ this.s3 = s3;
+ this.bucket = bucket;
+ this.prefix = prefix;
+ this.expectedFilename = expectedFilename;
+
+ }
+
+ @Override
+ public Integer getVersionIfMatch(String objectKey) {
+
+ String filename = objectKey.substring(prefix.length());
+
+ int expectedFilenameLength = expectedFilename.length();
+ if (!filename.startsWith(expectedFilename) || filename.length() == expectedFilenameLength) {
+ return null;
+ }
+ try {
+ return Integer.parseInt(filename.substring(expectedFilenameLength));
+ } catch (NumberFormatException e) {
+ return null;
+ }
+ }
+
+ @Override
+ public WriteableConfiguration getConfiguration(Integer version)
+ throws ConfigurationProviderException {
+
+ if (version == null) {
+
+ try {
+ return getCachedConfigurations().findFirst()
+ .orElseThrow(() -> new ConfigurationProviderException("No configurations found."));
+ } catch (IOException e) {
+ throw new ConfigurationProviderException("Unable to get cached configurations.", e);
+ }
+
+ } else {
+
+ final S3Object s3Object;
+
+ if (StringUtils.isEmpty(prefix) || StringUtils.equals(prefix, "/")) {
+ s3Object = s3.getObject(new GetObjectRequest(bucket,
+ expectedFilename + version.toString()));
+ } else {
+ s3Object = s3.getObject(new GetObjectRequest(bucket,
+ prefix + expectedFilename + version.toString()));
+ }
+
+ if (s3Object == null) {
+ throw new ConfigurationProviderException("No configurations found for object key.");
+ }
+
+ return new S3WritableConfiguration(s3, s3Object, Integer.toString(version));
+
+ }
+
+ }
+
+ @Override
+ public Stream<WriteableConfiguration> getCachedConfigurations() throws IOException {
+
+ Iterable<S3ObjectSummary> objectSummaries = S3Objects.withPrefix(s3, bucket, prefix);
+ Stream<S3ObjectSummary> objectStream = StreamSupport.stream(objectSummaries.spliterator(), false);
+
+ return objectStream.map(p -> {
+ Integer version = getVersionIfMatch(p.getKey());
+ if (version == null) {
+ return null;
+ }
+ return new Pair<>(version, p);
+ }).filter(Objects::nonNull)
+ .sorted(Comparator.comparing(pair -> ((Pair<Integer, S3ObjectSummary>) pair).getFirst())
+ .reversed()).map(pair -> new S3WritableConfiguration(s3, pair.getSecond(), Integer.toString(pair.getFirst())));
+
+ }
+
+}
diff --git a/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3ConfigurationCache.java b/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3ConfigurationCache.java
new file mode 100644
index 0000000..01a1380
--- /dev/null
+++ b/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3ConfigurationCache.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.cache.s3;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.minifi.c2.api.InvalidParameterException;
+import org.apache.nifi.minifi.c2.api.cache.ConfigurationCache;
+import org.apache.nifi.minifi.c2.api.cache.ConfigurationCacheFileInfo;
+
+public class S3ConfigurationCache implements ConfigurationCache {
+
+ private final AmazonS3 s3;
+ private final String bucket;
+ private final String prefix;
+ private final String pathPattern;
+
+ /**
+ * Creates a new S3 configuration cache.
+ * @param bucket The S3 bucket.
+ * @param prefix The S3 object prefix.
+ * @param pathPattern The path pattern.
+ * @param accessKey The (optional) S3 access key.
+ * @param secretKey The (optional) S3 secret key.
+ * @param region The AWS region (e.g. us-east-1).
+ * @throws IOException Thrown if the configuration cannot be read.
+ */
+ public S3ConfigurationCache(String bucket, String prefix, String pathPattern,
+ String accessKey, String secretKey, String region) throws IOException {
+
+ this.bucket = bucket;
+ this.prefix = prefix;
+ this.pathPattern = pathPattern;
+
+ if (!StringUtils.isEmpty(accessKey)) {
+
+ s3 = AmazonS3Client.builder()
+ .withCredentials(new AWSStaticCredentialsProvider(
+ new BasicAWSCredentials(accessKey, secretKey)))
+ .withRegion(Regions.fromName(region))
+ .build();
+
+ } else {
+
+ s3 = AmazonS3Client.builder()
+ .withRegion(Regions.fromName(region))
+ .build();
+ }
+
+ }
+
+ @Override
+ public ConfigurationCacheFileInfo getCacheFileInfo(String contentType,
+ Map<String, List<String>> parameters) throws InvalidParameterException {
+
+ String pathString = pathPattern;
+ for (Map.Entry<String, List<String>> entry : parameters.entrySet()) {
+ if (entry.getValue().size() != 1) {
+ throw new InvalidParameterException("Multiple values for same parameter"
+ + " are not supported by this provider.");
+ }
+ pathString = pathString.replaceAll(Pattern.quote("${" + entry.getKey() + "}"),
+ entry.getValue().get(0));
+ }
+ pathString = pathString + "." + contentType.replace('/', '.');
+ String[] split = pathString.split("/");
+ for (String s1 : split) {
+ int openBrace = s1.indexOf("${");
+ if (openBrace >= 0 && openBrace < s1.length() + 2) {
+ int closeBrace = s1.indexOf("}", openBrace + 2);
+ if (closeBrace >= 0) {
+ throw new InvalidParameterException("Found unsubstituted variable "
+ + s1.substring(openBrace + 2, closeBrace));
+ }
+ }
+ }
+
+ return new S3CacheFileInfoImpl(s3, bucket, prefix, pathString + ".v");
+ }
+
+}
diff --git a/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3OutputStream.java b/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3OutputStream.java
new file mode 100644
index 0000000..abd4e46
--- /dev/null
+++ b/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3OutputStream.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.cache.s3;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.event.ProgressEvent;
+import com.amazonaws.event.ProgressListener;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This implementation has heavily borrowed Kafka's implementation
+ * which borrowed the general structure of Hadoop's implementation.
+ */
+public class S3OutputStream extends OutputStream {
+
+ private static final Logger log = LoggerFactory.getLogger(S3OutputStream.class);
+
+ private final AmazonS3 s3;
+ private final String bucket;
+ private final String key;
+ private final ProgressListener progressListener;
+ private final int partSize;
+ private boolean closed;
+ private ByteBuffer buffer;
+ private MultipartUpload multiPartUpload;
+
+ /**
+ * Creates a new S3 output stream for an object.
+ * @param bucket The bucket.
+ * @param key The object's key.
+ * @param s3 An S3 {@link AmazonS3 client}.
+ */
+  public S3OutputStream(String bucket, String key, AmazonS3 s3) {
+    this.s3 = s3;
+    this.bucket = bucket;
+    this.key = key;
+    this.partSize = 5 * 1024 * 1024; // S3 requires every multipart part except the last to be >= 5 MiB
+    this.closed = false;
+    this.buffer = ByteBuffer.allocate(this.partSize);
+    this.progressListener = new ConnectProgressListener();
+    this.multiPartUpload = null;
+    log.debug("Create S3OutputStream for bucket '{}' key '{}'", bucket, key);
+  }
+
+ @Override
+ public void write(int b) throws IOException {
+ buffer.put((byte) b);
+ if (!buffer.hasRemaining()) {
+ uploadPart();
+ }
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ if (b == null) {
+ throw new NullPointerException();
+ } else if (off < 0 || off > b.length || len < 0 || (off + len) > b.length || (off + len) < 0) {
+ throw new IndexOutOfBoundsException();
+ } else if (len == 0) {
+ return;
+ }
+
+ if (buffer.remaining() < len) {
+ int firstPart = buffer.remaining();
+ buffer.put(b, off, firstPart);
+ uploadPart();
+ write(b, off + firstPart, len - firstPart);
+ } else {
+ buffer.put(b, off, len);
+ }
+ }
+
+ private void uploadPart() throws IOException {
+ uploadPart(partSize);
+ buffer.clear();
+ }
+
+  private void uploadPart(int size) throws IOException {
+    if (multiPartUpload == null) {
+      log.debug("New multi-part upload for bucket '{}' key '{}'", bucket, key);
+      multiPartUpload = newMultipartUpload();
+    }
+
+    try {
+      multiPartUpload.uploadPart(new ByteArrayInputStream(buffer.array()), size);
+    } catch (Exception e) {
+      if (multiPartUpload != null) {
+        multiPartUpload.abort();
+        log.debug("Multipart upload aborted for bucket '{}' key '{}'.", bucket, key);
+      }
+      throw new IOException("Part upload failed.", e); // chain e itself; e.getCause() may be null, losing the failure
+    }
+  }
+
+ public void commit() throws IOException {
+ if (closed) {
+ log.warn("Tried to commit data for bucket '{}' key '{}' on a closed stream. Ignoring.", bucket, key);
+ return;
+ }
+
+ try {
+ if (buffer.hasRemaining()) {
+ uploadPart(buffer.position());
+ }
+ multiPartUpload.complete();
+ log.debug("Upload complete for bucket '{}' key '{}'", bucket, key);
+ } catch (Exception e) {
+ log.error("Multipart upload failed to complete for bucket '{}' key '{}'", bucket, key);
+ throw new RuntimeException("Multipart upload failed to complete.", e);
+ } finally {
+ buffer.clear();
+ multiPartUpload = null;
+ close();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ if (multiPartUpload != null) {
+ multiPartUpload.abort();
+ log.debug("Multipart upload aborted for bucket '{}' key '{}'.", bucket, key);
+ }
+ super.close();
+ }
+
+ private MultipartUpload newMultipartUpload() throws IOException {
+ InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucket, key, new ObjectMetadata());
+ try {
+ return new MultipartUpload(s3.initiateMultipartUpload(initRequest).getUploadId());
+ } catch (AmazonClientException e) {
+ throw new IOException("Unable to initiate MultipartUpload: " + e, e);
+ }
+ }
+
+ private class MultipartUpload {
+ private final String uploadId;
+ private final List<PartETag> partETags;
+
+ public MultipartUpload(String uploadId) {
+ this.uploadId = uploadId;
+ this.partETags = new ArrayList<>();
+ log.debug("Initiated multi-part upload for bucket '{}' key '{}' with id '{}'", bucket, key, uploadId);
+ }
+
+ public void uploadPart(ByteArrayInputStream inputStream, int partSize) {
+ int currentPartNumber = partETags.size() + 1;
+ UploadPartRequest request = new UploadPartRequest()
+ .withBucketName(bucket)
+ .withKey(key)
+ .withUploadId(uploadId)
+ .withInputStream(inputStream)
+ .withPartNumber(currentPartNumber)
+ .withPartSize(partSize)
+ .withGeneralProgressListener(progressListener);
+ log.debug("Uploading part {} for id '{}'", currentPartNumber, uploadId);
+ partETags.add(s3.uploadPart(request).getPartETag());
+ }
+
+ public void complete() {
+ log.debug("Completing multi-part upload for key '{}', id '{}'", key, uploadId);
+ CompleteMultipartUploadRequest completeRequest =
+ new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags);
+ s3.completeMultipartUpload(completeRequest);
+ }
+
+ public void abort() {
+ log.warn("Aborting multi-part upload with id '{}'", uploadId);
+ try {
+ s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId));
+ } catch (Exception e) {
+ // ignoring failure on abort.
+ log.warn("Unable to abort multipart upload, you may need to purge uploaded parts: ", e);
+ }
+ }
+ }
+
+  // Dummy listener for now, just logs the event progress.
+  private static class ConnectProgressListener implements ProgressListener {
+    @Override
+    public void progressChanged(ProgressEvent progressEvent) {
+      log.debug("Progress event: {}", progressEvent); // parameterized: avoids concatenation when debug is off
+    }
+  }
+}
\ No newline at end of file
diff --git a/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3WritableConfiguration.java b/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3WritableConfiguration.java
new file mode 100644
index 0000000..4ab62c9
--- /dev/null
+++ b/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3WritableConfiguration.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.cache.s3;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.nifi.minifi.c2.api.ConfigurationProviderException;
+import org.apache.nifi.minifi.c2.api.cache.WriteableConfiguration;
+
+public class S3WritableConfiguration implements WriteableConfiguration {
+
+ private AmazonS3 s3;
+ private final S3Object s3Object;
+ private final String version;
+
+ /**
+ * Creates a new S3 writable configuration.
+ * @param s3 An S3 {@link AmazonS3 client}.
+ * @param s3ObjectSummary The S3 object {@link S3ObjectSummary summary}.
+ * @param version The version of the configuration.
+ */
+ public S3WritableConfiguration(AmazonS3 s3, S3ObjectSummary s3ObjectSummary, String version) {
+
+ this.s3 = s3;
+ this.s3Object = s3.getObject(s3ObjectSummary.getBucketName(), s3ObjectSummary.getKey());
+ this.version = version;
+
+ }
+
+ /**
+ * Creates a new S3 writable configuration.
+ * @param s3 An S3 {@link AmazonS3 client}.
+ * @param s3Object The S3 {@link S3Object object}.
+ * @param version The version of the configuration.
+ */
+ public S3WritableConfiguration(AmazonS3 s3, S3Object s3Object, String version) {
+
+ this.s3 = s3;
+ this.s3Object = s3Object;
+ this.version = version;
+
+ }
+
+ @Override
+ public String getVersion() {
+ return version;
+ }
+
+ @Override
+ public boolean exists() {
+ return s3.doesObjectExist(s3Object.getBucketName(), s3Object.getKey());
+ }
+
+ @Override
+ public OutputStream getOutputStream() throws ConfigurationProviderException {
+ return new S3OutputStream(s3Object.getBucketName(), s3Object.getKey(), s3);
+ }
+
+ @Override
+ public InputStream getInputStream() throws ConfigurationProviderException {
+ return s3Object.getObjectContent();
+ }
+
+ @Override
+ public String getName() {
+ return s3Object.getKey();
+ }
+
+  @Override
+  public String toString() {
+    return "S3WritableConfiguration{objectKey=" + s3Object.getKey() // was "FileSystemWritableConfiguration": copy-paste from filesystem impl
+        + ", version='" + version + "'}";
+  }
+
+}
diff --git a/minifi-c2/minifi-c2-cache/pom.xml b/minifi-c2/minifi-c2-cache/pom.xml
index adb7674..c0b0b06 100644
--- a/minifi-c2/minifi-c2-cache/pom.xml
+++ b/minifi-c2/minifi-c2-cache/pom.xml
@@ -27,5 +27,6 @@
<modules>
<module>minifi-c2-cache-filesystem</module>
+ <module>minifi-c2-cache-s3</module>
</modules>
</project>
diff --git a/pom.xml b/pom.xml
index a07f995..ccb399f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -106,6 +106,7 @@
<spring.version>4.2.4.RELEASE</spring.version>
<spring.security.version>4.0.3.RELEASE</spring.security.version>
<system.rules.version>1.16.1</system.rules.version>
+ <aws.sdk.version>1.11.172</aws.sdk.version>
</properties>
<dependencies>