NIFIREG-216 - S3BundlePersistenceProvider
- Setup AWS extensions and add S3BundlePersistenceProvider
- Refactor BundlePersistenceProvider API and updated implementations accordingly
- Include AWS extensions by default
diff --git a/NOTICE b/NOTICE
index 8409aae..60e3cf8 100644
--- a/NOTICE
+++ b/NOTICE
@@ -1,13 +1,13 @@
Apache NiFi Registry
-Copyright 2014-2018 The Apache Software Foundation
+Copyright 2014-2019 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
This includes derived works from the Apache NiFi (ASLv2 licensed) project (https://git-wip-us.apache.org/repos/asf?p=nifi.git):
- Copyright 2015-2018 The Apache Software Foundation
+ Copyright 2015-2019 The Apache Software Foundation
This includes sources for bootstrapping, runtime, component API, security/authorization API
This includes derived works from the Apache NiFi MiNiFi (ASLv2 licensed) project (https://git-wip-us.apache.org/repos/asf/nifi-minifi.git):
- Copyright 2015-2018 The Apache Software Foundation
+ Copyright 2015-2019 The Apache Software Foundation
This includes build configuration for the Swagger UI.
diff --git a/nifi-registry-assembly/pom.xml b/nifi-registry-assembly/pom.xml
index 34892b3..e0d3b57 100644
--- a/nifi-registry-assembly/pom.xml
+++ b/nifi-registry-assembly/pom.xml
@@ -165,6 +165,7 @@
<!-- nifi-registry.properties: extension properties -->
<nifi.registry.extensions.working.directory>./work/extensions</nifi.registry.extensions.working.directory>
+ <nifi.registry.extension.dir.aws></nifi.registry.extension.dir.aws>
<!-- nifi-registry.properties: legacy database properties, used to migrate data from old DB to the new DB below -->
<nifi.registry.db.directory />
@@ -423,5 +424,48 @@
</plugins>
</build>
</profile>
+ <profile>
+ <id>include-aws</id>
+ <activation>
+ <property>
+ <name>!skipAws</name>
+ </property>
+ </activation>
+ <properties>
+ <nifi.registry.extension.dir.aws>./ext/aws/lib</nifi.registry.extension.dir.aws>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi.registry</groupId>
+ <artifactId>nifi-registry-aws-assembly</artifactId>
+ <version>0.4.0-SNAPSHOT</version>
+ <classifier>bin</classifier>
+ <scope>runtime</scope>
+ <type>zip</type>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>unpack-aws-extensions</id>
+ <goals>
+ <goal>unpack-dependencies</goal>
+ </goals>
+ <phase>generate-resources</phase>
+ <configuration>
+ <outputDirectory>${project.build.directory}/ext/aws</outputDirectory>
+ <includeGroupIds>org.apache.nifi.registry</includeGroupIds>
+ <includeArtifactIds>nifi-registry-aws-assembly</includeArtifactIds>
+ <excludeTransitive>false</excludeTransitive>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
</profiles>
</project>
\ No newline at end of file
diff --git a/nifi-registry-assembly/src/main/assembly/dependencies.xml b/nifi-registry-assembly/src/main/assembly/dependencies.xml
index c41cc26..d7985ac 100644
--- a/nifi-registry-assembly/src/main/assembly/dependencies.xml
+++ b/nifi-registry-assembly/src/main/assembly/dependencies.xml
@@ -67,6 +67,7 @@
<exclude>nifi-registry-utils</exclude>
<exclude>nifi-registry-docs</exclude>
<exclude>nifi-registry-ranger-assembly</exclude>
+ <exclude>nifi-registry-aws-assembly</exclude>
</excludes>
</dependencySet>
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
index f031224..acd7705 100644
--- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
@@ -25,6 +25,7 @@
import org.apache.nifi.registry.provider.generated.Providers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -51,7 +52,7 @@
* Standard implementation of ProviderFactory.
*/
@Configuration
-public class StandardProviderFactory implements ProviderFactory {
+public class StandardProviderFactory implements ProviderFactory, DisposableBean {
private static final Logger LOGGER = LoggerFactory.getLogger(StandardProviderFactory.class);
@@ -239,12 +240,29 @@
final ProviderConfigurationContext configurationContext = createConfigurationContext(jaxbExtensionBundleProvider.getProperty());
bundlePersistenceProvider.onConfigured(configurationContext);
- LOGGER.info("Configured FlowPersistenceProvider with class name {}", new Object[] {extensionBundleProviderClassName});
+ LOGGER.info("Configured BundlePersistenceProvider with class name {}", new Object[] {extensionBundleProviderClassName});
}
return bundlePersistenceProvider;
}
+ @Override
+ public void destroy() throws Exception {
+ final List<Provider> providers = new ArrayList<>(eventHookProviders);
+ providers.add(flowPersistenceProvider);
+ providers.add(bundlePersistenceProvider);
+
+ for (final Provider provider : providers) {
+ if (provider != null) {
+ try {
+ provider.preDestruction();
+ } catch (Throwable t) {
+ LOGGER.error(t.getMessage(), t);
+ }
+ }
+ }
+ }
+
private ProviderConfigurationContext createConfigurationContext(final List<Property> configProperties) {
final Map<String,String> properties = new HashMap<>();
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/extension/FileSystemBundlePersistenceProvider.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/extension/FileSystemBundlePersistenceProvider.java
index fae453e..555260a 100644
--- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/extension/FileSystemBundlePersistenceProvider.java
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/extension/FileSystemBundlePersistenceProvider.java
@@ -18,9 +18,12 @@
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.registry.extension.BundleContext;
+import org.apache.nifi.registry.extension.BundleCoordinate;
+import org.apache.nifi.registry.extension.BundlePersistenceContext;
import org.apache.nifi.registry.extension.BundlePersistenceException;
import org.apache.nifi.registry.extension.BundlePersistenceProvider;
+import org.apache.nifi.registry.extension.BundleVersionCoordinate;
+import org.apache.nifi.registry.extension.BundleVersionType;
import org.apache.nifi.registry.flow.FlowPersistenceException;
import org.apache.nifi.registry.provider.ProviderConfigurationContext;
import org.apache.nifi.registry.provider.ProviderCreationException;
@@ -76,11 +79,20 @@
}
@Override
- public synchronized void saveBundleVersion(final BundleContext context, final InputStream contentStream, boolean overwrite)
+ public synchronized void createBundleVersion(final BundlePersistenceContext context, final InputStream contentStream)
throws BundlePersistenceException {
+ saveOrUpdateBundleVersion(context, contentStream, false);
+ }
- final File bundleVersionDir = getBundleVersionDirectory(bundleStorageDir, context.getBucketName(),
- context.getBundleGroupId(), context.getBundleArtifactId(), context.getBundleVersion());
+ @Override
+ public synchronized void updateBundleVersion(final BundlePersistenceContext context, final InputStream contentStream) throws BundlePersistenceException {
+ saveOrUpdateBundleVersion(context, contentStream, true);
+ }
+
+ private synchronized void saveOrUpdateBundleVersion(final BundlePersistenceContext context, final InputStream contentStream,
+ final boolean overwrite) throws BundlePersistenceException {
+ final BundleVersionCoordinate versionCoordinate = context.getCoordinate();
+ final File bundleVersionDir = getBundleVersionDirectory(bundleStorageDir, versionCoordinate);
try {
FileUtils.ensureDirectoryExistAndCanReadAndWrite(bundleVersionDir);
} catch (IOException e) {
@@ -88,12 +100,10 @@
+ bundleVersionDir.getAbsolutePath(), e);
}
- final File bundleFile = getBundleFile(bundleVersionDir, context.getBundleArtifactId(),
- context.getBundleVersion(), context.getBundleType());
-
+ final File bundleFile = getBundleFile(bundleVersionDir, versionCoordinate);
if (bundleFile.exists() && !overwrite) {
- throw new BundlePersistenceException("Unable to save because an extension bundle already exists at "
- + bundleFile.getAbsolutePath());
+ final String existingPath = bundleFile.getAbsolutePath();
+ throw new BundlePersistenceException("Unable to save because a bundle versions already exists at " + existingPath);
}
if (LOGGER.isDebugEnabled()) {
@@ -109,15 +119,10 @@
}
@Override
- public synchronized void getBundleVersion(final BundleContext context, final OutputStream outputStream)
+ public synchronized void getBundleVersionContent(final BundleVersionCoordinate versionCoordinate, final OutputStream outputStream)
throws BundlePersistenceException {
- final File bundleVersionDir = getBundleVersionDirectory(bundleStorageDir, context.getBucketName(),
- context.getBundleGroupId(), context.getBundleArtifactId(), context.getBundleVersion());
-
- final File bundleFile = getBundleFile(bundleVersionDir, context.getBundleArtifactId(),
- context.getBundleVersion(), context.getBundleType());
-
+ final File bundleFile = getBundleFile(versionCoordinate);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Reading extension bundle from {}", new Object[]{bundleFile.getAbsolutePath()});
}
@@ -134,13 +139,8 @@
}
@Override
- public synchronized void deleteBundleVersion(final BundleContext context) throws BundlePersistenceException {
- final File bundleVersionDir = getBundleVersionDirectory(bundleStorageDir, context.getBucketName(),
- context.getBundleGroupId(), context.getBundleArtifactId(), context.getBundleVersion());
-
- final File bundleFile = getBundleFile(bundleVersionDir, context.getBundleArtifactId(),
- context.getBundleVersion(), context.getBundleType());
-
+ public synchronized void deleteBundleVersion(final BundleVersionCoordinate versionCoordinate) throws BundlePersistenceException {
+ final File bundleFile = getBundleFile(versionCoordinate);
if (!bundleFile.exists()) {
LOGGER.warn("Extension bundle content does not exist at {}", new Object[] {bundleFile.getAbsolutePath()});
return;
@@ -157,10 +157,8 @@
}
@Override
- public synchronized void deleteAllBundleVersions(final String bucketId, final String bucketName, final String groupId, final String artifactId)
- throws BundlePersistenceException {
-
- final File bundleDir = getBundleDirectory(bundleStorageDir, bucketName, groupId, artifactId);
+ public synchronized void deleteAllBundleVersions(final BundleCoordinate bundleCoordinate) throws BundlePersistenceException {
+ final File bundleDir = getBundleDirectory(bundleStorageDir, bundleCoordinate);
if (!bundleDir.exists()) {
LOGGER.warn("Extension bundle directory does not exist at {}", new Object[] {bundleDir.getAbsolutePath()});
return;
@@ -199,15 +197,34 @@
}
}
- static File getBundleDirectory(final File bundleStorageDir, final String bucketName, final String groupId, final String artifactId) {
- return new File(bundleStorageDir, sanitize(bucketName) + "/" + sanitize(groupId) + "/" + sanitize(artifactId));
+ private File getBundleFile(final BundleVersionCoordinate coordinate) {
+ final File bundleVersionDir = getBundleVersionDirectory(bundleStorageDir, coordinate);
+ return getBundleFile(bundleVersionDir, coordinate);
}
- static File getBundleVersionDirectory(final File bundleStorageDir, final String bucketName, final String groupId, final String artifactId, final String version) {
- return new File(bundleStorageDir, sanitize(bucketName) + "/" + sanitize(groupId) + "/" + sanitize(artifactId) + "/" + sanitize(version));
+ static File getBundleDirectory(final File bundleStorageDir, final BundleCoordinate bundleCoordinate) {
+ final String bucketId = bundleCoordinate.getBucketId();
+ final String groupId = bundleCoordinate.getGroupId();
+ final String artifactId = bundleCoordinate.getArtifactId();
+
+ return new File(bundleStorageDir, sanitize(bucketId) + "/" + sanitize(groupId) + "/" + sanitize(artifactId));
}
- static File getBundleFile(final File parentDir, final String artifactId, final String version, final BundleContext.BundleType bundleType) {
+ static File getBundleVersionDirectory(final File bundleStorageDir, final BundleVersionCoordinate versionCoordinate) {
+ final String bucketId = versionCoordinate.getBucketId();
+ final String groupId = versionCoordinate.getGroupId();
+ final String artifactId = versionCoordinate.getArtifactId();
+ final String version = versionCoordinate.getVersion();
+
+ return new File(bundleStorageDir, sanitize(bucketId) + "/" + sanitize(groupId) + "/" + sanitize(artifactId) + "/" + sanitize(version));
+ }
+
+ static File getBundleFile(final File parentDir, final BundleVersionCoordinate versionCoordinate) {
+ final String artifactId = versionCoordinate.getArtifactId();
+ final String version = versionCoordinate.getVersion();
+ final BundleVersionType bundleType = versionCoordinate.getType();
+
+
final String bundleFileExtension = getBundleFileExtension(bundleType);
final String bundleFilename = sanitize(artifactId) + "-" + sanitize(version) + bundleFileExtension;
return new File(parentDir, bundleFilename);
@@ -217,14 +234,15 @@
return FileUtils.sanitizeFilename(input).trim().toLowerCase();
}
- static String getBundleFileExtension(final BundleContext.BundleType bundleType) {
+ static String getBundleFileExtension(final BundleVersionType bundleType) {
switch (bundleType) {
case NIFI_NAR:
return NAR_EXTENSION;
case MINIFI_CPP:
return CPP_EXTENSION;
default:
- throw new IllegalArgumentException("Unknown bundle type: " + bundleType);
+ LOGGER.warn("Unknown bundle type: " + bundleType);
+ return "";
}
}
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/extension/StandardBundleContext.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/extension/StandardBundleContext.java
deleted file mode 100644
index 56122a0..0000000
--- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/extension/StandardBundleContext.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.registry.provider.extension;
-
-import org.apache.commons.lang3.Validate;
-import org.apache.nifi.registry.extension.BundleContext;
-
-public class StandardBundleContext implements BundleContext {
-
- private final BundleType bundleType;
- private final String bucketId;
- private final String bucketName;
- private final String bundleId;
- private final String bundleGroupId;
- private final String bundleArtifactId;
- private final String bundleVersion;
- private final String description;
- private final String author;
- private final long timestamp;
-
- private StandardBundleContext(final Builder builder) {
- this.bundleType = builder.bundleType;
- this.bucketId = builder.bucketId;
- this.bucketName = builder.bucketName;
- this.bundleId = builder.bundleId;
- this.bundleGroupId = builder.bundleGroupId;
- this.bundleArtifactId = builder.bundleArtifactId;
- this.bundleVersion = builder.bundleVersion;
- this.description = builder.description;
- this.author = builder.author;
- this.timestamp = builder.timestamp;
- Validate.notNull(this.bundleType);
- Validate.notBlank(this.bucketId);
- Validate.notBlank(this.bucketName);
- Validate.notBlank(this.bundleId);
- Validate.notBlank(this.bundleGroupId);
- Validate.notBlank(this.bundleArtifactId);
- Validate.notBlank(this.bundleVersion);
- Validate.notBlank(this.author);
- }
-
-
- @Override
- public BundleType getBundleType() {
- return bundleType;
- }
-
- @Override
- public String getBucketId() {
- return bucketId;
- }
-
- @Override
- public String getBucketName() {
- return bucketName;
- }
-
- @Override
- public String getBundleId() {
- return bundleId;
- }
-
- @Override
- public String getBundleGroupId() {
- return bundleGroupId;
- }
-
- @Override
- public String getBundleArtifactId() {
- return bundleArtifactId;
- }
-
- @Override
- public String getBundleVersion() {
- return bundleVersion;
- }
-
- @Override
- public String getDescription() {
- return description;
- }
-
- @Override
- public long getTimestamp() {
- return timestamp;
- }
-
- @Override
- public String getAuthor() {
- return author;
- }
-
- public static class Builder {
-
- private BundleType bundleType;
- private String bucketId;
- private String bucketName;
- private String bundleId;
- private String bundleGroupId;
- private String bundleArtifactId;
- private String bundleVersion;
- private String description;
- private String author;
- private long timestamp;
-
- public Builder bundleType(final BundleType bundleType) {
- this.bundleType = bundleType;
- return this;
- }
-
- public Builder bucketId(final String bucketId) {
- this.bucketId = bucketId;
- return this;
- }
-
- public Builder bucketName(final String bucketName) {
- this.bucketName = bucketName;
- return this;
- }
-
- public Builder bundleId(final String bundleId) {
- this.bundleId = bundleId;
- return this;
- }
-
- public Builder bundleGroupId(final String bundleGroupId) {
- this.bundleGroupId = bundleGroupId;
- return this;
- }
-
- public Builder bundleArtifactId(final String bundleArtifactId) {
- this.bundleArtifactId = bundleArtifactId;
- return this;
- }
-
- public Builder bundleVersion(final String bundleVersion) {
- this.bundleVersion = bundleVersion;
- return this;
- }
-
- public Builder description(final String description) {
- this.description = description;
- return this;
- }
-
- public Builder author(final String author) {
- this.author = author;
- return this;
- }
-
- public Builder timestamp(final long timestamp) {
- this.timestamp = timestamp;
- return this;
- }
-
- public StandardBundleContext build() {
- return new StandardBundleContext(this);
- }
-
- }
-
-}
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/extension/StandardBundleCoordinate.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/extension/StandardBundleCoordinate.java
new file mode 100644
index 0000000..c05cbdf
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/extension/StandardBundleCoordinate.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.extension;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.nifi.registry.extension.BundleCoordinate;
+
+import java.util.Objects;
+
+public class StandardBundleCoordinate implements BundleCoordinate {
+
+ private final String bucketId;
+ private final String groupId;
+ private final String artifactId;
+
+ private StandardBundleCoordinate(final Builder builder) {
+ this.bucketId = builder.bucketId;
+ this.groupId = builder.groupId;
+ this.artifactId = builder.artifactId;
+ Validate.notBlank(this.bucketId, "Bucket Id is required");
+ Validate.notBlank(this.groupId, "Group Id is required");
+ Validate.notBlank(this.artifactId, "Artifact Id is required");
+ }
+
+ @Override
+ public String getBucketId() {
+ return bucketId;
+ }
+
+ @Override
+ public String getGroupId() {
+ return groupId;
+ }
+
+ @Override
+ public String getArtifactId() {
+ return artifactId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ final StandardBundleCoordinate that = (StandardBundleCoordinate) o;
+ return bucketId.equals(that.bucketId)
+ && groupId.equals(that.groupId)
+ && artifactId.equals(that.artifactId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(bucketId, groupId, artifactId);
+ }
+
+ public static class Builder {
+
+ private String bucketId;
+ private String groupId;
+ private String artifactId;
+
+ public Builder bucketId(final String bucketId) {
+ this.bucketId = bucketId;
+ return this;
+ }
+
+ public Builder groupId(final String groupId) {
+ this.groupId = groupId;
+ return this;
+ }
+
+ public Builder artifactId(final String artifactId) {
+ this.artifactId = artifactId;
+ return this;
+ }
+
+
+ public StandardBundleCoordinate build() {
+ return new StandardBundleCoordinate(this);
+ }
+
+ }
+}
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/extension/StandardBundlePersistenceContext.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/extension/StandardBundlePersistenceContext.java
new file mode 100644
index 0000000..249be19
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/extension/StandardBundlePersistenceContext.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.extension;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.nifi.registry.extension.BundlePersistenceContext;
+import org.apache.nifi.registry.extension.BundleVersionCoordinate;
+
+public class StandardBundlePersistenceContext implements BundlePersistenceContext {
+
+ private final BundleVersionCoordinate coordinate;
+ private final String author;
+ private final long timestamp;
+ private final long bundleSize;
+
+ private StandardBundlePersistenceContext(final Builder builder) {
+ this.coordinate = builder.coordinate;
+ this.bundleSize = builder.bundleSize;
+ this.author = builder.author;
+ this.timestamp = builder.timestamp;
+ Validate.notNull(this.coordinate);
+ Validate.notBlank(this.author);
+ }
+
+
+ @Override
+ public BundleVersionCoordinate getCoordinate() {
+ return coordinate;
+ }
+
+ @Override
+ public long getSize() {
+ return bundleSize;
+ }
+
+ @Override
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public String getAuthor() {
+ return author;
+ }
+
+ public static class Builder {
+
+ private BundleVersionCoordinate coordinate;
+ private String author;
+ private long timestamp;
+ private long bundleSize;
+
+ public Builder coordinate(final BundleVersionCoordinate identifier) {
+ this.coordinate = identifier;
+ return this;
+ }
+
+ public Builder author(final String author) {
+ this.author = author;
+ return this;
+ }
+
+ public Builder timestamp(final long timestamp) {
+ this.timestamp = timestamp;
+ return this;
+ }
+
+ public Builder bundleSize(final long size) {
+ this.bundleSize = size;
+ return this;
+ }
+
+ public StandardBundlePersistenceContext build() {
+ return new StandardBundlePersistenceContext(this);
+ }
+
+ }
+
+}
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/extension/StandardBundleVersionCoordinate.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/extension/StandardBundleVersionCoordinate.java
new file mode 100644
index 0000000..c51ae1b
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/extension/StandardBundleVersionCoordinate.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.extension;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.nifi.registry.extension.BundleVersionCoordinate;
+import org.apache.nifi.registry.extension.BundleVersionType;
+
+import java.util.Objects;
+
+public class StandardBundleVersionCoordinate implements BundleVersionCoordinate {
+
+ private final String bucketId;
+ private final String groupId;
+ private final String artifactId;
+ private final String version;
+ private final BundleVersionType type;
+
+ private StandardBundleVersionCoordinate(final Builder builder) {
+ this.bucketId = builder.bucketId;
+ this.groupId = builder.groupId;
+ this.artifactId = builder.artifactId;
+ this.version = builder.version;
+ this.type = builder.type;
+ Validate.notBlank(this.bucketId, "Bucket Id is required");
+ Validate.notBlank(this.groupId, "Group Id is required");
+ Validate.notBlank(this.artifactId, "Artifact Id is required");
+ Validate.notBlank(this.version, "Version is required");
+ Validate.notNull(this.type, "BundleVersionType is required");
+ }
+
+ @Override
+ public String getBucketId() {
+ return bucketId;
+ }
+
+ @Override
+ public String getGroupId() {
+ return groupId;
+ }
+
+ @Override
+ public String getArtifactId() {
+ return artifactId;
+ }
+
+ @Override
+ public String getVersion() {
+ return version;
+ }
+
+ @Override
+ public BundleVersionType getType() {
+ return type;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ final StandardBundleVersionCoordinate that = (StandardBundleVersionCoordinate) o;
+ return bucketId.equals(that.bucketId)
+ && groupId.equals(that.groupId)
+ && artifactId.equals(that.artifactId)
+ && version.equals(that.version)
+ && type == that.type;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(bucketId, groupId, artifactId, version, type);
+ }
+
+ @Override
+ public String toString() {
+ return "BundleVersionCoordinate [" +
+ "bucketId='" + bucketId + '\'' +
+ ", groupId='" + groupId + '\'' +
+ ", artifactId='" + artifactId + '\'' +
+ ", version='" + version + '\'' +
+ ", type=" + type +
+ ']';
+ }
+
+ public static class Builder {
+
+ private String bucketId;
+ private String groupId;
+ private String artifactId;
+ private String version;
+ private BundleVersionType type;
+
+ public Builder bucketId(final String bucketId) {
+ this.bucketId = bucketId;
+ return this;
+ }
+
+ public Builder groupId(final String groupId) {
+ this.groupId = groupId;
+ return this;
+ }
+
+ public Builder artifactId(final String artifactId) {
+ this.artifactId = artifactId;
+ return this;
+ }
+
+ public Builder version(final String version) {
+ this.version = version;
+ return this;
+ }
+
+ public Builder type(final BundleVersionType type) {
+ this.type = type;
+ return this;
+ }
+
+ public StandardBundleVersionCoordinate build() {
+ return new StandardBundleVersionCoordinate(this);
+ }
+
+ }
+}
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java
index bbf647b..606fca2 100644
--- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java
@@ -29,6 +29,8 @@
import org.apache.nifi.registry.diff.ComponentDifferenceGroup;
import org.apache.nifi.registry.diff.VersionedFlowDifference;
import org.apache.nifi.registry.exception.ResourceNotFoundException;
+import org.apache.nifi.registry.extension.BundleCoordinate;
+import org.apache.nifi.registry.extension.BundlePersistenceProvider;
import org.apache.nifi.registry.extension.bundle.Bundle;
import org.apache.nifi.registry.extension.bundle.BundleFilterParams;
import org.apache.nifi.registry.extension.bundle.BundleType;
@@ -58,6 +60,7 @@
import org.apache.nifi.registry.flow.diff.FlowDifference;
import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow;
import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
+import org.apache.nifi.registry.provider.extension.StandardBundleCoordinate;
import org.apache.nifi.registry.provider.flow.StandardFlowSnapshotContext;
import org.apache.nifi.registry.serialization.Serializer;
import org.apache.nifi.registry.service.alias.RegistryUrlAliasService;
@@ -107,6 +110,7 @@
private final MetadataService metadataService;
private final FlowPersistenceProvider flowPersistenceProvider;
+ private final BundlePersistenceProvider bundlePersistenceProvider;
private final Serializer<VersionedProcessGroup> processGroupSerializer;
private final ExtensionService extensionService;
private final Validator validator;
@@ -119,12 +123,14 @@
@Autowired
public RegistryService(final MetadataService metadataService,
final FlowPersistenceProvider flowPersistenceProvider,
+ final BundlePersistenceProvider bundlePersistenceProvider,
final Serializer<VersionedProcessGroup> processGroupSerializer,
final ExtensionService extensionService,
final Validator validator,
final RegistryUrlAliasService registryUrlAliasService) {
this.metadataService = Validate.notNull(metadataService);
this.flowPersistenceProvider = Validate.notNull(flowPersistenceProvider);
+ this.bundlePersistenceProvider = Validate.notNull(bundlePersistenceProvider);
this.processGroupSerializer = Validate.notNull(processGroupSerializer);
this.extensionService = Validate.notNull(extensionService);
this.validator = Validate.notNull(validator);
@@ -302,6 +308,16 @@
flowPersistenceProvider.deleteAllFlowContent(bucketIdentifier, flowEntity.getId());
}
+ // for each bundle in the bucket, delete all versions from the bundle persistence provider
+ for (final BundleEntity bundleEntity : metadataService.getBundlesByBucket(existingBucket.getId())) {
+ final BundleCoordinate bundleCoordinate = new StandardBundleCoordinate.Builder()
+ .bucketId(bundleEntity.getBucketId())
+ .groupId(bundleEntity.getGroupId())
+ .artifactId(bundleEntity.getArtifactId())
+ .build();
+ bundlePersistenceProvider.deleteAllBundleVersions(bundleCoordinate);
+ }
+
// now delete the bucket from the metadata provider, which deletes all flows referencing it
metadataService.deleteBucket(existingBucket);
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/extension/StandardExtensionService.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/extension/StandardExtensionService.java
index c7cca08..0dddbfc 100644
--- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/extension/StandardExtensionService.java
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/extension/StandardExtensionService.java
@@ -32,8 +32,11 @@
import org.apache.nifi.registry.db.entity.ExtensionAdditionalDetailsEntity;
import org.apache.nifi.registry.db.entity.ExtensionEntity;
import org.apache.nifi.registry.exception.ResourceNotFoundException;
-import org.apache.nifi.registry.extension.BundleContext;
+import org.apache.nifi.registry.extension.BundleCoordinate;
+import org.apache.nifi.registry.extension.BundlePersistenceContext;
import org.apache.nifi.registry.extension.BundlePersistenceProvider;
+import org.apache.nifi.registry.extension.BundleVersionCoordinate;
+import org.apache.nifi.registry.extension.BundleVersionType;
import org.apache.nifi.registry.extension.bundle.BuildInfo;
import org.apache.nifi.registry.extension.bundle.Bundle;
import org.apache.nifi.registry.extension.bundle.BundleFilterParams;
@@ -52,7 +55,9 @@
import org.apache.nifi.registry.extension.repo.ExtensionRepoGroup;
import org.apache.nifi.registry.extension.repo.ExtensionRepoVersionSummary;
import org.apache.nifi.registry.properties.NiFiRegistryProperties;
-import org.apache.nifi.registry.provider.extension.StandardBundleContext;
+import org.apache.nifi.registry.provider.extension.StandardBundleCoordinate;
+import org.apache.nifi.registry.provider.extension.StandardBundlePersistenceContext;
+import org.apache.nifi.registry.provider.extension.StandardBundleVersionCoordinate;
import org.apache.nifi.registry.security.authorization.user.NiFiUserUtils;
import org.apache.nifi.registry.serialization.Serializer;
import org.apache.nifi.registry.service.MetadataService;
@@ -256,7 +261,7 @@
extensionEntities.forEach(e -> metadataService.createExtension(e));
// persist the content of the bundle to the persistence provider
- persistBundleVersionContent(bundleType, existingBucket, bundleEntity, versionEntity, extensionWorkingFile, overwriteBundleVersion);
+ persistBundleVersionContent(bundleType, bundleEntity, versionEntity, extensionWorkingFile, overwriteBundleVersion);
// get the updated extension bundle so it contains the correct version count
final BundleEntity updatedBundle = metadataService.getBundle(bucketIdentifier, groupId, artifactId);
@@ -378,35 +383,42 @@
return existingBundleEntity;
}
- private void persistBundleVersionContent(final BundleType bundleType, final BucketEntity bucket, final BundleEntity bundle,
- final BundleVersionEntity bundleVersion, final File extensionWorkingFile,
- final boolean overwriteBundleVersion) throws IOException {
- final BundleContext context = new StandardBundleContext.Builder()
- .bundleType(getProviderBundleType(bundleType))
- .bucketId(bucket.getId())
- .bucketName(bucket.getName())
- .bundleId(bundle.getId())
- .bundleGroupId(bundle.getGroupId())
- .bundleArtifactId(bundle.getArtifactId())
- .bundleVersion(bundleVersion.getVersion())
+ private void persistBundleVersionContent(final BundleType bundleType, final BundleEntity bundle, final BundleVersionEntity bundleVersion,
+ final File extensionWorkingFile, final boolean overwriteBundleVersion) throws IOException {
+
+ final BundleVersionCoordinate versionCoordinate = new StandardBundleVersionCoordinate.Builder()
+ .bucketId(bundle.getBucketId())
+ .groupId(bundle.getGroupId())
+ .artifactId(bundle.getArtifactId())
+ .version(bundleVersion.getVersion())
+ .type(getProviderBundleType(bundleType))
+ .build();
+
+ final BundlePersistenceContext context = new StandardBundlePersistenceContext.Builder()
+ .coordinate(versionCoordinate)
+ .bundleSize(bundleVersion.getContentSize())
.author(bundleVersion.getCreatedBy())
.timestamp(bundleVersion.getCreated().getTime())
.build();
try (final InputStream in = new FileInputStream(extensionWorkingFile);
final InputStream bufIn = new BufferedInputStream(in)) {
- bundlePersistenceProvider.saveBundleVersion(context, bufIn, overwriteBundleVersion);
- LOGGER.debug("Bundle saved to persistence provider - '{}' - '{}' - '{}'",
- new Object[]{context.getBundleGroupId(), context.getBundleArtifactId(), context.getBundleVersion()});
+ if (overwriteBundleVersion) {
+ bundlePersistenceProvider.updateBundleVersion(context, bufIn);
+ LOGGER.debug("Bundle version updated in persistence provider - {}", new Object[]{versionCoordinate.toString()});
+ } else {
+ bundlePersistenceProvider.createBundleVersion(context, bufIn);
+ LOGGER.debug("Bundle version created in persistence provider - {}", new Object[]{versionCoordinate.toString()});
+ }
}
}
- private BundleContext.BundleType getProviderBundleType(final BundleType bundleType) {
+ private BundleVersionType getProviderBundleType(final BundleType bundleType) {
switch (bundleType) {
case NIFI_NAR:
- return BundleContext.BundleType.NIFI_NAR;
+ return BundleVersionType.NIFI_NAR;
case MINIFI_CPP:
- return BundleContext.BundleType.MINIFI_CPP;
+ return BundleVersionType.MINIFI_CPP;
default:
throw new IllegalArgumentException("Unknown bundle type: " + bundleType.toString());
}
@@ -455,11 +467,13 @@
metadataService.deleteBundle(bundle.getIdentifier());
// delete all content associated with the bundle in the persistence provider
- bundlePersistenceProvider.deleteAllBundleVersions(
- bundle.getBucketIdentifier(),
- bundle.getBucketName(),
- bundle.getGroupId(),
- bundle.getArtifactId());
+ final BundleCoordinate bundleCoordinate = new StandardBundleCoordinate.Builder()
+ .bucketId(bundle.getBucketIdentifier())
+ .groupId(bundle.getGroupId())
+ .artifactId(bundle.getArtifactId())
+ .build();
+
+ bundlePersistenceProvider.deleteAllBundleVersions(bundleCoordinate);
return bundle;
}
@@ -603,8 +617,8 @@
@Override
public void writeBundleVersionContent(final BundleVersion bundleVersion, final OutputStream out) {
// get the content from the persistence provider and write it to the output stream
- final BundleContext context = getExtensionBundleContext(bundleVersion);
- bundlePersistenceProvider.getBundleVersion(context, out);
+ final BundleVersionCoordinate versionCoordinate = getVersionCoordinate(bundleVersion);
+ bundlePersistenceProvider.getBundleVersionContent(versionCoordinate, out);
}
@Override
@@ -618,19 +632,8 @@
metadataService.deleteBundleVersion(extensionBundleVersionId);
// delete content associated with the bundle version in the persistence provider
- final BundleContext context = new StandardBundleContext.Builder()
- .bundleType(getProviderBundleType(bundleVersion.getBundle().getBundleType()))
- .bucketId(bundleVersion.getBucket().getIdentifier())
- .bucketName(bundleVersion.getBucket().getName())
- .bundleId(bundleVersion.getBundle().getIdentifier())
- .bundleGroupId(bundleVersion.getBundle().getGroupId())
- .bundleArtifactId(bundleVersion.getBundle().getArtifactId())
- .bundleVersion(bundleVersion.getVersionMetadata().getVersion())
- .author(bundleVersion.getVersionMetadata().getAuthor())
- .timestamp(bundleVersion.getVersionMetadata().getTimestamp())
- .build();
-
- bundlePersistenceProvider.deleteBundleVersion(context);
+ final BundleVersionCoordinate versionCoordinate = getVersionCoordinate(bundleVersion);
+ bundlePersistenceProvider.deleteBundleVersion(versionCoordinate);
return bundleVersion;
}
@@ -921,23 +924,20 @@
// ------ Helper Methods -------
- private BundleContext getExtensionBundleContext(final BundleVersion bundleVersion) {
- return getExtensionBundleContext(bundleVersion.getBucket(), bundleVersion.getBundle(), bundleVersion.getVersionMetadata());
+ private BundleVersionCoordinate getVersionCoordinate(final BundleVersion bundleVersion) {
+ return getVersionCoordinate(bundleVersion.getBundle(), bundleVersion.getVersionMetadata());
}
- private BundleContext getExtensionBundleContext(final Bucket bucket, final Bundle bundle,
- final BundleVersionMetadata bundleVersionMetadata) {
- return new StandardBundleContext.Builder()
- .bundleType(getProviderBundleType(bundle.getBundleType()))
- .bucketId(bucket.getIdentifier())
- .bucketName(bucket.getName())
- .bundleId(bundle.getIdentifier())
- .bundleGroupId(bundle.getGroupId())
- .bundleArtifactId(bundle.getArtifactId())
- .bundleVersion(bundleVersionMetadata.getVersion())
- .author(bundleVersionMetadata.getAuthor())
- .timestamp(bundleVersionMetadata.getTimestamp())
+ private BundleVersionCoordinate getVersionCoordinate(final Bundle bundle, final BundleVersionMetadata bundleVersionMetadata) {
+ final BundleVersionCoordinate versionCoordinate = new StandardBundleVersionCoordinate.Builder()
+ .bucketId(bundle.getBucketIdentifier())
+ .groupId(bundle.getGroupId())
+ .artifactId(bundle.getArtifactId())
+ .version(bundleVersionMetadata.getVersion())
+ .type(getProviderBundleType(bundle.getBundleType()))
.build();
+
+ return versionCoordinate;
}
private BucketEntity getBucketEntity(final String bucketIdentifier) {
diff --git a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockBundlePersistenceProvider.java b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockBundlePersistenceProvider.java
index 1160841..ab1d0e4 100644
--- a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockBundlePersistenceProvider.java
+++ b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockBundlePersistenceProvider.java
@@ -16,9 +16,11 @@
*/
package org.apache.nifi.registry.provider;
-import org.apache.nifi.registry.extension.BundleContext;
+import org.apache.nifi.registry.extension.BundleCoordinate;
+import org.apache.nifi.registry.extension.BundlePersistenceContext;
import org.apache.nifi.registry.extension.BundlePersistenceException;
import org.apache.nifi.registry.extension.BundlePersistenceProvider;
+import org.apache.nifi.registry.extension.BundleVersionCoordinate;
import java.io.InputStream;
import java.io.OutputStream;
@@ -29,23 +31,27 @@
private Map<String,String> properties;
@Override
- public void saveBundleVersion(BundleContext context, InputStream contentStream, boolean overwrite)
- throws BundlePersistenceException {
+ public void createBundleVersion(BundlePersistenceContext context, InputStream contentStream) throws BundlePersistenceException {
}
@Override
- public void getBundleVersion(BundleContext context, OutputStream outputStream) throws BundlePersistenceException {
+ public void updateBundleVersion(BundlePersistenceContext context, InputStream contentStream) throws BundlePersistenceException {
}
@Override
- public void deleteBundleVersion(BundleContext context) throws BundlePersistenceException {
+ public void getBundleVersionContent(BundleVersionCoordinate versionCoordinate, OutputStream outputStream) throws BundlePersistenceException {
}
@Override
- public void deleteAllBundleVersions(String bucketId, String bucketName, String groupId, String artifactId) throws BundlePersistenceException {
+ public void deleteBundleVersion(BundleVersionCoordinate versionCoordinate) throws BundlePersistenceException {
+
+ }
+
+ @Override
+ public void deleteAllBundleVersions(BundleCoordinate bundleCoordinate) throws BundlePersistenceException {
}
diff --git a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/extension/TestFileSystemBundlePersistenceProvider.java b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/extension/TestFileSystemBundlePersistenceProvider.java
index 0e9c010..75c3cac 100644
--- a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/extension/TestFileSystemBundlePersistenceProvider.java
+++ b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/extension/TestFileSystemBundlePersistenceProvider.java
@@ -17,9 +17,12 @@
package org.apache.nifi.registry.provider.extension;
import org.apache.commons.io.IOUtils;
+import org.apache.nifi.registry.extension.BundleCoordinate;
+import org.apache.nifi.registry.extension.BundlePersistenceContext;
import org.apache.nifi.registry.extension.BundlePersistenceException;
import org.apache.nifi.registry.extension.BundlePersistenceProvider;
-import org.apache.nifi.registry.extension.BundleContext;
+import org.apache.nifi.registry.extension.BundleVersionCoordinate;
+import org.apache.nifi.registry.extension.BundleVersionType;
import org.apache.nifi.registry.provider.ProviderConfigurationContext;
import org.junit.Assert;
import org.junit.Before;
@@ -71,79 +74,96 @@
}
@Test
- public void testSaveSuccessfully() throws IOException {
+ public void testCreateSuccessfully() throws IOException {
+ final BundleVersionType type = BundleVersionType.NIFI_NAR;
+
// first version in b1
final String content1 = "g1-a1-1.0.0";
- createAndSaveBundleVersion(fileSystemBundleProvider, "b1", "g1", "a1", "1.0.0",
- BundleContext.BundleType.NIFI_NAR, content1);
- verifyBundleVersion(bundleStorageDir, "b1", "g1", "a1", "1.0.0",
- BundleContext.BundleType.NIFI_NAR, content1);
+ final BundleVersionCoordinate versionCoordinate1 = getVersionCoordinate("b1", "g1", "a1", "1.0.0", type);
+ createBundleVersion(fileSystemBundleProvider, versionCoordinate1 , content1);
+ verifyBundleVersion(bundleStorageDir, versionCoordinate1, content1);
// second version in b1
final String content2 = "g1-a1-1.1.0";
- createAndSaveBundleVersion(fileSystemBundleProvider, "b1", "g1", "a1", "1.1.0",
- BundleContext.BundleType.NIFI_NAR, content2);
- verifyBundleVersion(bundleStorageDir, "b1", "g1", "a1", "1.1.0",
- BundleContext.BundleType.NIFI_NAR, content2);
+ final BundleVersionCoordinate versionCoordinate2 = getVersionCoordinate("b1", "g1", "a1", "1.1.0", type);
+ createBundleVersion(fileSystemBundleProvider, versionCoordinate2, content2);
+ verifyBundleVersion(bundleStorageDir, versionCoordinate2, content2);
// same bundle but in b2
final String content3 = "g1-a1-1.1.0";
- createAndSaveBundleVersion(fileSystemBundleProvider, "b2", "g1", "a1", "1.1.0",
- BundleContext.BundleType.NIFI_NAR, content3);
- verifyBundleVersion(bundleStorageDir, "b2", "g1", "a1", "1.1.0",
- BundleContext.BundleType.NIFI_NAR, content2);
+ final BundleVersionCoordinate versionCoordinate3 = getVersionCoordinate("b2", "g1", "a1", "1.1.0", type);
+ createBundleVersion(fileSystemBundleProvider, versionCoordinate3, content3);
+ verifyBundleVersion(bundleStorageDir, versionCoordinate3, content2);
}
@Test
- public void testSaveWhenBundleVersionAlreadyExists() throws IOException {
+ public void testCreateWhenBundleVersionAlreadyExists() throws IOException {
+ final BundleVersionType type = BundleVersionType.NIFI_NAR;
+
final String content1 = "g1-a1-1.0.0";
- createAndSaveBundleVersion(fileSystemBundleProvider, "b1", "g1", "a1", "1.0.0",
- BundleContext.BundleType.NIFI_NAR, content1);
- verifyBundleVersion(bundleStorageDir, "b1", "g1", "a1", "1.0.0",
- BundleContext.BundleType.NIFI_NAR, content1);
+ final BundleVersionCoordinate versionCoordinate = getVersionCoordinate("b1", "g1", "a1", "1.0.0", type);
+ createBundleVersion(fileSystemBundleProvider, versionCoordinate, content1);
+ verifyBundleVersion(bundleStorageDir, versionCoordinate, content1);
// try to save same bundle version that already exists
try {
final String newContent = "new content";
- createAndSaveBundleVersion(fileSystemBundleProvider, "b1", "g1", "a1", "1.0.0",
- BundleContext.BundleType.NIFI_NAR, newContent);
+ createBundleVersion(fileSystemBundleProvider, versionCoordinate, newContent);
Assert.fail("Should have thrown exception");
} catch (BundlePersistenceException e) {
-
+ // expected
}
// verify existing content wasn't modified
- verifyBundleVersion(bundleStorageDir, "b1", "g1", "a1", "1.0.0",
- BundleContext.BundleType.NIFI_NAR, content1);
+ verifyBundleVersion(bundleStorageDir, versionCoordinate, content1);
}
@Test
- public void testSaveAndGet() throws IOException {
- final String bucketName = "b1";
+ public void testUpdateWhenBundleVersionAlreadyExists() throws IOException {
+ final BundleVersionType type = BundleVersionType.NIFI_NAR;
+
+ final String content1 = "g1-a1-1.0.0";
+ final BundleVersionCoordinate versionCoordinate = getVersionCoordinate("b1", "g1", "a1", "1.0.0", type);
+ createBundleVersion(fileSystemBundleProvider, versionCoordinate, content1);
+ verifyBundleVersion(bundleStorageDir, versionCoordinate, content1);
+
+ // try to save same bundle version that already exists with new content
+ final String newContent = "new content";
+ updateBundleVersion(fileSystemBundleProvider, versionCoordinate, newContent);
+ verifyBundleVersion(bundleStorageDir, versionCoordinate, newContent);
+
+ // retrieved content should be updated
+ try (final OutputStream out = new ByteArrayOutputStream()) {
+ fileSystemBundleProvider.getBundleVersionContent(versionCoordinate, out);
+ final String retrievedContent = new String(((ByteArrayOutputStream) out).toByteArray(), StandardCharsets.UTF_8);
+ Assert.assertEquals(newContent, retrievedContent);
+ }
+ }
+
+ @Test
+ public void testCreateAndGet() throws IOException {
+ final String bucketId = "b1";
final String groupId = "g1";
final String artifactId = "a1";
+ final BundleVersionType type = BundleVersionType.NIFI_NAR;
final String content1 = groupId + "-" + artifactId + "-" + "1.0.0";
- createAndSaveBundleVersion(fileSystemBundleProvider, bucketName, groupId, artifactId, "1.0.0",
- BundleContext.BundleType.NIFI_NAR, content1);
+ final BundleVersionCoordinate versionCoordinate1 = getVersionCoordinate(bucketId, groupId, artifactId, "1.0.0", type);
+ createBundleVersion(fileSystemBundleProvider,versionCoordinate1, content1);
final String content2 = groupId + "-" + artifactId + "-" + "1.1.0";
- createAndSaveBundleVersion(fileSystemBundleProvider, bucketName, groupId, artifactId, "1.1.0",
- BundleContext.BundleType.NIFI_NAR, content2);
+ final BundleVersionCoordinate versionCoordinate2 = getVersionCoordinate(bucketId, groupId, artifactId, "1.1.0", type);
+ createBundleVersion(fileSystemBundleProvider, versionCoordinate2, content2);
try (final OutputStream out = new ByteArrayOutputStream()) {
- final BundleContext context = getExtensionBundleContext(
- bucketName, groupId, artifactId, "1.0.0", BundleContext.BundleType.NIFI_NAR);
- fileSystemBundleProvider.getBundleVersion(context, out);
+ fileSystemBundleProvider.getBundleVersionContent(versionCoordinate1, out);
final String retrievedContent1 = new String(((ByteArrayOutputStream) out).toByteArray(), StandardCharsets.UTF_8);
Assert.assertEquals(content1, retrievedContent1);
}
try (final OutputStream out = new ByteArrayOutputStream()) {
- final BundleContext context = getExtensionBundleContext(
- bucketName, groupId, artifactId, "1.1.0", BundleContext.BundleType.NIFI_NAR);
- fileSystemBundleProvider.getBundleVersion(context, out);
+ fileSystemBundleProvider.getBundleVersionContent(versionCoordinate2, out);
final String retrievedContent2 = new String(((ByteArrayOutputStream) out).toByteArray(), StandardCharsets.UTF_8);
Assert.assertEquals(content2, retrievedContent2);
@@ -152,139 +172,150 @@
@Test(expected = BundlePersistenceException.class)
public void testGetWhenDoesNotExist() throws IOException {
- final String bucketName = "b1";
+ final String bucketId = "b1";
final String groupId = "g1";
final String artifactId = "a1";
+ final String version = "1.0.0";
+ final BundleVersionType type = BundleVersionType.NIFI_NAR;
try (final OutputStream out = new ByteArrayOutputStream()) {
- final BundleContext context = getExtensionBundleContext(
- bucketName, groupId, artifactId, "1.0.0", BundleContext.BundleType.NIFI_NAR);
- fileSystemBundleProvider.getBundleVersion(context, out);
+ final BundleVersionCoordinate versionCoordinate = getVersionCoordinate(bucketId, groupId, artifactId, version, type);
+ fileSystemBundleProvider.getBundleVersionContent(versionCoordinate, out);
Assert.fail("Should have thrown exception");
}
}
@Test
public void testDeleteExtensionBundleVersion() throws IOException {
- final String bucketName = "b1";
+ final String bucketId = "b1";
final String groupId = "g1";
final String artifactId = "a1";
final String version = "1.0.0";
- final BundleContext.BundleType bundleType = BundleContext.BundleType.NIFI_NAR;
+ final BundleVersionType bundleType = BundleVersionType.NIFI_NAR;
+
+ final BundleVersionCoordinate versionCoordinate = getVersionCoordinate(bucketId, groupId, artifactId, version, bundleType);
// create and verify the bundle version
- final String content1 = groupId + "-" + artifactId + "-" + "1.0.0";
- createAndSaveBundleVersion(fileSystemBundleProvider, bucketName, groupId, artifactId, version, bundleType, content1);
- verifyBundleVersion(bundleStorageDir, bucketName, groupId, artifactId, version, bundleType, content1);
+ final String content1 = groupId + "-" + artifactId + "-" + version;
+ createBundleVersion(fileSystemBundleProvider, versionCoordinate, content1);
+ verifyBundleVersion(bundleStorageDir, versionCoordinate, content1);
// delete the bundle version
- fileSystemBundleProvider.deleteBundleVersion(getExtensionBundleContext(bucketName, groupId, artifactId, version, bundleType));
+ fileSystemBundleProvider.deleteBundleVersion(versionCoordinate);
// verify it was deleted
- final File bundleVersionDir = FileSystemBundlePersistenceProvider.getBundleVersionDirectory(
- bundleStorageDir, bucketName, groupId, artifactId, version);
-
- final File bundleFile = FileSystemBundlePersistenceProvider.getBundleFile(
- bundleVersionDir, artifactId, version, bundleType);
+ final File bundleVersionDir = FileSystemBundlePersistenceProvider.getBundleVersionDirectory(bundleStorageDir, versionCoordinate);
+ final File bundleFile = FileSystemBundlePersistenceProvider.getBundleFile(bundleVersionDir, versionCoordinate);
Assert.assertFalse(bundleFile.exists());
}
@Test
public void testDeleteExtensionBundleVersionWhenDoesNotExist() throws IOException {
- final String bucketName = "b1";
+ final String bucketId = "b1";
final String groupId = "g1";
final String artifactId = "a1";
final String version = "1.0.0";
- final BundleContext.BundleType bundleType = BundleContext.BundleType.NIFI_NAR;
+ final BundleVersionType bundleType = BundleVersionType.NIFI_NAR;
+
+ final BundleVersionCoordinate versionCoordinate = getVersionCoordinate(bucketId, groupId, artifactId, version, bundleType);
// verify the bundle version does not already exist
- final File bundleVersionDir = FileSystemBundlePersistenceProvider.getBundleVersionDirectory(
- bundleStorageDir, bucketName, groupId, artifactId, version);
-
- final File bundleFile = FileSystemBundlePersistenceProvider.getBundleFile(
- bundleVersionDir, artifactId, version, bundleType);
+ final File bundleVersionDir = FileSystemBundlePersistenceProvider.getBundleVersionDirectory(bundleStorageDir, versionCoordinate);
+ final File bundleFile = FileSystemBundlePersistenceProvider.getBundleFile(bundleVersionDir, versionCoordinate);
Assert.assertFalse(bundleFile.exists());
// delete the bundle version
- fileSystemBundleProvider.deleteBundleVersion(getExtensionBundleContext(bucketName, groupId, artifactId, version, bundleType));
+ fileSystemBundleProvider.deleteBundleVersion(versionCoordinate);
}
@Test
public void testDeleteAllBundleVersions() throws IOException {
- final String bucketName = "b1";
+ final String bucketId = "b1";
final String groupId = "g1";
final String artifactId = "a1";
final String version1 = "1.0.0";
final String version2 = "2.0.0";
- final BundleContext.BundleType bundleType = BundleContext.BundleType.NIFI_NAR;
+ final BundleVersionType bundleType = BundleVersionType.NIFI_NAR;
// create and verify the bundle version 1
final String content1 = groupId + "-" + artifactId + "-" + version1;
- createAndSaveBundleVersion(fileSystemBundleProvider, bucketName, groupId, artifactId, version1, bundleType, content1);
- verifyBundleVersion(bundleStorageDir, bucketName, groupId, artifactId, version1, bundleType, content1);
+ final BundleVersionCoordinate versionCoordinate1 = getVersionCoordinate(bucketId, groupId, artifactId, version1, bundleType);
+ createBundleVersion(fileSystemBundleProvider, versionCoordinate1, content1);
+ verifyBundleVersion(bundleStorageDir, versionCoordinate1, content1);
// create and verify the bundle version 2
final String content2 = groupId + "-" + artifactId + "-" + version2;
- createAndSaveBundleVersion(fileSystemBundleProvider, bucketName, groupId, artifactId, version2, bundleType, content2);
- verifyBundleVersion(bundleStorageDir, bucketName, groupId, artifactId, version2, bundleType, content2);
+ final BundleVersionCoordinate versionCoordinate2 = getVersionCoordinate(bucketId, groupId, artifactId, version2, bundleType);
+ createBundleVersion(fileSystemBundleProvider, versionCoordinate2, content2);
+ verifyBundleVersion(bundleStorageDir, versionCoordinate2, content2);
- fileSystemBundleProvider.deleteAllBundleVersions(bucketName, bucketName, groupId, artifactId);
+ Assert.assertEquals(1, bundleStorageDir.listFiles().length);
+ final BundleCoordinate bundleCoordinate = getBundleCoordinate(bucketId, groupId, artifactId);
+ fileSystemBundleProvider.deleteAllBundleVersions(bundleCoordinate);
Assert.assertEquals(0, bundleStorageDir.listFiles().length);
}
@Test
public void testDeleteAllBundleVersionsWhenDoesNotExist() throws IOException {
- final String bucketName = "b1";
+ final String bucketId = "b1";
final String groupId = "g1";
final String artifactId = "a1";
Assert.assertEquals(0, bundleStorageDir.listFiles().length);
- fileSystemBundleProvider.deleteAllBundleVersions(bucketName, bucketName, groupId, artifactId);
+ final BundleCoordinate bundleCoordinate = getBundleCoordinate(bucketId, groupId, artifactId);
+ fileSystemBundleProvider.deleteAllBundleVersions(bundleCoordinate);
Assert.assertEquals(0, bundleStorageDir.listFiles().length);
}
- private void createAndSaveBundleVersion(final BundlePersistenceProvider persistenceProvider,
- final String bucketName,
- final String groupId,
- final String artifactId,
- final String version,
- final BundleContext.BundleType bundleType,
- final String content) throws IOException {
-
- final BundleContext context = getExtensionBundleContext(bucketName, groupId, artifactId, version, bundleType);
-
+ private void createBundleVersion(final BundlePersistenceProvider persistenceProvider,
+ final BundleVersionCoordinate versionCoordinate,
+ final String content) throws IOException {
+ final BundlePersistenceContext context = getPersistenceContext(versionCoordinate);
try (final InputStream in = new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8))) {
- persistenceProvider.saveBundleVersion(context, in, false);
+ persistenceProvider.createBundleVersion(context, in);
}
}
- private static BundleContext getExtensionBundleContext(final String bucketName,
- final String groupId,
- final String artifactId,
- final String version,
- final BundleContext.BundleType bundleType) {
- final BundleContext context = Mockito.mock(BundleContext.class);
- when(context.getBucketName()).thenReturn(bucketName);
- when(context.getBundleGroupId()).thenReturn(groupId);
- when(context.getBundleArtifactId()).thenReturn(artifactId);
- when(context.getBundleVersion()).thenReturn(version);
- when(context.getBundleType()).thenReturn(bundleType);
+ private void updateBundleVersion(final BundlePersistenceProvider persistenceProvider,
+ final BundleVersionCoordinate versionCoordinate,
+ final String content) throws IOException {
+ final BundlePersistenceContext context = getPersistenceContext(versionCoordinate);
+ try (final InputStream in = new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8))) {
+ persistenceProvider.updateBundleVersion(context, in);
+ }
+ }
+
+ private static BundlePersistenceContext getPersistenceContext(final BundleVersionCoordinate versionCoordinate) {
+ final BundlePersistenceContext context = Mockito.mock(BundlePersistenceContext.class);
+ when(context.getCoordinate()).thenReturn(versionCoordinate);
return context;
}
- private static void verifyBundleVersion(final File storageDir,
- final String bucketName,
- final String groupId,
- final String artifactId,
- final String version,
- final BundleContext.BundleType bundleType,
- final String contentString) throws IOException {
+ private static BundleVersionCoordinate getVersionCoordinate(final String bucketId, final String groupId, final String artifactId,
+ final String version, final BundleVersionType bundleType) {
- final File bundleVersionDir = FileSystemBundlePersistenceProvider.getBundleVersionDirectory(
- storageDir, bucketName, groupId, artifactId, version);
+ final BundleVersionCoordinate coordinate = Mockito.mock(BundleVersionCoordinate.class);
+ when(coordinate.getBucketId()).thenReturn(bucketId);
+ when(coordinate.getGroupId()).thenReturn(groupId);
+ when(coordinate.getArtifactId()).thenReturn(artifactId);
+ when(coordinate.getVersion()).thenReturn(version);
+ when(coordinate.getType()).thenReturn(bundleType);
+ return coordinate;
+ }
- final File bundleFile = FileSystemBundlePersistenceProvider.getBundleFile(
- bundleVersionDir, artifactId, version, bundleType);
+ private static BundleCoordinate getBundleCoordinate(final String bucketId, final String groupId, final String artifactId) {
+ final BundleCoordinate coordinate = Mockito.mock(BundleCoordinate.class);
+ when(coordinate.getBucketId()).thenReturn(bucketId);
+ when(coordinate.getGroupId()).thenReturn(groupId);
+ when(coordinate.getArtifactId()).thenReturn(artifactId);
+ return coordinate;
+ }
+
+ private static void verifyBundleVersion(final File storageDir, final BundleVersionCoordinate versionCoordinate,
+ final String contentString) throws IOException {
+
+ final File bundleVersionDir = FileSystemBundlePersistenceProvider.getBundleVersionDirectory(storageDir, versionCoordinate);
+ final File bundleFile = FileSystemBundlePersistenceProvider.getBundleFile(bundleVersionDir, versionCoordinate);
Assert.assertTrue(bundleFile.exists());
try (InputStream in = new FileInputStream(bundleFile)) {
diff --git a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java
index 8125f31..baeb949 100644
--- a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java
+++ b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java
@@ -24,6 +24,7 @@
import org.apache.nifi.registry.diff.ComponentDifferenceGroup;
import org.apache.nifi.registry.diff.VersionedFlowDifference;
import org.apache.nifi.registry.exception.ResourceNotFoundException;
+import org.apache.nifi.registry.extension.BundlePersistenceProvider;
import org.apache.nifi.registry.flow.FlowPersistenceProvider;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
@@ -75,6 +76,7 @@
private MetadataService metadataService;
private FlowPersistenceProvider flowPersistenceProvider;
+ private BundlePersistenceProvider bundlePersistenceProvider;
private Serializer<VersionedProcessGroup> snapshotSerializer;
private ExtensionService extensionService;
private Validator validator;
@@ -86,6 +88,7 @@
public void setup() {
metadataService = mock(MetadataService.class);
flowPersistenceProvider = mock(FlowPersistenceProvider.class);
+ bundlePersistenceProvider = mock(BundlePersistenceProvider.class);
snapshotSerializer = mock(VersionedProcessGroupSerializer.class);
extensionService = mock(StandardExtensionService.class);
registryUrlAliasService = mock(RegistryUrlAliasService.class);
@@ -93,7 +96,8 @@
final ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory();
validator = validatorFactory.getValidator();
- registryService = new RegistryService(metadataService, flowPersistenceProvider, snapshotSerializer, extensionService, validator, registryUrlAliasService);
+ registryService = new RegistryService(metadataService, flowPersistenceProvider, bundlePersistenceProvider,
+ snapshotSerializer, extensionService, validator, registryUrlAliasService);
}
// ---------------------- Test Bucket methods ---------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/extension/BundleContext.java b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/extension/BundleContext.java
deleted file mode 100644
index 13a80e1..0000000
--- a/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/extension/BundleContext.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.registry.extension;
-
-/**
- * The context that will be passed to the {@link BundlePersistenceProvider} when saving a new version of an extension bundle.
- */
-public interface BundleContext {
-
- enum BundleType {
- NIFI_NAR,
- MINIFI_CPP;
- }
-
- /**
- * @return the id of the bucket the bundle belongs to
- */
- String getBucketId();
-
- /**
- * @return the name of the bucket the bundle belongs to
- */
- String getBucketName();
-
- /**
- * @return the type of the bundle
- */
- BundleType getBundleType();
-
- /**
- * @return the NiFi Registry id of the bundle
- */
- String getBundleId();
-
- /**
- * @return the group id of the bundle
- */
- String getBundleGroupId();
-
- /**
- * @return the artifact id of the bundle
- */
- String getBundleArtifactId();
-
- /**
- * @return the version of the bundle
- */
- String getBundleVersion();
-
- /**
- * @return the comments for the version of the bundle
- */
- String getDescription();
-
- /**
- * @return the timestamp the bundle was created
- */
- long getTimestamp();
-
- /**
- * @return the user that created the bundle
- */
- String getAuthor();
-
-}
diff --git a/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/extension/BundleCoordinate.java b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/extension/BundleCoordinate.java
new file mode 100644
index 0000000..3c2c8ed
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/extension/BundleCoordinate.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.extension;
+
+/**
+ * The coordinate of a bundle.
+ *
+ * Implementations of {@link BundlePersistenceProvider} will be expected to be able to delete all versions for a given BundleCoordinate.
+ */
+public interface BundleCoordinate {
+
+ /**
+ * @return the NiFi Registry bucket id where the bundle is located
+ */
+ String getBucketId();
+
+ /**
+ * @return the group id of the bundle
+ */
+ String getGroupId();
+
+ /**
+ * @return the artifact id of the bundle
+ */
+ String getArtifactId();
+
+}
diff --git a/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/extension/BundlePersistenceContext.java b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/extension/BundlePersistenceContext.java
new file mode 100644
index 0000000..12b4c32
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/extension/BundlePersistenceContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.extension;
+
+/**
+ * The context that will be passed to the {@link BundlePersistenceProvider} when saving a new version of an extension bundle.
+ */
+public interface BundlePersistenceContext {
+
+ /**
+ * @return the unique identifier of the bundle version
+ */
+ BundleVersionCoordinate getCoordinate();
+
+ /**
+ * @return the size of the bundle content in bytes
+ */
+ long getSize();
+
+ /**
+ * @return the timestamp the bundle was created
+ */
+ long getTimestamp();
+
+ /**
+ * @return the user that created the bundle
+ */
+ String getAuthor();
+
+}
diff --git a/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/extension/BundlePersistenceProvider.java b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/extension/BundlePersistenceProvider.java
index cbae98b..f5c3471 100644
--- a/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/extension/BundlePersistenceProvider.java
+++ b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/extension/BundlePersistenceProvider.java
@@ -29,41 +29,47 @@
/**
* Persists the binary content of a version of an extension bundle.
*
+ * This method should throw a BundlePersistenceException if content already exists for the BundleVersionCoordinate
+ * specified in the BundlePersistenceContext.
+ *
* @param context the context about the bundle version being persisted
* @param contentStream the stream of binary content to persist
- * @param overwrite if true the persistence provider should overwrite any content that may already exist for the given bundle version,
- * if false the persistence provider should throw an BundlePersistenceException if content already exists
+ * @throws BundlePersistenceException if an error occurs storing the content, or if content already exists for version coordinate
+ */
+ void createBundleVersion(BundlePersistenceContext context, InputStream contentStream) throws BundlePersistenceException;
+
+ /**
+ * Updates the binary content for a version of an extension bundle.
+ *
+ * @param context the context about the bundle version being updated
+ * @param contentStream the stream of the updated binary content
* @throws BundlePersistenceException if an error occurs storing the content
*/
- void saveBundleVersion(BundleContext context, InputStream contentStream, boolean overwrite) throws BundlePersistenceException;
+ void updateBundleVersion(BundlePersistenceContext context, InputStream contentStream) throws BundlePersistenceException;
/**
* Writes the binary content of the bundle specified by the bucket-group-artifact-version to the provided OutputStream.
*
- * @param context the context about the bundle version being retrieved
+ * @param versionCoordinate the versionCoordinate of the bundle version
* @param outputStream the output stream to write the contents to
* @throws BundlePersistenceException if an error occurs retrieving the content
*/
- void getBundleVersion(BundleContext context, OutputStream outputStream) throws BundlePersistenceException;
+ void getBundleVersionContent(BundleVersionCoordinate versionCoordinate, OutputStream outputStream) throws BundlePersistenceException;
/**
* Deletes the content of the bundle version specified by bucket-group-artifact-version.
*
- * @param context the context about the bundle version being deleted
+ * @param versionCoordinate the versionCoordinate of the bundle version
* @throws BundlePersistenceException if an error occurs deleting the content
*/
- void deleteBundleVersion(BundleContext context) throws BundlePersistenceException;
+ void deleteBundleVersion(BundleVersionCoordinate versionCoordinate) throws BundlePersistenceException;
/**
- * Deletes the content for all versions of the bundle specified by bucket-group-artifact.
+ * Deletes the content for all versions of the bundle specified by group-artifact.
*
- * @param bucketId the id of the bucket where the bundle is located
- * @param bucketName the bucket name where the bundle is located
- * @param groupId the group id of the bundle
- * @param artifactId the artifact id of the bundle
+ * @param bundleCoordinate the coordinate of the bundle to delete all versions for
* @throws BundlePersistenceException if an error occurs deleting the content
*/
- void deleteAllBundleVersions(String bucketId, String bucketName, String groupId, String artifactId)
- throws BundlePersistenceException;
+ void deleteAllBundleVersions(BundleCoordinate bundleCoordinate) throws BundlePersistenceException;
}
diff --git a/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/extension/BundleVersionCoordinate.java b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/extension/BundleVersionCoordinate.java
new file mode 100644
index 0000000..5dd6146
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/extension/BundleVersionCoordinate.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.extension;
+
+/**
+ * The coordinate of a version of a bundle.
+ *
+ * BundlePersistenceProviders will be expected to retrieve the content of a given BundleVersionCoordinate.
+ */
+public interface BundleVersionCoordinate {
+
+ /**
+ * @return the NiFi Registry bucket id where the bundle is located
+ */
+ String getBucketId();
+
+ /**
+ * @return the group id of the bundle
+ */
+ String getGroupId();
+
+ /**
+ * @return the artifact id of the bundle
+ */
+ String getArtifactId();
+
+ /**
+ * @return the version of the bundle
+ */
+ String getVersion();
+
+ /**
+ * @return the type of the bundle
+ */
+ BundleVersionType getType();
+
+ /**
+ * @return the string representation of the coordinate
+ */
+ String toString();
+}
diff --git a/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/extension/BundleVersionType.java b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/extension/BundleVersionType.java
new file mode 100644
index 0000000..e5bdd75
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/extension/BundleVersionType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.extension;
+
+/**
+ * The types of bundles that can be persisted.
+ */
+public enum BundleVersionType {
+
+ NIFI_NAR,
+
+ MINIFI_CPP;
+
+}
diff --git a/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/Provider.java b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/Provider.java
index 4a4be28..7f79d54 100644
--- a/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/Provider.java
+++ b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/Provider.java
@@ -29,4 +29,11 @@
*/
void onConfigured(ProviderConfigurationContext configurationContext) throws ProviderCreationException;
+ /**
+ * Called prior to destroying the provider.
+ */
+ default void preDestruction() {
+
+ }
+
}
diff --git a/nifi-registry-core/nifi-registry-resources/src/main/resources/conf/nifi-registry.properties b/nifi-registry-core/nifi-registry-resources/src/main/resources/conf/nifi-registry.properties
index e70c1bb..2341f38 100644
--- a/nifi-registry-core/nifi-registry-resources/src/main/resources/conf/nifi-registry.properties
+++ b/nifi-registry-core/nifi-registry-resources/src/main/resources/conf/nifi-registry.properties
@@ -69,6 +69,8 @@
#nifi.registry.extension.dir.1=/path/to/extension1
#nifi.registry.extension.dir.2=/path/to/extension2
+nifi.registry.extension.dir.aws=${nifi.registry.extension.dir.aws}
+
# Identity Mapping Properties #
# These properties allow normalizing user identities such that identities coming from different identity providers
# (certificates, LDAP, Kerberos) can be treated the same internally in NiFi. The following example demonstrates normalizing
diff --git a/nifi-registry-core/nifi-registry-resources/src/main/resources/conf/providers.xml b/nifi-registry-core/nifi-registry-resources/src/main/resources/conf/providers.xml
index cf85a1d..4252d70 100644
--- a/nifi-registry-core/nifi-registry-resources/src/main/resources/conf/providers.xml
+++ b/nifi-registry-core/nifi-registry-resources/src/main/resources/conf/providers.xml
@@ -62,4 +62,29 @@
<property name="Extension Bundle Storage Directory">./extension_bundles</property>
</extensionBundlePersistenceProvider>
+ <!-- Example S3 Bundle Persistence Provider
+ - Requires nifi-registry-aws-assembly to be added to the classpath via a custom extension dir in nifi-registry.properties
+ Example: nifi.registry.extension.dir.aws=./ext/aws/lib
+ Where "./ext/aws/lib" contains the extracted contents of nifi-registry-aws-assembly
+ - "Region" - The name of the S3 region where the bucket exists
+ - "Bucket Name" - The name of an existing bucket to store extension bundles
+ - "Key Prefix" - An optional prefix that if specified will be added to the beginning of all S3 keys
+ - "Credentials Provider" - Indicates how credentials will be provided, must be a value of DEFAULT_CHAIN or STATIC
+ - DEFAULT_CHAIN will consider in order: Java system properties, environment variables, credential profiles (~/.aws/credentials)
+ - STATIC requires that "Access Key" and "Secret Access Key" be specified directly in this file
+ - "Access Key" - The access key to use when using STATIC credentials provider
+ - "Secret Access Key" - The secret access key to use when using STATIC credentials provider
+ -->
+ <!--
+ <extensionBundlePersistenceProvider>
+ <class>org.apache.nifi.registry.aws.S3BundlePersistenceProvider</class>
+ <property name="Region">us-east-1</property>
+ <property name="Bucket Name">my-bundles</property>
+ <property name="Key Prefix"></property>
+ <property name="Credentials Provider">DEFAULT_CHAIN</property>
+ <property name="Access Key"></property>
+ <property name="Secret Access Key"></property>
+ </extensionBundlePersistenceProvider>
+ -->
+
</providers>
\ No newline at end of file
diff --git a/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-assembly/LICENSE b/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-assembly/LICENSE
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-assembly/LICENSE
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-assembly/NOTICE b/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-assembly/NOTICE
new file mode 100644
index 0000000..46df494
--- /dev/null
+++ b/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-assembly/NOTICE
@@ -0,0 +1,290 @@
+nifi-registry-aws-extensions
+Copyright 2019 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+******************
+Apache Software License v2
+******************
+
+The following binary components are provided under the Apache Software License v2
+
+ (ASLv2) AWS SDK for Java 2.0
+ The following NOTICE information applies:
+ Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+ This product includes software developed by
+ Amazon Technologies, Inc (http://www.amazon.com/).
+
+ **********************
+ THIRD PARTY COMPONENTS
+ **********************
+ This software includes third party software subject to the following copyrights:
+ - XML parsing and utility functions from JetS3t - Copyright 2006-2009 James Murty.
+ - PKCS#1 PEM encoded private key parsing and utility functions from oauth.googlecode.com - Copyright 1998-2010 AOL Inc.
+
+ The licenses for these third party components are included in LICENSE.txt
+
+ (ASLv2) Apache Commons Codec
+ The following NOTICE information applies:
+ Apache Commons Codec
+ Copyright 2002-2014 The Apache Software Foundation
+
+ src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
+ contains test data from http://aspell.net/test/orig/batch0.tab.
+ Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
+
+ (ASLv2) Apache Commons Logging
+ The following NOTICE information applies:
+ Apache Commons Logging
+ Copyright 2003-2016 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+ (ASLv2) Apache HttpComponents Client
+ The following NOTICE information applies:
+ Copyright 1999-2019 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+ (ASLv2) Apache HttpComponents Core
+ The following NOTICE information applies:
+ Copyright 2005-2019 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+ (ASLv2) Jackson JSON processor
+ The following NOTICE information applies:
+ # Jackson JSON processor
+
+ Jackson is a high-performance, Free/Open Source JSON processing library.
+ It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
+ been in development since 2007.
+ It is currently developed by a community of developers, as well as supported
+ commercially by FasterXML.com.
+
+ ## Licensing
+
+ Jackson core and extension components may licensed under different licenses.
+ To find the details that apply to this artifact see the accompanying LICENSE file.
+ For more information, including possible other licensing options, contact
+ FasterXML.com (http://fasterxml.com).
+
+ ## Credits
+
+ A list of contributors may be found from CREDITS file, which is included
+ in some artifacts (usually source distributions); but is always available
+ from the source code management (SCM) system project uses.
+
+ (ASLv2) The Netty Project
+ The following NOTICE information applies:
+ Copyright 2014 The Netty Project
+ -------------------------------------------------------------------------------
+ This product contains the extensions to Java Collections Framework which has
+ been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene:
+
+ * LICENSE:
+ * license/LICENSE.jsr166y.txt (Public Domain)
+ * HOMEPAGE:
+ * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/
+ * http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/
+
+ This product contains a modified version of Robert Harder's Public Domain
+ Base64 Encoder and Decoder, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.base64.txt (Public Domain)
+ * HOMEPAGE:
+ * http://iharder.sourceforge.net/current/java/base64/
+
+ This product contains a modified portion of 'Webbit', an event based
+ WebSocket and HTTP server, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.webbit.txt (BSD License)
+ * HOMEPAGE:
+ * https://github.com/joewalnes/webbit
+
+ This product contains a modified portion of 'SLF4J', a simple logging
+ facade for Java, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.slf4j.txt (MIT License)
+ * HOMEPAGE:
+ * http://www.slf4j.org/
+
+ This product contains a modified portion of 'Apache Harmony', an open source
+ Java SE, which can be obtained at:
+
+ * NOTICE:
+ * license/NOTICE.harmony.txt
+ * LICENSE:
+ * license/LICENSE.harmony.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * http://archive.apache.org/dist/harmony/
+
+ This product contains a modified portion of 'jbzip2', a Java bzip2 compression
+ and decompression library written by Matthew J. Francis. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.jbzip2.txt (MIT License)
+ * HOMEPAGE:
+ * https://code.google.com/p/jbzip2/
+
+ This product contains a modified portion of 'libdivsufsort', a C API library to construct
+ the suffix array and the Burrows-Wheeler transformed string for any input string of
+ a constant-size alphabet written by Yuta Mori. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.libdivsufsort.txt (MIT License)
+ * HOMEPAGE:
+ * https://github.com/y-256/libdivsufsort
+
+ This product contains a modified portion of Nitsan Wakart's 'JCTools', Java Concurrency Tools for the JVM,
+ which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.jctools.txt (ASL2 License)
+ * HOMEPAGE:
+ * https://github.com/JCTools/JCTools
+
+ This product optionally depends on 'JZlib', a re-implementation of zlib in
+ pure Java, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.jzlib.txt (BSD style License)
+ * HOMEPAGE:
+ * http://www.jcraft.com/jzlib/
+
+ This product optionally depends on 'Compress-LZF', a Java library for encoding and
+ decoding data in LZF format, written by Tatu Saloranta. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.compress-lzf.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/ning/compress
+
+ This product optionally depends on 'lz4', a LZ4 Java compression
+ and decompression library written by Adrien Grand. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.lz4.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/jpountz/lz4-java
+
+ This product optionally depends on 'lzma-java', a LZMA Java compression
+ and decompression library, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.lzma-java.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/jponge/lzma-java
+
+ This product contains a modified portion of 'jfastlz', a Java port of FastLZ compression
+ and decompression library written by William Kinney. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.jfastlz.txt (MIT License)
+ * HOMEPAGE:
+ * https://code.google.com/p/jfastlz/
+
+ This product contains a modified portion of and optionally depends on 'Protocol Buffers', Google's data
+ interchange format, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.protobuf.txt (New BSD License)
+ * HOMEPAGE:
+ * https://github.com/google/protobuf
+
+ This product optionally depends on 'Bouncy Castle Crypto APIs' to generate
+ a temporary self-signed X.509 certificate when the JVM does not provide the
+ equivalent functionality. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.bouncycastle.txt (MIT License)
+ * HOMEPAGE:
+ * http://www.bouncycastle.org/
+
+ This product optionally depends on 'Snappy', a compression library produced
+ by Google Inc, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.snappy.txt (New BSD License)
+ * HOMEPAGE:
+ * https://github.com/google/snappy
+
+ This product optionally depends on 'JBoss Marshalling', an alternative Java
+ serialization API, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.jboss-marshalling.txt (GNU LGPL 2.1)
+ * HOMEPAGE:
+ * http://www.jboss.org/jbossmarshalling
+
+ This product optionally depends on 'Caliper', Google's micro-
+ benchmarking framework, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.caliper.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/google/caliper
+
+ This product optionally depends on 'Apache Commons Logging', a logging
+ framework, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.commons-logging.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * http://commons.apache.org/logging/
+
+ This product optionally depends on 'Apache Log4J', a logging framework, which
+ can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.log4j.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * http://logging.apache.org/log4j/
+
+ This product optionally depends on 'Aalto XML', an ultra-high performance
+ non-blocking XML processor, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.aalto-xml.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * http://wiki.fasterxml.com/AaltoHome
+
+ This product contains a modified version of 'HPACK', a Java implementation of
+ the HTTP/2 HPACK algorithm written by Twitter. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.hpack.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/twitter/hpack
+
+ This product contains a modified portion of 'Apache Commons Lang', a Java library
+ provides utilities for the java.lang API, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.commons-lang.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://commons.apache.org/proper/commons-lang/
+
+
+ This product contains the Maven wrapper scripts from 'Maven Wrapper', that provides an easy way to ensure a user has everything necessary to run the Maven build.
+
+ * LICENSE:
+ * license/LICENSE.mvn-wrapper.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/takari/maven-wrapper
+
+*****************
+Public Domain
+*****************
+
+The following binary components are provided under the Creative Commons Zero license version 1.0. See project link for details.
+
+ (CC0v1.0) Reactive Streams (org.reactivestreams:reactive-streams:jar:1.0.2 - https://github.com/reactive-streams/reactive-streams-jvm)
diff --git a/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-assembly/README.md b/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-assembly/README.md
new file mode 100644
index 0000000..008a850
--- /dev/null
+++ b/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-assembly/README.md
@@ -0,0 +1,86 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+# NiFi Registry AWS extensions
+
+This modules provides AWS related extensions for NiFi Registry.
+
+## Prerequisites
+
+* AWS account credentials and an S3 bucket.
+
+## How to install
+
+### Enable AWS extensions at NiFi Registry build
+
+The AWS extensions will be automatically included when you build NiFi Registry and will be installed at the `${NIFI_REG_HOME}/ext/aws` directory.
+
+If you wish to build NiFi Registry without including the AWS extensions, specify the `skipAws` system property:
+```
+cd nifi-registry
+mvn clean install -DskipAws
+```
+
+### Add AWS extensions to existing NiFi Registry
+
+To add AWS extensions to an existing NiFi Registry, build the extension with the following command:
+
+```
+cd nifi-registry
+mvn clean install -f nifi-registry-extensions/nifi-registry-aws
+```
+
+The extension zip will be created as `nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-assembly/target/nifi-registry-aws-assembly-xxx-bin.zip`.
+
+Unzip the file into arbitrary directory so that NiFi Registry can use, such as `${NIFI_REG_HOME}/ext/aws`.
+For example:
+
+```
+mkdir -p ${NIFI_REG_HOME}/ext/aws
+unzip -d ${NIFI_REG_HOME}/ext/aws nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-assembly/target/nifi-registry-aws-assembly-xxx-bin.zip
+```
+
+## NiFi Registry Configuration
+
+In order to use this extension, the following NiFi Registry files need to be configured.
+
+### nifi-registry.properties
+
+The extension dir property will be automatically configured when building with the `include-aws` profile (i.e. when not specifying -DskipAws).
+
+To manually specify the property when adding the AWS extensions to an existing NiFi registry, configure the following property:
+```
+# Specify AWS extension dir
+nifi.registry.extension.dir.aws=./ext/aws/lib
+```
+
+### providers.xml
+
+Uncomment the extensionBundlePersistenceProvider for S3:
+```
+<!--
+<extensionBundlePersistenceProvider>
+ <class>org.apache.nifi.registry.aws.S3BundlePersistenceProvider</class>
+ <property name="Region">us-east-1</property>
+ <property name="Bucket Name">my-bundles</property>
+ <property name="Key Prefix"></property>
+ <property name="Credentials Provider">DEFAULT_CHAIN</property>
+ <property name="Access Key"></property>
+ <property name="Secret Access Key"></property>
+</extensionBundlePersistenceProvider>
+-->
+```
+
+NOTE: Remember to remove, or comment out, the FileSystemBundlePersistenceProvider since there can only be one defined.
+
diff --git a/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-assembly/pom.xml b/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-assembly/pom.xml
new file mode 100644
index 0000000..6207054
--- /dev/null
+++ b/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-assembly/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>nifi-registry-aws</artifactId>
+ <groupId>org.apache.nifi.registry</groupId>
+ <version>0.4.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>nifi-registry-aws-assembly</artifactId>
+ <packaging>pom</packaging>
+ <description>AWS extensions for Apache NiFi Registry</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi.registry</groupId>
+ <artifactId>nifi-registry-aws-extensions</artifactId>
+ <version>0.4.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <attach>true</attach>
+ </configuration>
+ <executions>
+ <execution>
+ <id>assemble-aws-extensions</id>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/extension.xml</descriptor>
+ </descriptors>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-assembly/src/main/assembly/extension.xml b/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-assembly/src/main/assembly/extension.xml
new file mode 100644
index 0000000..87f2e0f
--- /dev/null
+++ b/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-assembly/src/main/assembly/extension.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<assembly>
+ <id>bin</id>
+ <formats>
+ <format>zip</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <dependencySets>
+ <dependencySet>
+ <scope>runtime</scope>
+ <useProjectArtifact>false</useProjectArtifact>
+ <outputDirectory>lib</outputDirectory>
+ <directoryMode>0770</directoryMode>
+ <fileMode>0660</fileMode>
+ </dependencySet>
+ </dependencySets>
+
+ <files>
+ <file>
+ <source>./README.md</source>
+ <outputDirectory>./</outputDirectory>
+ <destName>README.md</destName>
+ <fileMode>0644</fileMode>
+ <filtered>true</filtered>
+ </file>
+ <file>
+ <source>./LICENSE</source>
+ <outputDirectory>./</outputDirectory>
+ <destName>LICENSE</destName>
+ <fileMode>0644</fileMode>
+ <filtered>true</filtered>
+ </file>
+ <file>
+ <source>./NOTICE</source>
+ <outputDirectory>./</outputDirectory>
+ <destName>NOTICE</destName>
+ <fileMode>0644</fileMode>
+ <filtered>true</filtered>
+ </file>
+ </files>
+
+</assembly>
diff --git a/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-extensions/pom.xml b/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-extensions/pom.xml
new file mode 100644
index 0000000..8efdb31
--- /dev/null
+++ b/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-extensions/pom.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>nifi-registry-aws</artifactId>
+ <groupId>org.apache.nifi.registry</groupId>
+ <version>0.4.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>nifi-registry-aws-extensions</artifactId>
+ <description>AWS extensions for Apache NiFi Registry</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi.registry</groupId>
+ <artifactId>nifi-registry-provider-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi.registry</groupId>
+ <artifactId>nifi-registry-utils</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>s3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>apache-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-extensions/src/main/java/org/apache/nifi/registry/aws/S3BundlePersistenceProvider.java b/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-extensions/src/main/java/org/apache/nifi/registry/aws/S3BundlePersistenceProvider.java
new file mode 100644
index 0000000..6254649
--- /dev/null
+++ b/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-extensions/src/main/java/org/apache/nifi/registry/aws/S3BundlePersistenceProvider.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.aws;
+
+import org.apache.nifi.registry.extension.BundleCoordinate;
+import org.apache.nifi.registry.extension.BundlePersistenceContext;
+import org.apache.nifi.registry.extension.BundlePersistenceException;
+import org.apache.nifi.registry.extension.BundlePersistenceProvider;
+import org.apache.nifi.registry.extension.BundleVersionCoordinate;
+import org.apache.nifi.registry.extension.BundleVersionType;
+import org.apache.nifi.registry.provider.ProviderConfigurationContext;
+import org.apache.nifi.registry.provider.ProviderCreationException;
+import org.apache.nifi.registry.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.utils.IoUtils;
+import software.amazon.awssdk.utils.StringUtils;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * An {@link BundlePersistenceProvider} that uses AWS S3 for storage.
+ */
+public class S3BundlePersistenceProvider implements BundlePersistenceProvider {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(S3BundlePersistenceProvider.class);
+
+ public static final String REGION_PROP = "Region";
+ public static final String BUCKET_NAME_PROP = "Bucket Name";
+ public static final String KEY_PREFIX_PROP = "Key Prefix";
+ public static final String CREDENTIALS_PROVIDER_PROP = "Credentials Provider";
+ public static final String ACCESS_KEY_PROP = "Access Key";
+ public static final String SECRET_ACCESS_KEY_PROP = "Secret Access Key";
+
+ public static final String NAR_EXTENSION = ".nar";
+ public static final String CPP_EXTENSION = ".cpp";
+
+ public enum CredentialProvider {
+ STATIC,
+ DEFAULT_CHAIN
+ }
+
+ private volatile S3Client s3Client;
+ private volatile String s3BucketName;
+ private volatile String s3KeyPrefix;
+
+ @Override
+ public void onConfigured(final ProviderConfigurationContext configurationContext) throws ProviderCreationException {
+ s3BucketName = configurationContext.getProperties().get(BUCKET_NAME_PROP);
+ if (StringUtils.isBlank(s3BucketName)) {
+ throw new ProviderCreationException("The property '" + BUCKET_NAME_PROP + "' must be provided");
+ }
+
+ final String keyPrefixValue = configurationContext.getProperties().get(KEY_PREFIX_PROP);
+ s3KeyPrefix = StringUtils.isBlank(keyPrefixValue) ? null : keyPrefixValue;
+
+ s3Client = createS3Client(configurationContext);
+ }
+
+ protected S3Client createS3Client(final ProviderConfigurationContext configurationContext) {
+
+ return S3Client.builder()
+ .region(getRegion(configurationContext))
+ .credentialsProvider(getCredentialsProvider(configurationContext))
+ .build();
+ }
+
+ private Region getRegion(final ProviderConfigurationContext configurationContext) {
+ final String regionValue = configurationContext.getProperties().get(REGION_PROP);
+ if (StringUtils.isBlank(regionValue)) {
+ throw new ProviderCreationException("The property '" + REGION_PROP + "' must be provided");
+ }
+
+ Region region = null;
+ for (Region r : Region.regions()) {
+ if (r.id().equals(regionValue)) {
+ region = r;
+ break;
+ }
+ }
+
+ if (region == null) {
+ LOGGER.warn("The provided region was not found in the list of known regions. This may indicate an invalid region, " +
+ "or may indicate a region that is newer than the known list of regions");
+ region = Region.of(regionValue);
+ }
+
+ LOGGER.debug("Using region {}", new Object[] {region.id()});
+ return region;
+ }
+
+ private AwsCredentialsProvider getCredentialsProvider(final ProviderConfigurationContext configurationContext) {
+ final String credentialsProviderValue = configurationContext.getProperties().get(CREDENTIALS_PROVIDER_PROP);
+ if (StringUtils.isBlank(credentialsProviderValue)) {
+ throw new ProviderCreationException("The property '" + CREDENTIALS_PROVIDER_PROP + "' must be provided");
+ }
+
+ CredentialProvider credentialProvider;
+ try {
+ credentialProvider = CredentialProvider.valueOf(credentialsProviderValue);
+ } catch (Exception e) {
+ throw new ProviderCreationException("The property '" + CREDENTIALS_PROVIDER_PROP + "' must be one of ["
+ + CredentialProvider.STATIC + ", " + CredentialProvider.DEFAULT_CHAIN + " ]");
+ }
+
+ if (CredentialProvider.STATIC == credentialProvider) {
+ final String accesKeyValue = configurationContext.getProperties().get(ACCESS_KEY_PROP);
+ final String secretAccessKey = configurationContext.getProperties().get(SECRET_ACCESS_KEY_PROP);
+
+ if (StringUtils.isBlank(accesKeyValue) || StringUtils.isBlank(secretAccessKey)) {
+ throw new ProviderCreationException("The properties '" + ACCESS_KEY_PROP + "' and '" + SECRET_ACCESS_KEY_PROP
+ + "' must be provided when using " + CredentialProvider.STATIC + " credentials provider");
+ }
+
+ LOGGER.debug("Creating StaticCredentialsProvider");
+ final AwsCredentials awsCredentials = AwsBasicCredentials.create(accesKeyValue, secretAccessKey);
+ return StaticCredentialsProvider.create(awsCredentials);
+
+ } else {
+ LOGGER.debug("Creating DefaultCredentialsProvider");
+ return DefaultCredentialsProvider.create();
+ }
+ }
+
+ @Override
+ public synchronized void createBundleVersion(final BundlePersistenceContext context, final InputStream contentStream)
+ throws BundlePersistenceException {
+ createOrUpdateBundleVersion(context, contentStream);
+ }
+
+ @Override
+ public synchronized void updateBundleVersion(final BundlePersistenceContext context, final InputStream contentStream) throws BundlePersistenceException {
+ createOrUpdateBundleVersion(context, contentStream);
+ }
+
+ private synchronized void createOrUpdateBundleVersion(final BundlePersistenceContext context, final InputStream contentStream)
+ throws BundlePersistenceException {
+ final String key = getKey(context.getCoordinate());
+ LOGGER.debug("Saving bundle version to S3 in bucket '{}' with key '{}'", new Object[]{s3BucketName, key});
+
+ final PutObjectRequest request = PutObjectRequest.builder()
+ .bucket(s3BucketName)
+ .key(key)
+ .build();
+
+ final RequestBody requestBody = RequestBody.fromInputStream(contentStream, context.getSize());
+ try {
+ s3Client.putObject(request, requestBody);
+ LOGGER.debug("Successfully saved bundle version to S3 bucket '{}' with key '{}'", new Object[]{s3BucketName, key});
+ } catch (Exception e) {
+ throw new BundlePersistenceException("Error saving bundle version to S3 due to: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public synchronized void getBundleVersionContent(final BundleVersionCoordinate versionCoordinate, final OutputStream outputStream)
+ throws BundlePersistenceException {
+ final String key = getKey(versionCoordinate);
+ LOGGER.debug("Retrieving bundle version from S3 bucket '{}' with key '{}'", new Object[]{s3BucketName, key});
+
+ final GetObjectRequest request = GetObjectRequest.builder()
+ .bucket(s3BucketName)
+ .key(key)
+ .build();
+
+ try (final ResponseInputStream<GetObjectResponse> response = s3Client.getObject(request)) {
+ IoUtils.copy(response, outputStream);
+ LOGGER.debug("Successfully retrieved bundle version from S3 bucket '{}' with key '{}'", new Object[]{s3BucketName, key});
+ } catch (Exception e) {
+ throw new BundlePersistenceException("Error retrieving bundle version from S3 due to: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public synchronized void deleteBundleVersion(final BundleVersionCoordinate versionCoordinate) throws BundlePersistenceException {
+ final String key = getKey(versionCoordinate);
+ LOGGER.debug("Deleting bundle version from S3 bucket '{}' with key '{}'", new Object[]{s3BucketName, key});
+
+ final DeleteObjectRequest request = DeleteObjectRequest.builder()
+ .bucket(s3BucketName)
+ .key(key)
+ .build();
+
+ try {
+ s3Client.deleteObject(request);
+ LOGGER.debug("Successfully deleted bundle version from S3 bucket '{}' with key '{}'", new Object[]{s3BucketName, key});
+ } catch (Exception e) {
+ throw new BundlePersistenceException("Error deleting bundle version from S3 due to: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public synchronized void deleteAllBundleVersions(final BundleCoordinate bundleCoordinate) throws BundlePersistenceException {
+ final String basePrefix = s3KeyPrefix == null ? "" : s3KeyPrefix + "/";
+ final String bundlePrefix = getBundlePrefix(bundleCoordinate.getBucketId(), bundleCoordinate.getGroupId(), bundleCoordinate.getArtifactId());
+
+ final String prefix = basePrefix + bundlePrefix;
+ LOGGER.debug("Deleting all bundle versions from S3 bucket '{}' with prefix '{}'", new Object[]{s3BucketName, prefix});
+
+ try {
+ // List all the objects in the bucket with the given prefix of group/artifact...
+ final ListObjectsResponse objectsResponse = s3Client.listObjects(
+ ListObjectsRequest.builder()
+ .bucket(s3BucketName)
+ .prefix(prefix)
+ .build()
+ );
+
+ // Now delete each object, might be able to do this more efficiently with bulk delete
+ for (final S3Object s3Object : objectsResponse.contents()) {
+ final String s3ObjectKey = s3Object.key();
+ s3Client.deleteObject(DeleteObjectRequest.builder()
+ .bucket(s3BucketName)
+ .key(s3ObjectKey)
+ .build()
+ );
+ LOGGER.debug("Successfully object from S3 bucket '{}' with key '{}'", new Object[]{s3BucketName, s3ObjectKey});
+ }
+
+ LOGGER.debug("Successfully deleted all bundle versions from S3 bucket '{}' with prefix '{}'", new Object[]{s3BucketName, prefix});
+ } catch (Exception e) {
+ throw new BundlePersistenceException("Error deleting bundle versions from S3 due to: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void preDestruction() {
+ s3Client.close();
+ }
+
+ private String getKey(final BundleVersionCoordinate coordinate) {
+ final String bundlePrefix = getBundlePrefix(coordinate.getBucketId(), coordinate.getGroupId(), coordinate.getArtifactId());
+
+ final String sanitizedArtifact = sanitize(coordinate.getArtifactId());
+ final String sanitizedVersion = sanitize(coordinate.getVersion());
+
+ final String bundleFileExtension = getBundleFileExtension(coordinate.getType());
+ final String bundleFilename = sanitizedArtifact + "-" + sanitizedVersion + bundleFileExtension;
+
+ final String key = bundlePrefix + "/" + sanitizedVersion + "/" + bundleFilename;
+ if (s3KeyPrefix == null) {
+ return key;
+ } else {
+ return s3KeyPrefix + "/" + key;
+ }
+ }
+
+ private String getBundlePrefix(final String bucketId, final String groupId, final String artifactId) {
+ final String sanitizedBucketId = sanitize(bucketId);
+ final String sanitizedGroup = sanitize(groupId);
+ final String sanitizedArtifact = sanitize(artifactId);
+ return sanitizedBucketId + "/" + sanitizedGroup + "/" + sanitizedArtifact;
+ }
+
+ private static String sanitize(final String input) {
+ return FileUtils.sanitizeFilename(input).trim().toLowerCase();
+ }
+
+ static String getBundleFileExtension(final BundleVersionType bundleType) {
+ switch (bundleType) {
+ case NIFI_NAR:
+ return NAR_EXTENSION;
+ case MINIFI_CPP:
+ return CPP_EXTENSION;
+ default:
+ LOGGER.warn("Unknown bundle type: " + bundleType);
+ return "";
+ }
+ }
+}
diff --git a/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-extensions/src/main/resources/META-INF/services/org.apache.nifi.registry.extension.BundlePersistenceProvider b/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-extensions/src/main/resources/META-INF/services/org.apache.nifi.registry.extension.BundlePersistenceProvider
new file mode 100644
index 0000000..dd214c5
--- /dev/null
+++ b/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-extensions/src/main/resources/META-INF/services/org.apache.nifi.registry.extension.BundlePersistenceProvider
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.registry.aws.S3BundlePersistenceProvider
\ No newline at end of file
diff --git a/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-extensions/src/test/java/org/apache/nifi/registry/aws/S3BundlePersistenceProviderIT.java b/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-extensions/src/test/java/org/apache/nifi/registry/aws/S3BundlePersistenceProviderIT.java
new file mode 100644
index 0000000..491dbb7
--- /dev/null
+++ b/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-extensions/src/test/java/org/apache/nifi/registry/aws/S3BundlePersistenceProviderIT.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.aws;
+
+import org.apache.nifi.registry.extension.BundleCoordinate;
+import org.apache.nifi.registry.extension.BundlePersistenceContext;
+import org.apache.nifi.registry.extension.BundlePersistenceProvider;
+import org.apache.nifi.registry.extension.BundleVersionCoordinate;
+import org.apache.nifi.registry.extension.BundleVersionType;
+import org.apache.nifi.registry.provider.ProviderConfigurationContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class S3BundlePersistenceProviderIT {
+
+ private S3Client s3Client;
+
+ private BundlePersistenceProvider provider;
+ private ProviderConfigurationContext configurationContext;
+
+ @Before
+ public void setup() {
+ final Region region = Region.US_EAST_1;
+ final String bucketName = "integration-test-" + System.currentTimeMillis();
+
+ // Create a separate client just for the IT test so we can setup a new bucket
+ s3Client = S3Client.builder().region(region)
+ .credentialsProvider(DefaultCredentialsProvider.create())
+ .build();
+
+ final CreateBucketRequest createBucketRequest = CreateBucketRequest.builder()
+ .bucket(bucketName)
+ .build();
+
+ s3Client.createBucket(createBucketRequest);
+
+ // Create config context and provider, and call onConfigured
+ final Map<String,String> properties = new HashMap<>();
+ properties.put(S3BundlePersistenceProvider.REGION_PROP, region.id());
+ properties.put(S3BundlePersistenceProvider.BUCKET_NAME_PROP, bucketName);
+ properties.put(S3BundlePersistenceProvider.CREDENTIALS_PROVIDER_PROP,
+ S3BundlePersistenceProvider.CredentialProvider.DEFAULT_CHAIN.name());
+
+ configurationContext = mock(ProviderConfigurationContext.class);
+ when(configurationContext.getProperties()).thenReturn(properties);
+
+ provider = new S3BundlePersistenceProvider();
+ provider.onConfigured(configurationContext);
+ }
+
+ @After
+ public void teardown() {
+ try {
+ provider.preDestruction();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ try {
+ s3Client.close();
+ } catch (Exception e) {
+ e.printStackTrace();;
+ }
+ }
+
+ @Test
+ @Ignore // Remove to run this against S3, assumes you have setup external credentials
+ public void testS3PersistenceProvider() throws IOException {
+ final File narFile = new File("src/test/resources/nars/nifi-foo-nar-1.0.0.nar");
+
+ final UUID bucketId = UUID.randomUUID();
+
+ // Save bundle version #1
+ final BundleVersionCoordinate versionCoordinate1 = mock(BundleVersionCoordinate.class);
+ when(versionCoordinate1.getBucketId()).thenReturn(bucketId.toString());
+ when(versionCoordinate1.getGroupId()).thenReturn("org.apache.nifi");
+ when(versionCoordinate1.getArtifactId()).thenReturn("nifi-foo-nar");
+ when(versionCoordinate1.getVersion()).thenReturn("1.0.0");
+ when(versionCoordinate1.getType()).thenReturn(BundleVersionType.NIFI_NAR);
+
+ final BundlePersistenceContext context1 = mock(BundlePersistenceContext.class);
+ when(context1.getCoordinate()).thenReturn(versionCoordinate1);
+ when(context1.getSize()).thenReturn(narFile.length());
+
+ try (final InputStream in = new FileInputStream(narFile)) {
+ provider.createBundleVersion(context1, in);
+ }
+
+ // Save bundle version #2
+ final BundleVersionCoordinate versionCoordinate2 = mock(BundleVersionCoordinate.class);
+ when(versionCoordinate2.getBucketId()).thenReturn(bucketId.toString());
+ when(versionCoordinate2.getGroupId()).thenReturn("org.apache.nifi");
+ when(versionCoordinate2.getArtifactId()).thenReturn("nifi-foo-nar");
+ when(versionCoordinate2.getVersion()).thenReturn("2.0.0");
+ when(versionCoordinate2.getType()).thenReturn(BundleVersionType.NIFI_NAR);
+
+ final BundlePersistenceContext context2 = mock(BundlePersistenceContext.class);
+ when(context2.getCoordinate()).thenReturn(versionCoordinate2);
+ when(context2.getSize()).thenReturn(narFile.length());
+
+ try (final InputStream in = new FileInputStream(narFile)) {
+ provider.createBundleVersion(context2, in);
+ }
+
+ // Verify we can retrieve version #1
+ final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ provider.getBundleVersionContent(versionCoordinate1, outputStream);
+ assertEquals(context1.getSize(), outputStream.size());
+
+ // Delete version #1
+ provider.deleteBundleVersion(versionCoordinate1);
+
+ // Verify we can no longer retrieve version #1
+ final ByteArrayOutputStream outputStream2 = new ByteArrayOutputStream();
+ try {
+ provider.getBundleVersionContent(versionCoordinate1, outputStream2);
+ fail("Should have thrown exception");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ // Call delete all bundle versions which should leave an empty bucket
+ final BundleCoordinate bundleCoordinate = mock(BundleCoordinate.class);
+ when(bundleCoordinate.getBucketId()).thenReturn(bucketId.toString());
+ when(bundleCoordinate.getGroupId()).thenReturn("org.apache.nifi");
+ when(bundleCoordinate.getArtifactId()).thenReturn("nifi-foo-nar");
+
+ provider.deleteAllBundleVersions(bundleCoordinate);
+ }
+
+}
diff --git a/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-extensions/src/test/resources/nars/nifi-foo-nar-1.0.0.nar b/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-extensions/src/test/resources/nars/nifi-foo-nar-1.0.0.nar
new file mode 100644
index 0000000..b43aa7b
--- /dev/null
+++ b/nifi-registry-extensions/nifi-registry-aws/nifi-registry-aws-extensions/src/test/resources/nars/nifi-foo-nar-1.0.0.nar
Binary files differ
diff --git a/nifi-registry-extensions/nifi-registry-aws/pom.xml b/nifi-registry-extensions/nifi-registry-aws/pom.xml
new file mode 100644
index 0000000..27d08ec
--- /dev/null
+++ b/nifi-registry-extensions/nifi-registry-aws/pom.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>nifi-registry-extensions</artifactId>
+ <groupId>org.apache.nifi.registry</groupId>
+ <version>0.4.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>nifi-registry-aws</artifactId>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>nifi-registry-aws-assembly</module>
+ <module>nifi-registry-aws-extensions</module>
+ </modules>
+
+ <properties>
+ <aws-java-sdk-version>2.5.9</aws-java-sdk-version>
+ </properties>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>bom</artifactId>
+ <version>${aws-java-sdk-version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+</project>
diff --git a/nifi-registry-extensions/nifi-registry-ranger/nifi-registry-ranger-assembly/NOTICE b/nifi-registry-extensions/nifi-registry-ranger/nifi-registry-ranger-assembly/NOTICE
index 56c03f6..e30810d 100644
--- a/nifi-registry-extensions/nifi-registry-ranger/nifi-registry-ranger-assembly/NOTICE
+++ b/nifi-registry-extensions/nifi-registry-ranger/nifi-registry-ranger-assembly/NOTICE
@@ -1,5 +1,5 @@
nifi-registry-ranger-extension
-Copyright 2018 The Apache Software Foundation
+Copyright 2019 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
diff --git a/nifi-registry-extensions/pom.xml b/nifi-registry-extensions/pom.xml
index 803890f..90a2ace 100644
--- a/nifi-registry-extensions/pom.xml
+++ b/nifi-registry-extensions/pom.xml
@@ -26,7 +26,8 @@
<modules>
<module>nifi-registry-ranger</module>
+ <module>nifi-registry-aws</module>
</modules>
-</project>
\ No newline at end of file
+</project>