MINIFI-366: Adds S3ConfigurationCache.

This closes #90.

Signed-off-by: Aldrin Piri <aldrin@apache.org>
diff --git a/minifi-c2/minifi-c2-assembly/NOTICE b/minifi-c2/minifi-c2-assembly/NOTICE
index ebdfa69..2cf824f 100644
--- a/minifi-c2/minifi-c2-assembly/NOTICE
+++ b/minifi-c2/minifi-c2-assembly/NOTICE
@@ -8,6 +8,9 @@
 Apache Software License v2
 ===========================================
 
+This product includes the following work from the Apache Kafka project:
+S3OutputStream.java
+
 The following binary components are provided under the Apache Software License v2
 
   (ASLv2) Apache NiFi
@@ -67,6 +70,22 @@
       Original source copyright:
       Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
 
+  (ASLv2) AWS Java SDK
+    The following NOTICE information applies:
+      Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+      This product includes software developed by
+      Amazon Technologies, Inc (http://www.amazon.com/).
+
+      **********************
+      THIRD PARTY COMPONENTS
+      **********************
+      This software includes third party software subject to the following copyrights:
+      - XML parsing and utility functions from JetS3t - Copyright 2006-2009 James Murty.
+      - PKCS#1 PEM encoded private key parsing and utility functions from oauth.googlecode.com - Copyright 1998-2010 AOL Inc.
+
+      The licenses for these third party components are included in LICENSE.txt
+
    (ASLv2) JsonPath
      The following NOTICE information applies:
        Copyright 2011 JsonPath authors
diff --git a/minifi-c2/minifi-c2-assembly/pom.xml b/minifi-c2/minifi-c2-assembly/pom.xml
index 32dca5a..aaede99 100644
--- a/minifi-c2/minifi-c2-assembly/pom.xml
+++ b/minifi-c2/minifi-c2-assembly/pom.xml
@@ -84,6 +84,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi.minifi</groupId>
+            <artifactId>minifi-c2-cache-s3</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi.minifi</groupId>
             <artifactId>minifi-c2-provider-cache</artifactId>
             <version>${project.version}</version>
         </dependency>
diff --git a/minifi-c2/minifi-c2-assembly/src/main/resources/conf/minifi-c2-context.xml b/minifi-c2/minifi-c2-assembly/src/main/resources/conf/minifi-c2-context.xml
index bc23c97..22cc377 100644
--- a/minifi-c2/minifi-c2-assembly/src/main/resources/conf/minifi-c2-context.xml
+++ b/minifi-c2/minifi-c2-assembly/src/main/resources/conf/minifi-c2-context.xml
@@ -64,6 +64,35 @@
                         <value>\${class}.v\${version}</value>
                     </constructor-arg>
                 </bean>-->
+                <!--<bean class="org.apache.nifi.minifi.c2.provider.cache.CacheConfigurationProvider">
+                    <constructor-arg>
+                        <list>
+                            <value>text/yml</value>
+                        </list>
+                    </constructor-arg>
+                    <constructor-arg>
+                        <bean class="org.apache.nifi.minifi.c2.cache.s3.S3ConfigurationCache">
+                            <constructor-arg>
+                                <value>bucket</value>
+                            </constructor-arg>
+                            <constructor-arg>
+                                <value>prefix/</value>
+                            </constructor-arg>
+                            <constructor-arg>
+                        		<value>\${class}</value>
+                    		</constructor-arg>
+                    		<constructor-arg>
+                        		<value>access-key</value>
+                    		</constructor-arg>
+                    		<constructor-arg>
+                        		<value>secret-key</value>
+                    		</constructor-arg>
+                    		<constructor-arg>
+                        		<value>aws-region</value>
+                    		</constructor-arg>
+                        </bean>
+                    </constructor-arg>
+                </bean> -->
             </list>
         </constructor-arg>
         <constructor-arg>
diff --git a/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/pom.xml b/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/pom.xml
new file mode 100644
index 0000000..dbe2d59
--- /dev/null
+++ b/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>minifi-c2-cache</artifactId>
+        <groupId>org.apache.nifi.minifi</groupId>
+        <version>0.2.1-SNAPSHOT</version>
+    </parent>
+    <artifactId>minifi-c2-cache-s3</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi.minifi</groupId>
+            <artifactId>minifi-c2-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+             <groupId>com.amazonaws</groupId>
+             <artifactId>aws-java-sdk-s3</artifactId>
+             <version>${aws.sdk.version}</version>
+         </dependency>
+         <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>         
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3CacheFileInfoImpl.java b/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3CacheFileInfoImpl.java
new file mode 100644
index 0000000..f93d1a8
--- /dev/null
+++ b/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3CacheFileInfoImpl.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.cache.s3;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.iterable.S3Objects;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.minifi.c2.api.ConfigurationProviderException;
+import org.apache.nifi.minifi.c2.api.cache.ConfigurationCacheFileInfo;
+import org.apache.nifi.minifi.c2.api.cache.WriteableConfiguration;
+import org.apache.nifi.minifi.c2.api.util.Pair;
+
+public class S3CacheFileInfoImpl implements ConfigurationCacheFileInfo {
+
+  private final AmazonS3 s3;
+  private final String bucket;
+  private final String prefix;
+  private final String expectedFilename;
+
+  /**
+   * Creates a new S3 cache file info.
+   * @param s3 The {@link AmazonS3 client}.
+   * @param bucket The S3 bucket.
+   * @param prefix The S3 object prefix.
+   */
+  public S3CacheFileInfoImpl(AmazonS3 s3, String bucket, String prefix,
+    String expectedFilename) {
+
+    this.s3 = s3;
+    this.bucket = bucket;
+    this.prefix = prefix;
+    this.expectedFilename = expectedFilename;
+
+  }
+
+  @Override
+  public Integer getVersionIfMatch(String objectKey) {
+
+    String filename = objectKey.substring(prefix.length());
+
+    int expectedFilenameLength = expectedFilename.length();
+    if (!filename.startsWith(expectedFilename) || filename.length() == expectedFilenameLength) {
+      return null;
+    }
+    try {
+      return Integer.parseInt(filename.substring(expectedFilenameLength));
+    } catch (NumberFormatException e) {
+      return null;
+    }
+  }
+
+  @Override
+  public WriteableConfiguration getConfiguration(Integer version)
+      throws ConfigurationProviderException {
+
+    if (version == null) {
+
+      try {
+        return getCachedConfigurations().findFirst()
+          .orElseThrow(() -> new ConfigurationProviderException("No configurations found."));
+      } catch (IOException e) {
+        throw new ConfigurationProviderException("Unable to get cached configurations.", e);
+      }
+
+    } else {
+
+      final S3Object s3Object;
+
+      if (StringUtils.isEmpty(prefix) || StringUtils.equals(prefix, "/")) {
+        s3Object = s3.getObject(new GetObjectRequest(bucket,
+          expectedFilename + version.toString()));
+      } else {
+        s3Object = s3.getObject(new GetObjectRequest(bucket,
+          prefix + expectedFilename + version.toString()));
+      }
+
+      if (s3Object == null) {
+        throw new ConfigurationProviderException("No configurations found for object key.");
+      }
+
+      return new S3WritableConfiguration(s3, s3Object, Integer.toString(version));
+
+    }
+
+  }
+
+  @Override
+  public Stream<WriteableConfiguration> getCachedConfigurations() throws IOException {
+
+    Iterable<S3ObjectSummary> objectSummaries = S3Objects.withPrefix(s3, bucket, prefix);
+    Stream<S3ObjectSummary> objectStream = StreamSupport.stream(objectSummaries.spliterator(), false);
+
+    return objectStream.map(p -> {
+      Integer version = getVersionIfMatch(p.getKey());
+      if (version == null) {
+        return null;
+      }
+      return new Pair<>(version, p);
+    }).filter(Objects::nonNull)
+        .sorted(Comparator.comparing(pair -> ((Pair<Integer, S3ObjectSummary>) pair).getFirst())
+              .reversed()).map(pair -> new S3WritableConfiguration(s3, pair.getSecond(), Integer.toString(pair.getFirst())));
+
+  }
+
+}
diff --git a/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3ConfigurationCache.java b/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3ConfigurationCache.java
new file mode 100644
index 0000000..01a1380
--- /dev/null
+++ b/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3ConfigurationCache.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.cache.s3;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.minifi.c2.api.InvalidParameterException;
+import org.apache.nifi.minifi.c2.api.cache.ConfigurationCache;
+import org.apache.nifi.minifi.c2.api.cache.ConfigurationCacheFileInfo;
+
+public class S3ConfigurationCache implements ConfigurationCache {
+
+  private final AmazonS3 s3;
+  private final String bucket;
+  private final String prefix;
+  private final String pathPattern;
+
+  /**
+   * Creates a new S3 configuration cache.
+   * @param bucket The S3 bucket.
+   * @param prefix The S3 object prefix.
+   * @param pathPattern The path pattern.
+   * @param accessKey The (optional) S3 access key.
+   * @param secretKey The (optional) S3 secret key.
+   * @param region The AWS region (e.g. us-east-1).
+   * @throws IOException Thrown if the configuration cannot be read.
+   */
+  public S3ConfigurationCache(String bucket, String prefix, String pathPattern,
+      String accessKey, String secretKey, String region) throws IOException {
+
+    this.bucket = bucket;
+    this.prefix = prefix;
+    this.pathPattern = pathPattern;
+
+    if (!StringUtils.isEmpty(accessKey)) {
+
+      s3 = AmazonS3Client.builder()
+        .withCredentials(new AWSStaticCredentialsProvider(
+           new BasicAWSCredentials(accessKey, secretKey)))
+          .withRegion(Regions.fromName(region))
+          .build();
+
+    } else {
+
+      s3 = AmazonS3Client.builder()
+          .withRegion(Regions.fromName(region))
+          .build();
+    }
+
+  }
+
+  @Override
+  public ConfigurationCacheFileInfo getCacheFileInfo(String contentType,
+      Map<String, List<String>> parameters) throws InvalidParameterException {
+
+    String pathString = pathPattern;
+    for (Map.Entry<String, List<String>> entry : parameters.entrySet()) {
+      if (entry.getValue().size() != 1) {
+        throw new InvalidParameterException("Multiple values for same parameter"
+          + " are not supported by this provider.");
+      }
+      pathString = pathString.replaceAll(Pattern.quote("${" + entry.getKey() + "}"),
+        entry.getValue().get(0));
+    }
+    pathString = pathString + "." + contentType.replace('/', '.');
+    String[] split = pathString.split("/");
+    for (String s1 : split) {
+      int openBrace = s1.indexOf("${");
+      if (openBrace >= 0 && openBrace < s1.length() + 2) {
+        int closeBrace = s1.indexOf("}", openBrace + 2);
+        if (closeBrace >= 0) {
+          throw new InvalidParameterException("Found unsubstituted variable "
+            + s1.substring(openBrace + 2, closeBrace));
+        }
+      }
+    }
+
+    return new S3CacheFileInfoImpl(s3, bucket, prefix, pathString + ".v");
+  }
+
+}
diff --git a/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3OutputStream.java b/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3OutputStream.java
new file mode 100644
index 0000000..abd4e46
--- /dev/null
+++ b/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3OutputStream.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.cache.s3;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.event.ProgressEvent;
+import com.amazonaws.event.ProgressListener;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This implementation has heavily borrowed Kafka's implementation
+ * which borrowed the general structure of Hadoop's implementation.
+ */
+public class S3OutputStream extends OutputStream {
+
+  private static final Logger log = LoggerFactory.getLogger(S3OutputStream.class);
+
+  private final AmazonS3 s3;
+  private final String bucket;
+  private final String key;
+  private final ProgressListener progressListener;
+  private final int partSize;
+  private boolean closed;
+  private ByteBuffer buffer;
+  private MultipartUpload multiPartUpload;
+
+  /**
+   * Creates a new S3 output stream for an object.
+   * @param bucket The bucket.
+   * @param key The object's key.
+   * @param s3 An S3 {@link AmazonS3 client}.
+   */
+  public S3OutputStream(String bucket, String key, AmazonS3 s3) {
+    this.s3 = s3;
+    this.bucket = bucket;
+    this.key = key;
+    this.partSize = 1024;
+    this.closed = false;
+    this.buffer = ByteBuffer.allocate(this.partSize);
+    this.progressListener = new ConnectProgressListener();
+    this.multiPartUpload = null;
+    log.debug("Create S3OutputStream for bucket '{}' key '{}'", bucket, key);
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    buffer.put((byte) b);
+    if (!buffer.hasRemaining()) {
+      uploadPart();
+    }
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    if (b == null) {
+      throw new NullPointerException();
+    } else if (off < 0 || off > b.length || len < 0 || (off + len) > b.length || (off + len) < 0) {
+      throw new IndexOutOfBoundsException();
+    } else if (len == 0) {
+      return;
+    }
+
+    if (buffer.remaining() < len) {
+      int firstPart = buffer.remaining();
+      buffer.put(b, off, firstPart);
+      uploadPart();
+      write(b, off + firstPart, len - firstPart);
+    } else {
+      buffer.put(b, off, len);
+    }
+  }
+
+  private void uploadPart() throws IOException {
+    uploadPart(partSize);
+    buffer.clear();
+  }
+
+  private void uploadPart(int size) throws IOException {
+    if (multiPartUpload == null) {
+      log.debug("New multi-part upload for bucket '{}' key '{}'", bucket, key);
+      multiPartUpload = newMultipartUpload();
+    }
+
+    try {
+      multiPartUpload.uploadPart(new ByteArrayInputStream(buffer.array()), size);
+    } catch (Exception e) {
+      if (multiPartUpload != null) {
+        multiPartUpload.abort();
+        log.debug("Multipart upload aborted for bucket '{}' key '{}'.", bucket, key);
+      }
+      throw new IOException("Part upload failed: ", e.getCause());
+    }
+  }
+
+  public void commit() throws IOException {
+    if (closed) {
+      log.warn("Tried to commit data for bucket '{}' key '{}' on a closed stream. Ignoring.", bucket, key);
+      return;
+    }
+
+    try {
+      if (buffer.hasRemaining()) {
+        uploadPart(buffer.position());
+      }
+      multiPartUpload.complete();
+      log.debug("Upload complete for bucket '{}' key '{}'", bucket, key);
+    } catch (Exception e) {
+      log.error("Multipart upload failed to complete for bucket '{}' key '{}'", bucket, key);
+      throw new RuntimeException("Multipart upload failed to complete.", e);
+    } finally {
+      buffer.clear();
+      multiPartUpload = null;
+      close();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    if (multiPartUpload != null) {
+      multiPartUpload.abort();
+      log.debug("Multipart upload aborted for bucket '{}' key '{}'.", bucket, key);
+    }
+    super.close();
+  }
+
+  private MultipartUpload newMultipartUpload() throws IOException {
+    InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucket, key, new ObjectMetadata());
+    try {
+      return new MultipartUpload(s3.initiateMultipartUpload(initRequest).getUploadId());
+    } catch (AmazonClientException e) {
+      throw new IOException("Unable to initiate MultipartUpload: " + e, e);
+    }
+  }
+
+  private class MultipartUpload {
+    private final String uploadId;
+    private final List<PartETag> partETags;
+
+    public MultipartUpload(String uploadId) {
+      this.uploadId = uploadId;
+      this.partETags = new ArrayList<>();
+      log.debug("Initiated multi-part upload for bucket '{}' key '{}' with id '{}'", bucket, key, uploadId);
+    }
+
+    public void uploadPart(ByteArrayInputStream inputStream, int partSize) {
+      int currentPartNumber = partETags.size() + 1;
+      UploadPartRequest request = new UploadPartRequest()
+                                            .withBucketName(bucket)
+                                            .withKey(key)
+                                            .withUploadId(uploadId)
+                                            .withInputStream(inputStream)
+                                            .withPartNumber(currentPartNumber)
+                                            .withPartSize(partSize)
+                                            .withGeneralProgressListener(progressListener);
+      log.debug("Uploading part {} for id '{}'", currentPartNumber, uploadId);
+      partETags.add(s3.uploadPart(request).getPartETag());
+    }
+
+    public void complete() {
+      log.debug("Completing multi-part upload for key '{}', id '{}'", key, uploadId);
+      CompleteMultipartUploadRequest completeRequest =
+          new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags);
+      s3.completeMultipartUpload(completeRequest);
+    }
+
+    public void abort() {
+      log.warn("Aborting multi-part upload with id '{}'", uploadId);
+      try {
+        s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId));
+      } catch (Exception e) {
+        // ignoring failure on abort.
+        log.warn("Unable to abort multipart upload, you may need to purge uploaded parts: ", e);
+      }
+    }
+  }
+
+  // Dummy listener for now, just logs the event progress.
+  private static class ConnectProgressListener implements ProgressListener {
+    @Override
+  public void progressChanged(ProgressEvent progressEvent) {
+      log.debug("Progress event: " + progressEvent);
+    }
+  }
+}
\ No newline at end of file
diff --git a/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3WritableConfiguration.java b/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3WritableConfiguration.java
new file mode 100644
index 0000000..4ab62c9
--- /dev/null
+++ b/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3WritableConfiguration.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.cache.s3;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.nifi.minifi.c2.api.ConfigurationProviderException;
+import org.apache.nifi.minifi.c2.api.cache.WriteableConfiguration;
+
+public class S3WritableConfiguration implements WriteableConfiguration {
+
+  private AmazonS3 s3;
+  private final S3Object s3Object;
+  private final String version;
+
+  /**
+   * Creates a new S3 writable configuration.
+   * @param s3 An S3 {@link AmazonS3 client}.
+   * @param s3ObjectSummary The S3 object {@link S3ObjectSummary summary}.
+   * @param version The version of the configuration.
+   */
+  public S3WritableConfiguration(AmazonS3 s3, S3ObjectSummary s3ObjectSummary, String version) {
+
+    this.s3 = s3;
+    this.s3Object = s3.getObject(s3ObjectSummary.getBucketName(), s3ObjectSummary.getKey());
+    this.version = version;
+
+  }
+
+  /**
+   * Creates a new S3 writable configuration.
+   * @param s3 An S3 {@link AmazonS3 client}.
+   * @param s3Object The S3 {@link S3Object object}.
+   * @param version The version of the configuration.
+   */
+  public S3WritableConfiguration(AmazonS3 s3, S3Object s3Object, String version) {
+
+    this.s3 = s3;
+    this.s3Object = s3Object;
+    this.version = version;
+
+  }
+
+  @Override
+  public String getVersion() {
+    return version;
+  }
+
+  @Override
+  public boolean exists() {
+    return s3.doesObjectExist(s3Object.getBucketName(), s3Object.getKey());
+  }
+
+  @Override
+  public OutputStream getOutputStream() throws ConfigurationProviderException {
+    return new S3OutputStream(s3Object.getBucketName(), s3Object.getKey(), s3);
+  }
+
+  @Override
+  public InputStream getInputStream() throws ConfigurationProviderException {
+    return s3Object.getObjectContent();
+  }
+
+  @Override
+  public String getName() {
+    return s3Object.getKey();
+  }
+
+  @Override
+  public String toString() {
+    return "FileSystemWritableConfiguration{objectKey=" + s3Object.getKey()
+      + ", version='" + version + "'}";
+  }
+
+}
diff --git a/minifi-c2/minifi-c2-cache/pom.xml b/minifi-c2/minifi-c2-cache/pom.xml
index adb7674..c0b0b06 100644
--- a/minifi-c2/minifi-c2-cache/pom.xml
+++ b/minifi-c2/minifi-c2-cache/pom.xml
@@ -27,5 +27,6 @@
 
     <modules>
         <module>minifi-c2-cache-filesystem</module>
+	<module>minifi-c2-cache-s3</module>
     </modules>
 </project>
diff --git a/pom.xml b/pom.xml
index a07f995..ccb399f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -106,6 +106,7 @@
         <spring.version>4.2.4.RELEASE</spring.version>
         <spring.security.version>4.0.3.RELEASE</spring.security.version>
         <system.rules.version>1.16.1</system.rules.version>
+        <aws.sdk.version>1.11.172</aws.sdk.version>
     </properties>
 
     <dependencies>