| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.registry.web.api; |
| |
| import org.apache.nifi.registry.bucket.BucketItemType; |
| import org.apache.nifi.registry.flow.VersionedFlow; |
| import org.apache.nifi.registry.flow.VersionedFlowSnapshot; |
| import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; |
| import org.apache.nifi.registry.flow.VersionedProcessGroup; |
| import org.junit.Assert; |
| import org.junit.Test; |
| import org.skyscreamer.jsonassert.JSONAssert; |
| import org.springframework.test.annotation.IfProfileValue; |
| import org.springframework.test.context.jdbc.Sql; |
| |
| import javax.ws.rs.WebApplicationException; |
| import javax.ws.rs.client.Entity; |
| import javax.ws.rs.client.WebTarget; |
| import javax.ws.rs.core.MediaType; |
| import javax.ws.rs.core.Response; |
| |
| import static org.apache.nifi.registry.web.api.IntegrationTestUtils.assertFlowSnapshotMetadataEqual; |
| import static org.apache.nifi.registry.web.api.IntegrationTestUtils.assertFlowSnapshotsEqual; |
| import static org.apache.nifi.registry.web.api.IntegrationTestUtils.assertFlowsEqual; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| |
| @Sql(executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD, scripts = {"classpath:db/clearDB.sql", "classpath:db/FlowsIT.sql"}) |
| public class FlowsIT extends UnsecuredITBase { |
| |
| @Test |
| public void testGetFlowsEmpty() throws Exception { |
| |
| // Given: an empty bucket with id "3" (see FlowsIT.sql) |
| final String emptyBucketId = "3"; |
| |
| // When: the /buckets/{id}/flows endpoint is queried |
| |
| final VersionedFlow[] flows = client |
| .target(createURL("buckets/{bucketId}/flows")) |
| .resolveTemplate("bucketId", emptyBucketId) |
| .request() |
| .get(VersionedFlow[].class); |
| |
| // Then: an empty array is returned |
| |
| assertNotNull(flows); |
| assertEquals(0, flows.length); |
| } |
| |
| // NOTE: The tests that seed the DB directly from SQL end up with different results for the timestamp depending on |
| // which DB is used, so for now these types of tests only run against H2. |
| @Test |
| @IfProfileValue(name="current.database.is.h2", value="true") |
| public void testGetFlows() throws Exception { |
| |
| // Given: a few buckets and flows have been populated in the DB (see FlowsIT.sql) |
| |
| final String prePopulatedBucketId = "1"; |
| final String expected = "[" + |
| "{\"identifier\":\"1\"," + |
| "\"name\":\"Flow 1\"," + |
| "\"description\":\"This is flow 1\"," + |
| "\"bucketIdentifier\":\"1\"," + |
| "\"createdTimestamp\":1505088000000," + |
| "\"modifiedTimestamp\":1505088000000," + |
| "\"type\":\"Flow\"," + |
| "\"permissions\":{\"canRead\":true,\"canWrite\":true,\"canDelete\":true}," + |
| "\"link\":{\"params\":{\"rel\":\"self\"},\"href\":\"buckets/1/flows/1\"}}," + |
| "{\"identifier\":\"2\",\"name\":\"Flow 2\"," + |
| "\"description\":\"This is flow 2\"," + |
| "\"bucketIdentifier\":\"1\"," + |
| "\"createdTimestamp\":1505088000000," + |
| "\"modifiedTimestamp\":1505088000000," + |
| "\"type\":\"Flow\"," + |
| "\"permissions\":{\"canRead\":true,\"canWrite\":true,\"canDelete\":true}," + |
| "\"versionCount\":0," + |
| "\"link\":{\"params\":{\"rel\":\"self\"},\"href\":\"buckets/1/flows/2\"}}" + |
| "]"; |
| |
| // When: the /buckets/{id}/flows endpoint is queried |
| |
| final String flowsJson = client |
| .target(createURL("buckets/{bucketId}/flows")) |
| .resolveTemplate("bucketId", prePopulatedBucketId) |
| .request() |
| .get(String.class); |
| |
| // Then: the pre-populated list of flows is returned |
| |
| JSONAssert.assertEquals(expected, flowsJson, false); |
| } |
| |
| @Test |
| public void testCreateFlowGetFlow() throws Exception { |
| |
| // Given: an empty bucket with id "3" (see FlowsIT.sql) |
| |
| long testStartTime = System.currentTimeMillis(); |
| final String bucketId = "3"; |
| |
| // When: a flow is created |
| |
| final VersionedFlow flow = new VersionedFlow(); |
| flow.setBucketIdentifier(bucketId); |
| flow.setName("Test Flow"); |
| flow.setDescription("This is a flow created by an integration test."); |
| |
| final VersionedFlow createdFlow = client |
| .target(createURL("buckets/{bucketId}/flows")) |
| .resolveTemplate("bucketId", bucketId) |
| .request() |
| .post(Entity.entity(flow, MediaType.APPLICATION_JSON), VersionedFlow.class); |
| |
| // Then: the server returns the created flow, with server-set fields populated correctly |
| |
| assertFlowsEqual(flow, createdFlow, false); |
| assertNotNull(createdFlow.getIdentifier()); |
| assertNotNull(createdFlow.getBucketName()); |
| assertEquals(0, createdFlow.getVersionCount()); |
| assertEquals(createdFlow.getType(), BucketItemType.Flow); |
| assertTrue(createdFlow.getCreatedTimestamp() - testStartTime > 0L); // both server and client in same JVM, so there shouldn't be skew |
| assertEquals(createdFlow.getCreatedTimestamp(), createdFlow.getModifiedTimestamp()); |
| assertNotNull(createdFlow.getLink()); |
| assertNotNull(createdFlow.getLink().getUri()); |
| |
| // And when .../flows is queried, then the newly created flow is returned in the list |
| |
| final VersionedFlow[] flows = client |
| .target(createURL("buckets/{bucketId}/flows")) |
| .resolveTemplate("bucketId", bucketId) |
| .request() |
| .get(VersionedFlow[].class); |
| assertNotNull(flows); |
| assertEquals(1, flows.length); |
| assertFlowsEqual(createdFlow, flows[0], true); |
| |
| // And when the link URI is queried, then the newly created flow is returned |
| |
| final VersionedFlow flowByLink = client |
| .target(createURL(flows[0].getLink().getUri().toString())) |
| .request() |
| .get(VersionedFlow.class); |
| assertFlowsEqual(createdFlow, flowByLink, true); |
| |
| // And when the bucket is queried by .../flows/ID, then the newly created flow is returned |
| |
| final VersionedFlow flowById = client |
| .target(createURL("buckets/{bucketId}/flows/{flowId}")) |
| .resolveTemplate("bucketId", bucketId) |
| .resolveTemplate("flowId", createdFlow.getIdentifier()) |
| .request() |
| .get(VersionedFlow.class); |
| assertFlowsEqual(createdFlow, flowById, true); |
| |
| } |
| |
| @Test |
| public void testUpdateFlow() throws Exception { |
| |
| // Given: a flow exists on the server |
| |
| final String bucketId = "3"; |
| final VersionedFlow flow = new VersionedFlow(); |
| flow.setBucketIdentifier(bucketId); |
| flow.setName("Test Flow"); |
| flow.setDescription("This is a flow created by an integration test."); |
| final VersionedFlow createdFlow = client |
| .target(createURL("buckets/{bucketId}/flows")) |
| .resolveTemplate("bucketId", bucketId) |
| .request() |
| .post(Entity.entity(flow, MediaType.APPLICATION_JSON), VersionedFlow.class); |
| |
| // When: the flow is modified by the client and updated on the server |
| |
| createdFlow.setName("Renamed Flow"); |
| createdFlow.setDescription("This flow has been updated by an integration test."); |
| |
| final VersionedFlow updatedFlow = client |
| .target(createURL("buckets/{bucketId}/flows/{flowId}")) |
| .resolveTemplate("bucketId", bucketId) |
| .resolveTemplate("flowId", createdFlow.getIdentifier()) |
| .request() |
| .put(Entity.entity(createdFlow, MediaType.APPLICATION_JSON), VersionedFlow.class); |
| |
| // Then: the server returns the updated flow, with a new modified timestamp |
| |
| assertTrue(updatedFlow.getModifiedTimestamp() > createdFlow.getModifiedTimestamp()); |
| createdFlow.setModifiedTimestamp(updatedFlow.getModifiedTimestamp()); |
| assertFlowsEqual(createdFlow, updatedFlow, true); |
| |
| } |
| |
| @Test |
| public void testDeleteBucket() throws Exception { |
| |
| // Given: a flow exists on the server |
| |
| final String bucketId = "3"; |
| final VersionedFlow flow = new VersionedFlow(); |
| flow.setBucketIdentifier(bucketId); |
| flow.setName("Test Flow"); |
| flow.setDescription("This is a flow created by an integration test."); |
| final VersionedFlow createdFlow = client |
| .target(createURL("buckets/{bucketId}/flows")) |
| .resolveTemplate("bucketId", bucketId) |
| .request() |
| .post(Entity.entity(flow, MediaType.APPLICATION_JSON), VersionedFlow.class); |
| |
| // When: the flow is deleted |
| |
| final VersionedFlow deletedFlow = client |
| .target(createURL("buckets/{bucketId}/flows/{flowId}")) |
| .resolveTemplate("bucketId", bucketId) |
| .resolveTemplate("flowId", createdFlow.getIdentifier()) |
| .request() |
| .delete(VersionedFlow.class); |
| |
| // Then: the body of the server response matches the flow that was deleted |
| // and: the flow is no longer accessible (resource not found) |
| |
| createdFlow.setLink(null); // self URI will not be present in deletedBucket |
| assertFlowsEqual(createdFlow, deletedFlow, true); |
| |
| final Response response = client |
| .target(createURL("buckets/{bucketId}/flows/{flowId}")) |
| .resolveTemplate("bucketId", bucketId) |
| .resolveTemplate("flowId", createdFlow.getIdentifier()) |
| .request() |
| .get(); |
| assertEquals(404, response.getStatus()); |
| |
| } |
| |
| @Test |
| public void testGetFlowVersionsEmpty() throws Exception { |
| |
| // Given: a Bucket "2" containing a flow "3" with no snapshots (see FlowsIT.sql) |
| final String bucketId = "2"; |
| final String flowId = "3"; |
| |
| // When: the /buckets/{id}/flows/{id}/versions endpoint is queried |
| |
| final VersionedFlowSnapshot[] flowSnapshots = client |
| .target(createURL("buckets/{bucketId}/flows/{flowId}/versions")) |
| .resolveTemplate("bucketId", bucketId) |
| .resolveTemplate("flowId", flowId) |
| .request() |
| .get(VersionedFlowSnapshot[].class); |
| |
| // Then: an empty array is returned |
| |
| assertNotNull(flowSnapshots); |
| assertEquals(0, flowSnapshots.length); |
| } |
| |
| // NOTE: The tests that seed the DB directly from SQL end up with different results for the timestamp depending on |
| // which DB is used, so for now these types of tests only run against H2. |
| @Test |
| @IfProfileValue(name="current.database.is.h2", value="true") |
| public void testGetFlowVersions() throws Exception { |
| |
| // Given: a bucket "1" with flow "1" with existing snapshots has been populated in the DB (see FlowsIT.sql) |
| |
| final String prePopulatedBucketId = "1"; |
| final String prePopulatedFlowId = "1"; |
| // For this test case, the order of the expected list matters as we are asserting a strict equality check |
| final String expected = "[" + |
| "{\"bucketIdentifier\":\"1\"," + |
| "\"flowIdentifier\":\"1\"," + |
| "\"version\":2," + |
| "\"timestamp\":1505174400000," + |
| "\"author\" : \"user2\"," + |
| "\"comments\":\"This is flow 1 snapshot 2\"," + |
| "\"link\":{\"params\":{\"rel\":\"content\"},\"href\":\"buckets/1/flows/1/versions/2\"}}," + |
| "{\"bucketIdentifier\":\"1\"," + |
| "\"flowIdentifier\":\"1\"," + |
| "\"version\":1," + |
| "\"timestamp\":1505088000000," + |
| "\"author\" : \"user1\"," + |
| "\"comments\":\"This is flow 1 snapshot 1\"," + |
| "\"link\":{\"params\":{\"rel\":\"content\"},\"href\":\"buckets/1/flows/1/versions/1\"}}" + |
| "]"; |
| |
| // When: the /buckets/{id}/flows/{id}/versions endpoint is queried |
| final String flowSnapshotsJson = client |
| .target(createURL("buckets/{bucketId}/flows/{flowId}/versions")) |
| .resolveTemplate("bucketId", prePopulatedBucketId) |
| .resolveTemplate("flowId", prePopulatedFlowId) |
| .request() |
| .get(String.class); |
| |
| // Then: the pre-populated list of flow versions is returned, in descending order |
| JSONAssert.assertEquals(expected, flowSnapshotsJson, true); |
| |
| } |
| |
| @Test |
| public void testCreateFlowVersionGetFlowVersion() throws Exception { |
| |
| // Given: an empty Bucket "3" (see FlowsIT.sql) with a newly created flow |
| |
| long testStartTime = System.currentTimeMillis(); |
| final String bucketId = "2"; |
| final VersionedFlow flow = new VersionedFlow(); |
| flow.setBucketIdentifier(bucketId); |
| flow.setName("Test Flow for creating snapshots"); |
| flow.setDescription("This is a randomly named flow created by an integration test for the purpose of holding snapshots."); |
| final VersionedFlow createdFlow = client |
| .target(createURL("buckets/{bucketId}/flows")) |
| .resolveTemplate("bucketId", bucketId) |
| .request() |
| .post(Entity.entity(flow, MediaType.APPLICATION_JSON), VersionedFlow.class); |
| final String flowId = createdFlow.getIdentifier(); |
| |
| // When: an initial flow snapshot is created *without* a version |
| |
| final VersionedFlowSnapshotMetadata flowSnapshotMetadata = new VersionedFlowSnapshotMetadata(); |
| flowSnapshotMetadata.setBucketIdentifier("2"); |
| flowSnapshotMetadata.setFlowIdentifier(flowId); |
| flowSnapshotMetadata.setComments("This is snapshot 1, created by an integration test."); |
| final VersionedFlowSnapshot flowSnapshot = new VersionedFlowSnapshot(); |
| flowSnapshot.setSnapshotMetadata(flowSnapshotMetadata); |
| flowSnapshot.setFlowContents(new VersionedProcessGroup()); // an empty root process group |
| |
| WebTarget clientRequestTarget = client |
| .target(createURL("buckets/{bucketId}/flows/{flowId}/versions")) |
| .resolveTemplate("bucketId", bucketId) |
| .resolveTemplate("flowId", flowId); |
| final Response response = |
| clientRequestTarget.request().post(Entity.entity(flowSnapshot, MediaType.APPLICATION_JSON), Response.class); |
| |
| // Then: an error is returned because version != 1 |
| |
| assertEquals(400, response.getStatus()); |
| |
| // But When: an initial flow snapshot is created with version == 1 |
| |
| flowSnapshot.getSnapshotMetadata().setVersion(1); |
| final VersionedFlowSnapshot createdFlowSnapshot = |
| clientRequestTarget.request().post(Entity.entity(flowSnapshot, MediaType.APPLICATION_JSON), VersionedFlowSnapshot.class); |
| |
| // Then: the server returns the created flow snapshot, with server-set fields populated correctly :) |
| |
| assertFlowSnapshotsEqual(flowSnapshot, createdFlowSnapshot, false); |
| assertTrue(createdFlowSnapshot.getSnapshotMetadata().getTimestamp() - testStartTime > 0L); // both server and client in same JVM, so there shouldn't be skew |
| assertEquals("anonymous", createdFlowSnapshot.getSnapshotMetadata().getAuthor()); |
| assertNotNull(createdFlowSnapshot.getSnapshotMetadata().getLink()); |
| assertNotNull(createdFlowSnapshot.getSnapshotMetadata().getLink().getUri()); |
| assertNotNull(createdFlowSnapshot.getFlow()); |
| assertEquals(1, createdFlowSnapshot.getFlow().getVersionCount()); |
| assertNotNull(createdFlowSnapshot.getBucket()); |
| |
| // And when .../flows/{id}/versions is queried, then the newly created flow snapshot is returned in the list |
| |
| final VersionedFlowSnapshotMetadata[] versionedFlowSnapshots = |
| clientRequestTarget.request().get(VersionedFlowSnapshotMetadata[].class); |
| assertNotNull(versionedFlowSnapshots); |
| assertEquals(1, versionedFlowSnapshots.length); |
| assertFlowSnapshotMetadataEqual(createdFlowSnapshot.getSnapshotMetadata(), versionedFlowSnapshots[0], true); |
| |
| // And when the link URI is queried, then the newly created flow snapshot is returned |
| |
| final VersionedFlowSnapshot flowSnapshotByLink = client |
| .target(createURL(versionedFlowSnapshots[0].getLink().getUri().toString())) |
| .request() |
| .get(VersionedFlowSnapshot.class); |
| assertFlowSnapshotsEqual(createdFlowSnapshot, flowSnapshotByLink, true); |
| assertNotNull(flowSnapshotByLink.getFlow()); |
| assertNotNull(flowSnapshotByLink.getBucket()); |
| |
| // And when the bucket is queried by .../versions/{v}, then the newly created flow snapshot is returned |
| |
| final VersionedFlowSnapshot flowSnapshotByVersionNumber = clientRequestTarget.path("/1").request().get(VersionedFlowSnapshot.class); |
| assertFlowSnapshotsEqual(createdFlowSnapshot, flowSnapshotByVersionNumber, true); |
| assertNotNull(flowSnapshotByVersionNumber.getFlow()); |
| assertNotNull(flowSnapshotByVersionNumber.getBucket()); |
| |
| // And when the latest URI is queried, then the newly created flow snapshot is returned |
| |
| final VersionedFlowSnapshot flowSnapshotByLatest = clientRequestTarget.path("/latest").request().get(VersionedFlowSnapshot.class); |
| assertFlowSnapshotsEqual(createdFlowSnapshot, flowSnapshotByLatest, true); |
| assertNotNull(flowSnapshotByLatest.getFlow()); |
| assertNotNull(flowSnapshotByLatest.getBucket()); |
| |
| } |
| |
| @Test |
| public void testFlowNameUniquePerBucket() throws Exception { |
| |
| final String flowName = "Flow 1"; |
| |
| // verify we have an existing flow with the name "Flow 1" in bucket 1 |
| final VersionedFlow existingFlow = client |
| .target(createURL("buckets/1/flows/1")) |
| .request() |
| .get(VersionedFlow.class); |
| |
| assertNotNull(existingFlow); |
| assertEquals(flowName, existingFlow.getName()); |
| |
| // create a new flow with the same name |
| |
| final String bucketId = "3"; |
| |
| final VersionedFlow flow = new VersionedFlow(); |
| flow.setBucketIdentifier(bucketId); |
| flow.setName(flowName); |
| flow.setDescription("This is a flow created by an integration test."); |
| |
| // saving this flow to bucket 3 should work because bucket 3 is empty |
| |
| final VersionedFlow createdFlow = client |
| .target(createURL("buckets/3/flows")) |
| .resolveTemplate("bucketId", bucketId) |
| .request() |
| .post(Entity.entity(flow, MediaType.APPLICATION_JSON), VersionedFlow.class); |
| |
| assertNotNull(createdFlow); |
| |
| // saving the flow to bucket 1 should not work because there is a flow with the same name |
| flow.setBucketIdentifier("1"); |
| try { |
| client.target(createURL("buckets/1/flows")) |
| .resolveTemplate("bucketId", bucketId) |
| .request() |
| .post(Entity.entity(flow, MediaType.APPLICATION_JSON), VersionedFlow.class); |
| |
| Assert.fail("Should have thrown exception"); |
| } catch (WebApplicationException e) { |
| final String errorMessage = e.getResponse().readEntity(String.class); |
| Assert.assertEquals("A versioned flow with the same name already exists in the selected bucket", errorMessage); |
| } |
| |
| } |
| |
| } |