| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.registry.web.api; |
| |
| import org.apache.commons.codec.digest.DigestUtils; |
| import org.apache.commons.io.IOUtils; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.nifi.registry.authorization.CurrentUser; |
| import org.apache.nifi.registry.authorization.Permissions; |
| import org.apache.nifi.registry.bucket.Bucket; |
| import org.apache.nifi.registry.bucket.BucketItem; |
| import org.apache.nifi.registry.bucket.BucketItemType; |
| import org.apache.nifi.registry.client.BucketClient; |
| import org.apache.nifi.registry.client.BundleClient; |
| import org.apache.nifi.registry.client.BundleVersionClient; |
| import org.apache.nifi.registry.client.ExtensionClient; |
| import org.apache.nifi.registry.client.ExtensionRepoClient; |
| import org.apache.nifi.registry.client.FlowClient; |
| import org.apache.nifi.registry.client.FlowSnapshotClient; |
| import org.apache.nifi.registry.client.ItemsClient; |
| import org.apache.nifi.registry.client.NiFiRegistryClient; |
| import org.apache.nifi.registry.client.NiFiRegistryClientConfig; |
| import org.apache.nifi.registry.client.NiFiRegistryException; |
| import org.apache.nifi.registry.client.UserClient; |
| import org.apache.nifi.registry.client.impl.JerseyNiFiRegistryClient; |
| import org.apache.nifi.registry.diff.VersionedFlowDifference; |
| import org.apache.nifi.registry.extension.bundle.BuildInfo; |
| import org.apache.nifi.registry.extension.bundle.Bundle; |
| import org.apache.nifi.registry.extension.bundle.BundleFilterParams; |
| import org.apache.nifi.registry.extension.bundle.BundleType; |
| import org.apache.nifi.registry.extension.bundle.BundleVersion; |
| import org.apache.nifi.registry.extension.bundle.BundleVersionDependency; |
| import org.apache.nifi.registry.extension.bundle.BundleVersionFilterParams; |
| import org.apache.nifi.registry.extension.bundle.BundleVersionMetadata; |
| import org.apache.nifi.registry.extension.component.ExtensionFilterParams; |
| import org.apache.nifi.registry.extension.component.ExtensionMetadataContainer; |
| import org.apache.nifi.registry.extension.component.TagCount; |
| import org.apache.nifi.registry.extension.component.manifest.Extension; |
| import org.apache.nifi.registry.extension.component.ExtensionMetadata; |
| import org.apache.nifi.registry.extension.component.manifest.ExtensionType; |
| import org.apache.nifi.registry.extension.component.manifest.ProvidedServiceAPI; |
| import org.apache.nifi.registry.extension.repo.ExtensionRepoArtifact; |
| import org.apache.nifi.registry.extension.repo.ExtensionRepoBucket; |
| import org.apache.nifi.registry.extension.repo.ExtensionRepoExtensionMetadata; |
| import org.apache.nifi.registry.extension.repo.ExtensionRepoGroup; |
| import org.apache.nifi.registry.extension.repo.ExtensionRepoVersion; |
| import org.apache.nifi.registry.extension.repo.ExtensionRepoVersionSummary; |
| import org.apache.nifi.registry.field.Fields; |
| import org.apache.nifi.registry.flow.VersionedFlow; |
| import org.apache.nifi.registry.flow.VersionedFlowSnapshot; |
| import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; |
| import org.apache.nifi.registry.flow.VersionedProcessGroup; |
| import org.apache.nifi.registry.flow.VersionedProcessor; |
| import org.apache.nifi.registry.flow.VersionedPropertyDescriptor; |
| import org.apache.nifi.registry.util.FileUtils; |
| import org.bouncycastle.util.encoders.Hex; |
| import org.glassfish.jersey.media.multipart.MultiPartFeature; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.ws.rs.client.Client; |
| import javax.ws.rs.client.ClientBuilder; |
| import javax.ws.rs.client.WebTarget; |
| import javax.ws.rs.core.MediaType; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.nio.charset.StandardCharsets; |
| import java.security.NoSuchAlgorithmException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| /** |
| * Test all basic functionality of JerseyNiFiRegistryClient. |
| */ |
| public class UnsecuredNiFiRegistryClientIT extends UnsecuredITBase { |
| |
| static final Logger LOGGER = LoggerFactory.getLogger(UnsecuredNiFiRegistryClientIT.class); |
| |
| private NiFiRegistryClient client; |
| |
| @Before |
| public void setup() throws IOException { |
| final String baseUrl = createBaseURL(); |
| LOGGER.info("Using base url = " + baseUrl); |
| |
| final NiFiRegistryClientConfig clientConfig = new NiFiRegistryClientConfig.Builder() |
| .baseUrl(baseUrl) |
| .build(); |
| |
| assertNotNull(clientConfig); |
| |
| final NiFiRegistryClient client = new JerseyNiFiRegistryClient.Builder() |
| .config(clientConfig) |
| .build(); |
| |
| assertNotNull(client); |
| this.client = client; |
| |
| // Clear the extension bundles storage directory in case previous tests left data |
| final File extensionsStorageDir = new File("./target/test-classes/extension_bundles"); |
| if (extensionsStorageDir.exists()) { |
| try { |
| FileUtils.deleteFile(extensionsStorageDir, true); |
| } catch (Exception e) { |
| LOGGER.warn("Unable to delete extensions storage dir due to: " + e.getMessage(), e); |
| } |
| } |
| } |
| |
| @After |
| public void teardown() { |
| try { |
| client.close(); |
| } catch (Exception e) { |
| |
| } |
| } |
| |
| @Test |
| public void testGetAccessStatus() throws IOException, NiFiRegistryException { |
| final UserClient userClient = client.getUserClient(); |
| final CurrentUser currentUser = userClient.getAccessStatus(); |
| assertEquals("anonymous", currentUser.getIdentity()); |
| assertTrue(currentUser.isAnonymous()); |
| assertNotNull(currentUser.getResourcePermissions()); |
| Permissions fullAccess = new Permissions().withCanRead(true).withCanWrite(true).withCanDelete(true); |
| assertEquals(fullAccess, currentUser.getResourcePermissions().getAnyTopLevelResource()); |
| assertEquals(fullAccess, currentUser.getResourcePermissions().getBuckets()); |
| assertEquals(fullAccess, currentUser.getResourcePermissions().getTenants()); |
| assertEquals(fullAccess, currentUser.getResourcePermissions().getPolicies()); |
| assertEquals(fullAccess, currentUser.getResourcePermissions().getProxy()); |
| } |
| |
| @Test |
| public void testNiFiRegistryClient() throws IOException, NiFiRegistryException, NoSuchAlgorithmException { |
| // ---------------------- TEST BUCKETS --------------------------// |
| |
| final BucketClient bucketClient = client.getBucketClient(); |
| |
| // create buckets |
| final int numBuckets = 10; |
| final List<Bucket> createdBuckets = new ArrayList<>(); |
| |
| for (int i=0; i < numBuckets; i++) { |
| final Bucket createdBucket = createBucket(bucketClient, i); |
| LOGGER.info("Created bucket # " + i + " with id " + createdBucket.getIdentifier()); |
| createdBuckets.add(createdBucket); |
| } |
| |
| // get each bucket |
| for (final Bucket bucket : createdBuckets) { |
| final Bucket retrievedBucket = bucketClient.get(bucket.getIdentifier()); |
| assertNotNull(retrievedBucket); |
| assertFalse(retrievedBucket.isAllowBundleRedeploy()); |
| LOGGER.info("Retrieved bucket " + retrievedBucket.getIdentifier()); |
| } |
| |
| //final Bucket nonExistentBucket = bucketClient.get("does-not-exist"); |
| //assertNull(nonExistentBucket); |
| |
| // get bucket fields |
| final Fields bucketFields = bucketClient.getFields(); |
| assertNotNull(bucketFields); |
| LOGGER.info("Retrieved bucket fields, size = " + bucketFields.getFields().size()); |
| assertTrue(bucketFields.getFields().size() > 0); |
| |
| // get all buckets |
| final List<Bucket> allBuckets = bucketClient.getAll(); |
| LOGGER.info("Retrieved buckets, size = " + allBuckets.size()); |
| assertEquals(numBuckets, allBuckets.size()); |
| allBuckets.stream().forEach(b -> System.out.println("Retrieve bucket " + b.getIdentifier())); |
| |
| // update each bucket |
| for (final Bucket bucket : createdBuckets) { |
| final Bucket bucketUpdate = new Bucket(); |
| bucketUpdate.setIdentifier(bucket.getIdentifier()); |
| bucketUpdate.setDescription(bucket.getDescription() + " UPDATE"); |
| |
| final Bucket updatedBucket = bucketClient.update(bucketUpdate); |
| assertNotNull(updatedBucket); |
| LOGGER.info("Updated bucket " + updatedBucket.getIdentifier()); |
| } |
| |
| // ---------------------- TEST FLOWS --------------------------// |
| |
| final FlowClient flowClient = client.getFlowClient(); |
| |
| // create flows |
| final Bucket flowsBucket = createdBuckets.get(0); |
| |
| final VersionedFlow flow1 = createFlow(flowClient, flowsBucket, 1); |
| LOGGER.info("Created flow # 1 with id " + flow1.getIdentifier()); |
| |
| final VersionedFlow flow2 = createFlow(flowClient, flowsBucket, 2); |
| LOGGER.info("Created flow # 2 with id " + flow2.getIdentifier()); |
| |
| // get flow |
| final VersionedFlow retrievedFlow1 = flowClient.get(flowsBucket.getIdentifier(), flow1.getIdentifier()); |
| assertNotNull(retrievedFlow1); |
| LOGGER.info("Retrieved flow # 1 with id " + retrievedFlow1.getIdentifier()); |
| |
| final VersionedFlow retrievedFlow2 = flowClient.get(flowsBucket.getIdentifier(), flow2.getIdentifier()); |
| assertNotNull(retrievedFlow2); |
| LOGGER.info("Retrieved flow # 2 with id " + retrievedFlow2.getIdentifier()); |
| |
| // get flow without bucket |
| final VersionedFlow retrievedFlow1WithoutBucket = flowClient.get(flow1.getIdentifier()); |
| assertNotNull(retrievedFlow1WithoutBucket); |
| assertEquals(flow1.getIdentifier(), retrievedFlow1WithoutBucket.getIdentifier()); |
| LOGGER.info("Retrieved flow # 1 without bucket id, with id " + retrievedFlow1WithoutBucket.getIdentifier()); |
| |
| // update flows |
| final VersionedFlow flow1Update = new VersionedFlow(); |
| flow1Update.setIdentifier(flow1.getIdentifier()); |
| flow1Update.setName(flow1.getName() + " UPDATED"); |
| |
| final VersionedFlow updatedFlow1 = flowClient.update(flowsBucket.getIdentifier(), flow1Update); |
| assertNotNull(updatedFlow1); |
| LOGGER.info("Updated flow # 1 with id " + updatedFlow1.getIdentifier()); |
| |
| // get flow fields |
| final Fields flowFields = flowClient.getFields(); |
| assertNotNull(flowFields); |
| LOGGER.info("Retrieved flow fields, size = " + flowFields.getFields().size()); |
| assertTrue(flowFields.getFields().size() > 0); |
| |
| // get flows in bucket |
| final List<VersionedFlow> flowsInBucket = flowClient.getByBucket(flowsBucket.getIdentifier()); |
| assertNotNull(flowsInBucket); |
| assertEquals(2, flowsInBucket.size()); |
| flowsInBucket.stream().forEach(f -> LOGGER.info("Flow in bucket, flow id " + f.getIdentifier())); |
| |
| // ---------------------- TEST SNAPSHOTS --------------------------// |
| |
| final FlowSnapshotClient snapshotClient = client.getFlowSnapshotClient(); |
| |
| // create snapshots |
| final VersionedFlow snapshotFlow = flow1; |
| |
| final VersionedFlowSnapshot snapshot1 = createSnapshot(snapshotClient, snapshotFlow, 1); |
| LOGGER.info("Created snapshot # 1 with version " + snapshot1.getSnapshotMetadata().getVersion()); |
| |
| final VersionedFlowSnapshot snapshot2 = createSnapshot(snapshotClient, snapshotFlow, 2); |
| LOGGER.info("Created snapshot # 2 with version " + snapshot2.getSnapshotMetadata().getVersion()); |
| |
| // get snapshot |
| final VersionedFlowSnapshot retrievedSnapshot1 = snapshotClient.get(snapshotFlow.getBucketIdentifier(), snapshotFlow.getIdentifier(), 1); |
| assertNotNull(retrievedSnapshot1); |
| assertFalse(retrievedSnapshot1.isLatest()); |
| LOGGER.info("Retrieved snapshot # 1 with version " + retrievedSnapshot1.getSnapshotMetadata().getVersion()); |
| |
| final VersionedFlowSnapshot retrievedSnapshot2 = snapshotClient.get(snapshotFlow.getBucketIdentifier(), snapshotFlow.getIdentifier(), 2); |
| assertNotNull(retrievedSnapshot2); |
| assertTrue(retrievedSnapshot2.isLatest()); |
| LOGGER.info("Retrieved snapshot # 2 with version " + retrievedSnapshot2.getSnapshotMetadata().getVersion()); |
| |
| // get snapshot without bucket |
| final VersionedFlowSnapshot retrievedSnapshot1WithoutBucket = snapshotClient.get(snapshotFlow.getIdentifier(), 1); |
| assertNotNull(retrievedSnapshot1WithoutBucket); |
| assertFalse(retrievedSnapshot1WithoutBucket.isLatest()); |
| assertEquals(snapshotFlow.getIdentifier(), retrievedSnapshot1WithoutBucket.getSnapshotMetadata().getFlowIdentifier()); |
| assertEquals(1, retrievedSnapshot1WithoutBucket.getSnapshotMetadata().getVersion()); |
| LOGGER.info("Retrieved snapshot # 1 without using bucket id, with version " + retrievedSnapshot1WithoutBucket.getSnapshotMetadata().getVersion()); |
| |
| // get latest |
| final VersionedFlowSnapshot retrievedSnapshotLatest = snapshotClient.getLatest(snapshotFlow.getBucketIdentifier(), snapshotFlow.getIdentifier()); |
| assertNotNull(retrievedSnapshotLatest); |
| assertEquals(snapshot2.getSnapshotMetadata().getVersion(), retrievedSnapshotLatest.getSnapshotMetadata().getVersion()); |
| assertTrue(retrievedSnapshotLatest.isLatest()); |
| LOGGER.info("Retrieved latest snapshot with version " + retrievedSnapshotLatest.getSnapshotMetadata().getVersion()); |
| |
| // get latest without bucket |
| final VersionedFlowSnapshot retrievedSnapshotLatestWithoutBucket = snapshotClient.getLatest(snapshotFlow.getIdentifier()); |
| assertNotNull(retrievedSnapshotLatestWithoutBucket); |
| assertEquals(snapshot2.getSnapshotMetadata().getVersion(), retrievedSnapshotLatestWithoutBucket.getSnapshotMetadata().getVersion()); |
| assertTrue(retrievedSnapshotLatestWithoutBucket.isLatest()); |
| LOGGER.info("Retrieved latest snapshot without bucket, with version " + retrievedSnapshotLatestWithoutBucket.getSnapshotMetadata().getVersion()); |
| |
| // get metadata |
| final List<VersionedFlowSnapshotMetadata> retrievedMetadata = snapshotClient.getSnapshotMetadata(snapshotFlow.getBucketIdentifier(), snapshotFlow.getIdentifier()); |
| assertNotNull(retrievedMetadata); |
| assertEquals(2, retrievedMetadata.size()); |
| assertEquals(2, retrievedMetadata.get(0).getVersion()); |
| assertEquals(1, retrievedMetadata.get(1).getVersion()); |
| retrievedMetadata.stream().forEach(s -> LOGGER.info("Retrieved snapshot metadata " + s.getVersion())); |
| |
| // get metadata without bucket |
| final List<VersionedFlowSnapshotMetadata> retrievedMetadataWithoutBucket = snapshotClient.getSnapshotMetadata(snapshotFlow.getIdentifier()); |
| assertNotNull(retrievedMetadataWithoutBucket); |
| assertEquals(2, retrievedMetadataWithoutBucket.size()); |
| assertEquals(2, retrievedMetadataWithoutBucket.get(0).getVersion()); |
| assertEquals(1, retrievedMetadataWithoutBucket.get(1).getVersion()); |
| retrievedMetadataWithoutBucket.stream().forEach(s -> LOGGER.info("Retrieved snapshot metadata " + s.getVersion())); |
| |
| // get latest metadata |
| final VersionedFlowSnapshotMetadata latestMetadata = snapshotClient.getLatestMetadata(snapshotFlow.getBucketIdentifier(), snapshotFlow.getIdentifier()); |
| assertNotNull(latestMetadata); |
| assertEquals(2, latestMetadata.getVersion()); |
| |
| // get latest metadata that doesn't exist |
| try { |
| snapshotClient.getLatestMetadata(snapshotFlow.getBucketIdentifier(), "DOES-NOT-EXIST"); |
| fail("Should have thrown exception"); |
| } catch (NiFiRegistryException nfe) { |
| assertEquals("Error retrieving latest snapshot metadata: The specified flow ID does not exist in this bucket.", nfe.getMessage()); |
| } |
| |
| // get latest metadata without bucket |
| final VersionedFlowSnapshotMetadata latestMetadataWithoutBucket = snapshotClient.getLatestMetadata(snapshotFlow.getIdentifier()); |
| assertNotNull(latestMetadataWithoutBucket); |
| assertEquals(snapshotFlow.getIdentifier(), latestMetadataWithoutBucket.getFlowIdentifier()); |
| assertEquals(2, latestMetadataWithoutBucket.getVersion()); |
| |
| // ---------------------- TEST EXTENSIONS ----------------------// |
| |
| // verify we have no bundles yet |
| final BundleClient bundleClient = client.getBundleClient(); |
| final List<Bundle> allBundles = bundleClient.getAll(); |
| assertEquals(0, allBundles.size()); |
| |
| final Bucket bundlesBucket = createdBuckets.get(1); |
| final Bucket bundlesBucket2 = createdBuckets.get(2); |
| final BundleVersionClient bundleVersionClient = client.getBundleVersionClient(); |
| |
| // create version 1.0.0 of nifi-test-nar |
| final String testNar1 = "src/test/resources/extensions/nars/nifi-test-nar-1.0.0.nar"; |
| final BundleVersion createdTestNarV1 = createExtensionBundleVersionWithStream(bundlesBucket, bundleVersionClient, testNar1, null); |
| |
| final Bundle testNarV1Bundle = createdTestNarV1.getBundle(); |
| LOGGER.info("Created bundle with id {}", new Object[]{testNarV1Bundle.getIdentifier()}); |
| |
| assertEquals("org.apache.nifi", testNarV1Bundle.getGroupId()); |
| assertEquals("nifi-test-nar", testNarV1Bundle.getArtifactId()); |
| assertEquals(BundleType.NIFI_NAR, testNarV1Bundle.getBundleType()); |
| assertEquals(1, testNarV1Bundle.getVersionCount()); |
| |
| assertEquals("org.apache.nifi:nifi-test-nar", testNarV1Bundle.getName()); |
| assertEquals(bundlesBucket.getIdentifier(), testNarV1Bundle.getBucketIdentifier()); |
| assertEquals(bundlesBucket.getName(), testNarV1Bundle.getBucketName()); |
| assertNotNull(testNarV1Bundle.getPermissions()); |
| assertTrue(testNarV1Bundle.getCreatedTimestamp() > 0); |
| assertTrue(testNarV1Bundle.getModifiedTimestamp() > 0); |
| |
| final BundleVersionMetadata testNarV1Metadata = createdTestNarV1.getVersionMetadata(); |
| assertEquals("1.0.0", testNarV1Metadata.getVersion()); |
| assertNotNull(testNarV1Metadata.getId()); |
| assertNotNull(testNarV1Metadata.getSha256()); |
| assertNotNull(testNarV1Metadata.getAuthor()); |
| assertEquals(testNarV1Bundle.getIdentifier(), testNarV1Metadata.getBundleId()); |
| assertEquals(bundlesBucket.getIdentifier(), testNarV1Metadata.getBucketId()); |
| assertTrue(testNarV1Metadata.getTimestamp() > 0); |
| assertFalse(testNarV1Metadata.getSha256Supplied()); |
| assertTrue(testNarV1Metadata.getContentSize() > 1); |
| |
| final BuildInfo testNarV1BuildInfo = testNarV1Metadata.getBuildInfo(); |
| assertNotNull(testNarV1BuildInfo); |
| assertTrue(testNarV1BuildInfo.getBuilt() > 0); |
| assertNotNull(testNarV1BuildInfo.getBuildTool()); |
| assertNotNull(testNarV1BuildInfo.getBuildRevision()); |
| |
| final Set<BundleVersionDependency> dependencies = createdTestNarV1.getDependencies(); |
| assertNotNull(dependencies); |
| assertEquals(1, dependencies.size()); |
| |
| final BundleVersionDependency testNarV1Dependency = dependencies.stream().findFirst().get(); |
| assertEquals("org.apache.nifi", testNarV1Dependency.getGroupId()); |
| assertEquals("nifi-test-api-nar", testNarV1Dependency.getArtifactId()); |
| assertEquals("1.0.0", testNarV1Dependency.getVersion()); |
| |
| final String testNar2 = "src/test/resources/extensions/nars/nifi-test-nar-2.0.0.nar"; |
| |
| // try to create version 2.0.0 of nifi-test-nar when the supplied SHA-256 does not match server's |
| final String madeUpSha256 = "MADE-UP-SHA-256"; |
| try { |
| createExtensionBundleVersionWithStream(bundlesBucket, bundleVersionClient, testNar2, madeUpSha256); |
| fail("Should have thrown exception"); |
| } catch (Exception e) { |
| // should have thrown exception from mismatched SHA-256 |
| } |
| |
| // create version 2.0.0 of nifi-test-nar using correct supplied SHA-256 |
| final String testNar2Sha256 = calculateSha256Hex(testNar2); |
| final BundleVersion createdTestNarV2 = createExtensionBundleVersionWithStream(bundlesBucket, bundleVersionClient, testNar2, testNar2Sha256); |
| assertTrue(createdTestNarV2.getVersionMetadata().getSha256Supplied()); |
| |
| final Bundle testNarV2Bundle = createdTestNarV2.getBundle(); |
| LOGGER.info("Created bundle with id {}", new Object[]{testNarV2Bundle.getIdentifier()}); |
| |
| // create version 1.0.0 of nifi-foo-nar, use the file variant |
| final String fooNar = "src/test/resources/extensions/nars/nifi-foo-nar-1.0.0.nar"; |
| final BundleVersion createdFooNarV1 = createExtensionBundleVersionWithFile(bundlesBucket, bundleVersionClient, fooNar, null); |
| assertFalse(createdFooNarV1.getVersionMetadata().getSha256Supplied()); |
| |
| final Bundle fooNarV1Bundle = createdFooNarV1.getBundle(); |
| LOGGER.info("Created bundle with id {}", new Object[]{fooNarV1Bundle.getIdentifier()}); |
| |
| // verify that bucket 1 currently does not allow redeploying non-snapshot artifacts |
| assertFalse(bundlesBucket.isAllowBundleRedeploy()); |
| |
| // try to re-deploy version 1.0.0 of nifi-foo-nar, should fail |
| try { |
| createExtensionBundleVersionWithFile(bundlesBucket, bundleVersionClient, fooNar, null); |
| fail("Should have thrown exception when re-deploying foo nar"); |
| } catch (Exception e) { |
| // Should throw exception |
| } |
| |
| // now update bucket 1 to allow redeploy |
| bundlesBucket.setAllowBundleRedeploy(true); |
| final Bucket updatedBundlesBucket = bucketClient.update(bundlesBucket); |
| assertTrue(updatedBundlesBucket.isAllowBundleRedeploy()); |
| |
| // try to re-deploy version 1.0.0 of nifi-foo-nar again, this time should work |
| assertNotNull(createExtensionBundleVersionWithFile(bundlesBucket, bundleVersionClient, fooNar, null)); |
| |
| // verify there are 2 bundles now |
| final List<Bundle> allBundlesAfterCreate = bundleClient.getAll(); |
| assertEquals(2, allBundlesAfterCreate.size()); |
| |
| // create version 2.0.0-SNAPSHOT (build 1 content) of nifi-foo-nar in the first bucket |
| final String fooNarV2SnapshotB1 = "src/test/resources/extensions/nars/nifi-foo-nar-2.0.0-SNAPSHOT-BUILD1.nar"; |
| final BundleVersion createdFooNarV2SnapshotB1 = createExtensionBundleVersionWithFile(bundlesBucket, bundleVersionClient, fooNarV2SnapshotB1, null); |
| assertFalse(createdFooNarV2SnapshotB1.getVersionMetadata().getSha256Supplied()); |
| |
| // create version 2.0.0-SNAPSHOT (build 2 content) of nifi-foo-nar in the second bucket |
| // proves that snapshots can have different checksums across buckets, non-snapshots can't |
| final String fooNarV2SnapshotB2 = "src/test/resources/extensions/nars/nifi-foo-nar-2.0.0-SNAPSHOT-BUILD2.nar"; |
| final BundleVersion createdFooNarV2SnapshotB2 = createExtensionBundleVersionWithFile(bundlesBucket2, bundleVersionClient, fooNarV2SnapshotB2, null); |
| assertFalse(createdFooNarV2SnapshotB2.getVersionMetadata().getSha256Supplied()); |
| |
| // create version 2.0.0-SNAPSHOT (build 3 content) of nifi-foo-nar in the second bucket |
| // proves that we can overwrite a snapshot in a given bucket |
| final String fooNarV2SnapshotB3 = "src/test/resources/extensions/nars/nifi-foo-nar-2.0.0-SNAPSHOT-BUILD3.nar"; |
| final BundleVersion createdFooNarV2SnapshotB3 = createExtensionBundleVersionWithFile(bundlesBucket2, bundleVersionClient, fooNarV2SnapshotB3, null); |
| assertFalse(createdFooNarV2SnapshotB3.getVersionMetadata().getSha256Supplied()); |
| |
| // verify retrieving nifi-foo-nar 2.0.0-SNAPSHOT from second bucket returns the build 3 content |
| final BundleVersion retrievedFooNarV2SnapshotB3 = bundleVersionClient.getBundleVersion( |
| createdFooNarV2SnapshotB3.getVersionMetadata().getBundleId(), |
| createdFooNarV2SnapshotB3.getVersionMetadata().getVersion()); |
| assertEquals(calculateSha256Hex(fooNarV2SnapshotB3), retrievedFooNarV2SnapshotB3.getVersionMetadata().getSha256()); |
| |
| final List<ExtensionMetadata> fooNarV2SnapshotB3Extensions = bundleVersionClient.getExtensions( |
| createdFooNarV2SnapshotB3.getVersionMetadata().getBundleId(), |
| createdFooNarV2SnapshotB3.getVersionMetadata().getVersion()); |
| assertNotNull(fooNarV2SnapshotB3Extensions); |
| assertEquals(1, fooNarV2SnapshotB3Extensions.size()); |
| checkExtensionMetadata(fooNarV2SnapshotB3Extensions); |
| |
| // verify getting an extension for a specific bundle version |
| final String fooNarV2SnapshotB3ExtensionName = fooNarV2SnapshotB3Extensions.get(0).getName(); |
| final Extension fooNarV2SnapshotB3Extension = bundleVersionClient.getExtension( |
| createdFooNarV2SnapshotB3.getVersionMetadata().getBundleId(), |
| createdFooNarV2SnapshotB3.getVersionMetadata().getVersion(), |
| fooNarV2SnapshotB3ExtensionName |
| ); |
| assertNotNull(fooNarV2SnapshotB3Extension); |
| assertEquals(fooNarV2SnapshotB3ExtensionName, fooNarV2SnapshotB3Extension.getName()); |
| |
| // verify getting the docs for an extension for a specific bundle version |
| try (final InputStream docsInput = bundleVersionClient.getExtensionDocs( |
| createdFooNarV2SnapshotB3.getVersionMetadata().getBundleId(), |
| createdFooNarV2SnapshotB3.getVersionMetadata().getVersion(), |
| fooNarV2SnapshotB3ExtensionName |
| )) { |
| final String docsContent = IOUtils.toString(docsInput, StandardCharsets.UTF_8); |
| assertNotNull(docsContent); |
| assertTrue(docsContent.startsWith("<!DOCTYPE html>")); |
| } |
| |
| // verify getting bundles by bucket |
| assertEquals(2, bundleClient.getByBucket(bundlesBucket.getIdentifier()).size()); |
| assertEquals(0, bundleClient.getByBucket(flowsBucket.getIdentifier()).size()); |
| |
| // verify getting bundles by id |
| final Bundle retrievedBundle = bundleClient.get(testNarV1Bundle.getIdentifier()); |
| assertNotNull(retrievedBundle); |
| assertEquals(testNarV1Bundle.getIdentifier(), retrievedBundle.getIdentifier()); |
| assertEquals(testNarV1Bundle.getGroupId(), retrievedBundle.getGroupId()); |
| assertEquals(testNarV1Bundle.getArtifactId(), retrievedBundle.getArtifactId()); |
| |
| // verify getting list of version metadata for a bundle |
| final List<BundleVersionMetadata> bundleVersions = bundleVersionClient.getBundleVersions(testNarV1Bundle.getIdentifier()); |
| assertNotNull(bundleVersions); |
| assertEquals(2, bundleVersions.size()); |
| |
| // verify getting a bundle version by the bundle id + version string |
| final BundleVersion bundleVersion1 = bundleVersionClient.getBundleVersion(testNarV1Bundle.getIdentifier(), "1.0.0"); |
| assertNotNull(bundleVersion1); |
| assertEquals("1.0.0", bundleVersion1.getVersionMetadata().getVersion()); |
| assertNotNull(bundleVersion1.getDependencies()); |
| assertEquals(1, bundleVersion1.getDependencies().size()); |
| |
| final BundleVersion bundleVersion2 = bundleVersionClient.getBundleVersion(testNarV1Bundle.getIdentifier(), "2.0.0"); |
| assertNotNull(bundleVersion2); |
| assertEquals("2.0.0", bundleVersion2.getVersionMetadata().getVersion()); |
| |
| // verify getting the input stream for a bundle version |
| try (final InputStream bundleVersion1InputStream = bundleVersionClient.getBundleVersionContent(testNarV1Bundle.getIdentifier(), "1.0.0")) { |
| final String sha256Hex = DigestUtils.sha256Hex(bundleVersion1InputStream); |
| assertEquals(testNarV1Metadata.getSha256(), sha256Hex); |
| } |
| |
| // verify writing a bundle version to an output stream |
| final File targetDir = new File("./target"); |
| final File bundleFile = bundleVersionClient.writeBundleVersionContent(testNarV1Bundle.getIdentifier(), "1.0.0", targetDir); |
| assertNotNull(bundleFile); |
| |
| try (final InputStream bundleInputStream = new FileInputStream(bundleFile)) { |
| final String sha256Hex = DigestUtils.sha256Hex(bundleInputStream); |
| assertEquals(testNarV1Metadata.getSha256(), sha256Hex); |
| } |
| |
| // Verify deleting a bundle version |
| final BundleVersion deletedBundleVersion2 = bundleVersionClient.delete(testNarV1Bundle.getIdentifier(), "2.0.0"); |
| assertNotNull(deletedBundleVersion2); |
| assertEquals(testNarV1Bundle.getIdentifier(), deletedBundleVersion2.getBundle().getIdentifier()); |
| assertEquals("2.0.0", deletedBundleVersion2.getVersionMetadata().getVersion()); |
| |
| try { |
| bundleVersionClient.getBundleVersion(testNarV1Bundle.getIdentifier(), "2.0.0"); |
| fail("Should have thrown exception"); |
| } catch (Exception e) { |
| // should catch exception |
| } |
| |
| // Verify getting bundles with filter params |
| assertEquals(3, bundleClient.getAll(BundleFilterParams.empty()).size()); |
| |
| final List<Bundle> filteredBundles = bundleClient.getAll(BundleFilterParams.of("org.apache.nifi", "nifi-test-nar")); |
| assertEquals(1, filteredBundles.size()); |
| |
| // Verify getting bundle versions with filter params |
| assertEquals(4, bundleVersionClient.getBundleVersions(BundleVersionFilterParams.empty()).size()); |
| |
| final List<BundleVersionMetadata> filteredVersions = bundleVersionClient.getBundleVersions( |
| BundleVersionFilterParams.of("org.apache.nifi", "nifi-foo-nar", "1.0.0")); |
| assertEquals(1, filteredVersions.size()); |
| |
| final List<BundleVersionMetadata> filteredVersions2 = bundleVersionClient.getBundleVersions( |
| BundleVersionFilterParams.of("org.apache.nifi", null, null)); |
| assertEquals(4, filteredVersions2.size()); |
| |
| // ---------------------- TEST EXTENSION REPO ----------------------// |
| |
| final ExtensionRepoClient extensionRepoClient = client.getExtensionRepoClient(); |
| |
| final List<ExtensionRepoBucket> repoBuckets = extensionRepoClient.getBuckets(); |
| assertEquals(createdBuckets.size(), repoBuckets.size()); |
| |
| final String bundlesBucketName = bundlesBucket.getName(); |
| final List<ExtensionRepoGroup> repoGroups = extensionRepoClient.getGroups(bundlesBucketName); |
| assertEquals(1, repoGroups.size()); |
| |
| final String repoGroupId = "org.apache.nifi"; |
| final ExtensionRepoGroup repoGroup = repoGroups.get(0); |
| assertEquals(repoGroupId, repoGroup.getGroupId()); |
| |
| final List<ExtensionRepoArtifact> repoArtifacts = extensionRepoClient.getArtifacts(bundlesBucketName, repoGroupId); |
| assertEquals(2, repoArtifacts.size()); |
| |
| final String repoArtifactId = "nifi-test-nar"; |
| final List<ExtensionRepoVersionSummary> repoVersions = extensionRepoClient.getVersions(bundlesBucketName, repoGroupId, repoArtifactId); |
| assertEquals(1, repoVersions.size()); |
| |
| final String repoVersionString = "1.0.0"; |
| final ExtensionRepoVersion repoVersion = extensionRepoClient.getVersion(bundlesBucketName, repoGroupId, repoArtifactId, repoVersionString); |
| assertNotNull(repoVersion); |
| assertNotNull(repoVersion.getDownloadLink()); |
| assertNotNull(repoVersion.getSha256Link()); |
| assertNotNull(repoVersion.getExtensionsLink()); |
| |
| // verify the version links for content and sha256 |
| final Client jerseyClient = ClientBuilder.newBuilder().register(MultiPartFeature.class).build(); |
| |
| final WebTarget downloadLinkTarget = jerseyClient.target(repoVersion.getDownloadLink().getUri()); |
| try (final InputStream downloadLinkInputStream = downloadLinkTarget.request() |
| .accept(MediaType.APPLICATION_OCTET_STREAM_TYPE).get().readEntity(InputStream.class)) { |
| final String sha256DownloadResult = DigestUtils.sha256Hex(downloadLinkInputStream); |
| |
| final WebTarget sha256LinkTarget = jerseyClient.target(repoVersion.getSha256Link().getUri()); |
| final String sha256LinkResult = sha256LinkTarget.request().get(String.class); |
| assertEquals(sha256DownloadResult, sha256LinkResult); |
| } |
| |
| // verify the version link for extension metadata works |
| final WebTarget extensionsLinkTarget = jerseyClient.target(repoVersion.getExtensionsLink().getUri()); |
| final ExtensionRepoExtensionMetadata[] extensions = extensionsLinkTarget.request() |
| .accept(MediaType.APPLICATION_JSON) |
| .get(ExtensionRepoExtensionMetadata[].class); |
| assertNotNull(extensions); |
| assertTrue(extensions.length > 0); |
| checkExtensionMetadata(Stream.of(extensions).map(e -> e.getExtensionMetadata()).collect(Collectors.toSet())); |
| Stream.of(extensions).forEach(e -> { |
| assertNotNull(e.getLink()); |
| assertNotNull(e.getLinkDocs()); |
| }); |
| |
| // verify the client methods for content input stream, content sha256, and extensions |
| try (final InputStream repoVersionInputStream = extensionRepoClient.getVersionContent(bundlesBucketName, repoGroupId, repoArtifactId, repoVersionString)) { |
| final String sha256Hex = DigestUtils.sha256Hex(repoVersionInputStream); |
| |
| final String repoSha256Hex = extensionRepoClient.getVersionSha256(bundlesBucketName, repoGroupId, repoArtifactId, repoVersionString); |
| assertEquals(sha256Hex, repoSha256Hex); |
| |
| final Optional<String> repoSha256HexOptional = extensionRepoClient.getVersionSha256(repoGroupId, repoArtifactId, repoVersionString); |
| assertTrue(repoSha256HexOptional.isPresent()); |
| assertEquals(sha256Hex, repoSha256HexOptional.get()); |
| |
| final List<ExtensionRepoExtensionMetadata> extensionList = extensionRepoClient.getVersionExtensions(bundlesBucketName, repoGroupId, repoArtifactId, repoVersionString); |
| assertNotNull(extensionList); |
| assertTrue(extensionList.size() > 0); |
| extensionList.forEach(em -> { |
| assertNotNull(em.getExtensionMetadata()); |
| assertNotNull(em.getLink()); |
| assertNotNull(em.getLinkDocs()); |
| }); |
| |
| final String extensionName = extensionList.get(0).getExtensionMetadata().getName(); |
| final Extension extension = extensionRepoClient.getVersionExtension( |
| bundlesBucketName, repoGroupId, repoArtifactId, repoVersionString, extensionName); |
| assertNotNull(extension); |
| assertEquals(extensionName, extension.getName()); |
| |
| // verify getting the docs for an extension from extension repo |
| try (final InputStream docsInput = extensionRepoClient.getVersionExtensionDocs( |
| bundlesBucketName, repoGroupId, repoArtifactId, repoVersionString, extensionName) |
| ) { |
| final String docsContent = IOUtils.toString(docsInput, StandardCharsets.UTF_8); |
| assertNotNull(docsContent); |
| assertTrue(docsContent.startsWith("<!DOCTYPE html>")); |
| } |
| } |
| |
| final Optional<String> repoSha256HexDoesNotExist = extensionRepoClient.getVersionSha256(repoGroupId, repoArtifactId, "DOES-NOT-EXIST"); |
| assertFalse(repoSha256HexDoesNotExist.isPresent()); |
| |
| // since we uploaded two snapshot versions, make sure when we retrieve the sha that it's for the second snapshot that replaced the first |
| final Optional<String> fooNarV2SnapshotLatestSha = extensionRepoClient.getVersionSha256( |
| createdFooNarV2SnapshotB3.getBundle().getGroupId(), |
| createdFooNarV2SnapshotB3.getBundle().getArtifactId(), |
| createdFooNarV2SnapshotB2.getVersionMetadata().getVersion()); |
| assertTrue(fooNarV2SnapshotLatestSha.isPresent()); |
| assertEquals(calculateSha256Hex(fooNarV2SnapshotB3), fooNarV2SnapshotLatestSha.get()); |
| |
| // ---------------------- TEST EXTENSIONS -------------------------- // |
| |
| final ExtensionClient extensionClient = client.getExtensionClient(); |
| |
| final List<TagCount> tagCounts = extensionClient.getTagCounts(); |
| assertNotNull(tagCounts); |
| assertTrue(tagCounts.size() > 0); |
| tagCounts.forEach(tc -> { |
| assertNotNull(tc.getTag()); |
| }); |
| |
| final ExtensionMetadataContainer allExtensions = extensionClient.findExtensions((ExtensionFilterParams)null); |
| assertNotNull(allExtensions); |
| assertNotNull(allExtensions.getExtensions()); |
| assertEquals(4, allExtensions.getNumResults()); |
| assertEquals(4, allExtensions.getExtensions().size()); |
| |
| allExtensions.getExtensions().forEach(e -> { |
| assertNotNull(e.getName()); |
| assertNotNull(e.getDisplayName()); |
| assertNotNull(e.getLink()); |
| assertNotNull(e.getLinkDocs()); |
| }); |
| |
| final ExtensionMetadataContainer processorExtensions = extensionClient.findExtensions( |
| new ExtensionFilterParams.Builder().extensionType(ExtensionType.PROCESSOR).build()); |
| assertNotNull(processorExtensions); |
| assertNotNull(processorExtensions.getExtensions()); |
| assertEquals(3, processorExtensions.getNumResults()); |
| assertEquals(3, processorExtensions.getExtensions().size()); |
| |
| final ExtensionMetadataContainer serviceExtensions = extensionClient.findExtensions( |
| new ExtensionFilterParams.Builder().extensionType(ExtensionType.CONTROLLER_SERVICE).build()); |
| assertNotNull(serviceExtensions); |
| assertNotNull(serviceExtensions.getExtensions()); |
| assertEquals(1, serviceExtensions.getNumResults()); |
| assertEquals(1, serviceExtensions.getExtensions().size()); |
| |
| final ProvidedServiceAPI serviceAPI = new ProvidedServiceAPI(); |
| serviceAPI.setClassName("org.apache.nifi.service.TestService"); |
| serviceAPI.setGroupId("org.apache.nifi"); |
| serviceAPI.setArtifactId("nifi-test-service-api-nar"); |
| serviceAPI.setVersion("1.0.0"); |
| |
| final ExtensionMetadataContainer providedTestServiceApi = extensionClient.findExtensions(serviceAPI); |
| assertNotNull(providedTestServiceApi); |
| assertNotNull(providedTestServiceApi.getExtensions()); |
| assertEquals(1, providedTestServiceApi.getNumResults()); |
| assertEquals(1, providedTestServiceApi.getExtensions().size()); |
| assertEquals("org.apache.nifi.service.TestServiceImpl", providedTestServiceApi.getExtensions().first().getName()); |
| |
| // ---------------------- TEST ITEMS -------------------------- // |
| |
| final ItemsClient itemsClient = client.getItemsClient(); |
| |
| // get fields |
| final Fields itemFields = itemsClient.getFields(); |
| assertNotNull(itemFields.getFields()); |
| assertTrue(itemFields.getFields().size() > 0); |
| |
| // get all items |
| final List<BucketItem> allItems = itemsClient.getAll(); |
| assertEquals(5, allItems.size()); |
| allItems.stream().forEach(i -> { |
| assertNotNull(i.getBucketName()); |
| assertNotNull(i.getLink()); |
| }); |
| allItems.stream().forEach(i -> LOGGER.info("All items, item " + i.getIdentifier())); |
| |
| // verify 2 flow items |
| final List<BucketItem> flowItems = allItems.stream() |
| .filter(i -> i.getType() == BucketItemType.Flow) |
| .collect(Collectors.toList()); |
| assertEquals(2, flowItems.size()); |
| |
| // verify 3 bundle items |
| final List<BucketItem> extensionBundleItems = allItems.stream() |
| .filter(i -> i.getType() == BucketItemType.Bundle) |
| .collect(Collectors.toList()); |
| assertEquals(3, extensionBundleItems.size()); |
| |
| // get items for bucket |
| final List<BucketItem> bucketItems = itemsClient.getByBucket(flowsBucket.getIdentifier()); |
| assertEquals(2, bucketItems.size()); |
| allItems.stream().forEach(i -> assertNotNull(i.getBucketName())); |
| bucketItems.stream().forEach(i -> LOGGER.info("Items in bucket, item " + i.getIdentifier())); |
| |
| // ----------------------- TEST DIFF ---------------------------// |
| |
| final VersionedFlowSnapshot snapshot3 = buildSnapshot(snapshotFlow, 3); |
| final VersionedProcessGroup newlyAddedPG = new VersionedProcessGroup(); |
| newlyAddedPG.setIdentifier("new-pg"); |
| newlyAddedPG.setName("NEW Process Group"); |
| snapshot3.getFlowContents().getProcessGroups().add(newlyAddedPG); |
| snapshotClient.create(snapshot3); |
| |
| VersionedFlowDifference diff = flowClient.diff(snapshotFlow.getBucketIdentifier(), snapshotFlow.getIdentifier(), 3, 2); |
| assertNotNull(diff); |
| assertEquals(1, diff.getComponentDifferenceGroups().size()); |
| |
| // ---------------------- DELETE DATA --------------------------// |
| |
| final VersionedFlow deletedFlow1 = flowClient.delete(flowsBucket.getIdentifier(), flow1.getIdentifier()); |
| assertNotNull(deletedFlow1); |
| LOGGER.info("Deleted flow " + deletedFlow1.getIdentifier()); |
| |
| final VersionedFlow deletedFlow2 = flowClient.delete(flowsBucket.getIdentifier(), flow2.getIdentifier()); |
| assertNotNull(deletedFlow2); |
| LOGGER.info("Deleted flow " + deletedFlow2.getIdentifier()); |
| |
| final Bundle deletedBundle1 = bundleClient.delete(testNarV1Bundle.getIdentifier()); |
| assertNotNull(deletedBundle1); |
| LOGGER.info("Deleted extension bundle " + deletedBundle1.getIdentifier()); |
| |
| final Bundle deletedBundle2 = bundleClient.delete(fooNarV1Bundle.getIdentifier()); |
| assertNotNull(deletedBundle2); |
| LOGGER.info("Deleted extension bundle " + deletedBundle2.getIdentifier()); |
| |
| // delete each bucket |
| for (final Bucket bucket : createdBuckets) { |
| final Bucket deletedBucket = bucketClient.delete(bucket.getIdentifier()); |
| assertNotNull(deletedBucket); |
| LOGGER.info("Deleted bucket " + deletedBucket.getIdentifier()); |
| } |
| assertEquals(0, bucketClient.getAll().size()); |
| |
| LOGGER.info("!!! SUCCESS !!!"); |
| |
| } |
| |
| private void checkExtensionMetadata(Collection<ExtensionMetadata> extensions) { |
| extensions.forEach(e -> { |
| assertNotNull(e.getBundleInfo()); |
| assertNotNull(e.getBundleInfo().getBucketId()); |
| assertNotNull(e.getBundleInfo().getBucketName()); |
| assertNotNull(e.getBundleInfo().getBundleId()); |
| assertNotNull(e.getBundleInfo().getGroupId()); |
| assertNotNull(e.getBundleInfo().getArtifactId()); |
| assertNotNull(e.getBundleInfo().getVersion()); |
| }); |
| } |
| |
| private BundleVersion createExtensionBundleVersionWithStream(final Bucket bundlesBucket, |
| final BundleVersionClient bundleVersionClient, |
| final String narFile, final String sha256) |
| throws IOException, NiFiRegistryException { |
| |
| final BundleVersion createdBundleVersion; |
| try (final InputStream bundleInputStream = new FileInputStream(narFile)) { |
| if (StringUtils.isBlank(sha256)) { |
| createdBundleVersion = bundleVersionClient.create( |
| bundlesBucket.getIdentifier(), BundleType.NIFI_NAR, bundleInputStream); |
| } else { |
| createdBundleVersion = bundleVersionClient.create( |
| bundlesBucket.getIdentifier(), BundleType.NIFI_NAR, bundleInputStream, sha256); |
| } |
| } |
| |
| assertNotNull(createdBundleVersion); |
| assertNotNull(createdBundleVersion.getBucket()); |
| assertNotNull(createdBundleVersion.getBundle()); |
| assertNotNull(createdBundleVersion.getVersionMetadata()); |
| |
| return createdBundleVersion; |
| } |
| |
| private BundleVersion createExtensionBundleVersionWithFile(final Bucket bundlesBucket, |
| final BundleVersionClient bundleVersionClient, |
| final String narFile, final String sha256) |
| throws IOException, NiFiRegistryException { |
| |
| final BundleVersion createdBundleVersion; |
| if (StringUtils.isBlank(sha256)) { |
| createdBundleVersion = bundleVersionClient.create( |
| bundlesBucket.getIdentifier(), BundleType.NIFI_NAR, new File(narFile)); |
| } else { |
| createdBundleVersion = bundleVersionClient.create( |
| bundlesBucket.getIdentifier(), BundleType.NIFI_NAR, new File(narFile), sha256); |
| } |
| |
| assertNotNull(createdBundleVersion); |
| assertNotNull(createdBundleVersion.getBucket()); |
| assertNotNull(createdBundleVersion.getBundle()); |
| assertNotNull(createdBundleVersion.getVersionMetadata()); |
| |
| return createdBundleVersion; |
| } |
| |
| private String calculateSha256Hex(final String narFile) throws IOException { |
| try (final InputStream bundleInputStream = new FileInputStream(narFile)) { |
| return Hex.toHexString(DigestUtils.sha256(bundleInputStream)); |
| } |
| } |
| |
| private static Bucket createBucket(BucketClient bucketClient, int num) throws IOException, NiFiRegistryException { |
| final Bucket bucket = new Bucket(); |
| bucket.setName("Bucket #" + num); |
| bucket.setDescription("This is bucket #" + num); |
| return bucketClient.create(bucket); |
| } |
| |
| private static VersionedFlow createFlow(FlowClient client, Bucket bucket, int num) throws IOException, NiFiRegistryException { |
| final VersionedFlow versionedFlow = new VersionedFlow(); |
| versionedFlow.setName(bucket.getName() + " Flow #" + num); |
| versionedFlow.setDescription("This is " + bucket.getName() + " flow #" + num); |
| versionedFlow.setBucketIdentifier(bucket.getIdentifier()); |
| return client.create(versionedFlow); |
| } |
| |
| private static VersionedFlowSnapshot buildSnapshot(VersionedFlow flow, int num) { |
| final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata(); |
| snapshotMetadata.setBucketIdentifier(flow.getBucketIdentifier()); |
| snapshotMetadata.setFlowIdentifier(flow.getIdentifier()); |
| snapshotMetadata.setVersion(num); |
| snapshotMetadata.setComments("This is snapshot #" + num); |
| |
| final VersionedProcessGroup rootProcessGroup = new VersionedProcessGroup(); |
| rootProcessGroup.setIdentifier("root-pg"); |
| rootProcessGroup.setName("Root Process Group"); |
| |
| final VersionedProcessGroup subProcessGroup = new VersionedProcessGroup(); |
| subProcessGroup.setIdentifier("sub-pg"); |
| subProcessGroup.setName("Sub Process Group"); |
| rootProcessGroup.getProcessGroups().add(subProcessGroup); |
| |
| final Map<String,String> processorProperties = new HashMap<>(); |
| processorProperties.put("Prop 1", "Val 1"); |
| processorProperties.put("Prop 2", "Val 2"); |
| |
| final Map<String, VersionedPropertyDescriptor> propertyDescriptors = new HashMap<>(); |
| |
| final VersionedProcessor processor1 = new VersionedProcessor(); |
| processor1.setIdentifier("p1"); |
| processor1.setName("Processor 1"); |
| processor1.setProperties(processorProperties); |
| processor1.setPropertyDescriptors(propertyDescriptors); |
| |
| final VersionedProcessor processor2 = new VersionedProcessor(); |
| processor2.setIdentifier("p2"); |
| processor2.setName("Processor 2"); |
| processor2.setProperties(processorProperties); |
| processor2.setPropertyDescriptors(propertyDescriptors); |
| |
| subProcessGroup.getProcessors().add(processor1); |
| subProcessGroup.getProcessors().add(processor2); |
| |
| final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot(); |
| snapshot.setSnapshotMetadata(snapshotMetadata); |
| snapshot.setFlowContents(rootProcessGroup); |
| return snapshot; |
| } |
| |
| private static VersionedFlowSnapshot createSnapshot(FlowSnapshotClient client, VersionedFlow flow, int num) throws IOException, NiFiRegistryException { |
| final VersionedFlowSnapshot snapshot = buildSnapshot(flow, num); |
| |
| return client.create(snapshot); |
| } |
| } |