NIFI-9229 Flow upgrade not possible if a Output Port changes to a funnel (#5402)
* NIFI-9229 Flow upgrade not possible if a Output Port changes to a funnel
* NIFI-9229 Addressing review comments modified log message and added comments
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index f933cf7..9c8d264 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -158,6 +158,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -4293,16 +4294,30 @@
//As Input Port (IP1) originally belonged to PGA the new connection would be incorrectly linked to the old Input Port
//instead of the one being in PGB, so it needs to be removed first before updating the connections.
- for (final String removedVersionedId : inputPortsRemoved) {
+ Iterator<String> inputPortsRemovedIterator = inputPortsRemoved.iterator();
+ while (inputPortsRemovedIterator.hasNext()) {
+ final String removedVersionedId = inputPortsRemovedIterator.next();
final Port port = inputPortsByVersionedId.get(removedVersionedId);
LOG.info("Removing {} from {}", port, group);
- group.removeInputPort(port);
+ try {
+ group.removeInputPort(port);
+ inputPortsRemovedIterator.remove();
+ } catch (IllegalStateException e) {
+ LOG.info("Removing {} from {} not possible at the moment, will try again after updated the connections.", port, group);
+ }
}
- for (final String removedVersionedId : outputPortsRemoved) {
+ Iterator<String> outputPortsRemovedIterator = outputPortsRemoved.iterator();
+ while (outputPortsRemovedIterator.hasNext()) {
+ final String removedVersionedId = outputPortsRemovedIterator.next();
final Port port = outputPortsByVersionedId.get(removedVersionedId);
LOG.info("Removing {} from {}", port, group);
- group.removeOutputPort(port);
+ try {
+ group.removeOutputPort(port);
+ outputPortsRemovedIterator.remove();
+ } catch (IllegalStateException e) {
+ LOG.info("Removing {} from {} not possible at the moment, will try again after updated the connections.", port, group);
+ }
}
// Add and update Connections
@@ -4343,6 +4358,20 @@
group.removeFunnel(funnel);
}
+ //Removing remaining input ports
+ for (final String removedVersionedId : inputPortsRemoved) {
+ final Port port = inputPortsByVersionedId.get(removedVersionedId);
+ LOG.info("Removing {} from {}", port, group);
+ group.removeInputPort(port);
+ }
+
+ //Removing remaining output ports
+ for (final String removedVersionedId : outputPortsRemoved) {
+ final Port port = outputPortsByVersionedId.get(removedVersionedId);
+ LOG.info("Removing {} from {}", port, group);
+ group.removeOutputPort(port);
+ }
+
// Now that all input/output ports have been removed, we should be able to update
// all ports to the final name that was proposed in the new flow version.
for (final Map.Entry<Port, String> portAndFinalName : proposedPortFinalNames.entrySet()) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
index c4a00b3..cdf7a65 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
@@ -20,12 +20,14 @@
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.StandardSnippet;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedFunnel;
import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.integration.DirectInjectionExtensionManager;
@@ -443,6 +445,222 @@
assertTrue(groupA.getInputPorts().isEmpty());
}
+ @Test
+ public void testUpdateFlowWithOutputPortChangedToFunnelInAConnection() {
+ //Testing use case NIFI-9229
+ //Create Process Group
+ final ProcessGroup group = createProcessGroup("p-group-id", "P Group", getRootGroup());
+
+ //Create Processor under Process Group
+ final ProcessorNode processor = createProcessorNode(GenerateProcessor.class, group);
+
+ //Add Output Port to Process Group
+ final Port port = getFlowController().getFlowManager().createLocalOutputPort("output-port-id", "Output Port");
+ group.addOutputPort(port);
+
+ //Create Connection between Processor and Input Port
+ final Connection connection = connect(group, processor, port, processor.getRelationships());
+
+ //Create a snapshot
+ final VersionedFlowSnapshot version1 = createFlowSnapshot(group);
+
+ //Create Funnel under Process Group
+ Funnel funnel = getFlowController().getFlowManager().createFunnel("funnel-id");
+ group.addFunnel(funnel);
+
+ //Modify connection's destination from Output Port to Funnel
+ connection.setDestination(funnel);
+
+ //Delete Output Port
+ group.removeOutputPort(port);
+
+ //Create another snapshot
+ final VersionedFlowSnapshot version2 = createFlowSnapshot(group);
+
+ //Change Process Group version to Version 1
+ group.updateFlow(version1, null, false, true, true);
+
+ //Process Group should have only one Output Port, One Processor and One connection
+ assertEquals(1, group.getProcessors().size());
+ assertEquals(processor.getVersionedComponentId(), group.getProcessors().stream().findFirst().get().getVersionedComponentId());
+ assertEquals(1, group.getConnections().size());
+ assertEquals(connection.getVersionedComponentId(), group.getConnections().stream().findFirst().get().getVersionedComponentId());
+ assertEquals(1, group.getOutputPorts().size());
+ assertEquals(port.getVersionedComponentId(), group.getOutputPorts().stream().findFirst().get().getVersionedComponentId());
+ assertTrue(group.getFunnels().isEmpty());
+ assertEquals(connection.getDestination().getVersionedComponentId(), port.getVersionedComponentId());
+
+ //Change Process Group version to Version 2
+ group.updateFlow(version2, null, false, true, true);
+
+ //Process Group should have a Funnel, a Processor, a Connection and no Output Ports
+ assertTrue(group.getOutputPorts().isEmpty());
+ assertEquals(1, group.getProcessors().size());
+ assertEquals(processor.getVersionedComponentId(), group.getProcessors().stream().findFirst().get().getVersionedComponentId());
+ assertEquals(1, group.getConnections().size());
+ assertEquals(connection.getVersionedComponentId(), group.getConnections().stream().findFirst().get().getVersionedComponentId());
+ assertEquals(1, group.getFunnels().size());
+ assertEquals(funnel.getVersionedComponentId(), group.getFunnels().stream().findFirst().get().getVersionedComponentId());
+ assertEquals(connection.getDestination().getVersionedComponentId(), funnel.getVersionedComponentId());
+ }
+
+ @Test
+ public void testUpdateFlowWithModifyingConnectionDeletingAndMovingPort() {
+ //Create Process Group A
+ final ProcessGroup groupA = createProcessGroup("group-a-id", "Group A", getRootGroup());
+
+ //Create Process Group B under Process Group A
+ final ProcessGroup groupB = createProcessGroup("group-b-id", "Group B", groupA);
+
+ //Add Input port under Process Group B
+ final Port inputPort = getFlowController().getFlowManager().createLocalInputPort("input-port-id", "Input Port");
+ groupB.addInputPort(inputPort);
+
+ //Add Processor 1 under Process Group A
+ final ProcessorNode processor1 = createProcessorNode(GenerateProcessor.class, groupA);
+
+ //Add Processor 2 under Process Group A
+ final ProcessorNode processor2 = createProcessorNode(GenerateProcessor.class, groupA);
+
+ //Add Output Port under Process Group A
+ final Port outputPort = getFlowController().getFlowManager().createLocalOutputPort("output-port-id", "Output Port");
+ groupA.addOutputPort(outputPort);
+
+ //Connect Processor 1 and Output Port as Connection 1
+ final Connection connection1 = connect(groupA, processor1, outputPort, processor1.getRelationships());
+
+ //Connect Processor 1 and Input Port as Connection 2
+ final Connection connection2 = connect(groupA, processor1, inputPort, processor1.getRelationships());
+
+ //Create a snapshot
+ final VersionedFlowSnapshot version1 = createFlowSnapshot(groupA);
+
+ //Modify Connection 1 to point to Processor 2
+ connection1.setDestination(processor2);
+
+ //Move Output Port to Process Group B
+ moveOutputPort(outputPort, groupB);
+
+ //Create another snapshot
+ final VersionedFlowSnapshot version2 = createFlowSnapshot(groupA);
+
+ //Delete connection 2
+ groupA.removeConnection(connection2);
+
+ //Delete Input Port
+ groupB.removeInputPort(inputPort);
+
+ //Create another snapshot
+ final VersionedFlowSnapshot version3 = createFlowSnapshot(groupA);
+
+ //Change Process Group version to Version 1
+ groupA.updateFlow(version1, null, false, true, true);
+
+ //Process Group A should have two Processors, 2 Connections, one Output Port and one Process Group with one Input Port
+ assertEquals(2, groupA.getProcessors().size());
+ assertEquals(2, groupA.getConnections().size());
+ assertEquals(connection1.getDestination().getVersionedComponentId(), outputPort.getVersionedComponentId());
+ assertEquals(1, groupA.getOutputPorts().size());
+ assertEquals(1, groupA.getProcessGroups().size());
+ assertEquals(1, groupB.getInputPorts().size());
+
+ //Change Process Group version to Version 2
+ groupA.updateFlow(version2, null, false, true, true);
+
+ //Connection1 destination changed to Processor2 and Output Port moved to Process Group B
+ assertTrue(groupA.getOutputPorts().isEmpty());
+ assertEquals(connection1.getDestination().getVersionedComponentId(), processor2.getVersionedComponentId());
+ assertEquals(1, groupB.getOutputPorts().size());
+ assertEquals(outputPort.getVersionedComponentId(), groupB.getOutputPorts().stream().findFirst().get().getVersionedComponentId());
+
+ //Change Process Group version to Version 3
+ groupA.updateFlow(version3, null, false, true, true);
+
+ //Connection2 and Input Port should be deleted
+ assertEquals(1, groupA.getConnections().size());
+ assertEquals(connection1.getVersionedComponentId(), groupA.getConnections().stream().findFirst().get().getVersionedComponentId());
+ assertTrue(groupB.getInputPorts().isEmpty());
+ }
+
+ @Test
+ public void testUpdateFlowWithDeletingConnectionDeletingAndMovingPort() {
+ //Create Process Group A
+ final ProcessGroup groupA = createProcessGroup("group-a-id", "Group A", getRootGroup());
+
+ //Create Process Group B under Process Group A
+ final ProcessGroup groupB = createProcessGroup("group-b-id", "Group B", groupA);
+
+ //Add Input port under Process Group B
+ final Port inputPort = getFlowController().getFlowManager().createLocalInputPort("input-port-id", "Input Port");
+ groupB.addInputPort(inputPort);
+
+ //Add Processor 1 under Process Group A
+ final ProcessorNode processor1 = createProcessorNode(GenerateProcessor.class, groupA);
+
+ //Add Processor 2 under Process Group A
+ final ProcessorNode processor2 = createProcessorNode(GenerateProcessor.class, groupA);
+
+ //Add Output Port under Process Group A
+ final Port outputPort = getFlowController().getFlowManager().createLocalOutputPort("output-port-id", "Output Port");
+ groupA.addOutputPort(outputPort);
+
+ //Connect Processor 1 and Output Port as Connection 1
+ final Connection connection1 = connect(groupA, processor1, outputPort, processor1.getRelationships());
+
+ //Connect Processor 1 and Input Port as Connection 2
+ final Connection connection2 = connect(groupA, processor1, inputPort, processor1.getRelationships());
+
+ //Create a snapshot
+ final VersionedFlowSnapshot version1 = createFlowSnapshot(groupA);
+
+ //Modify Connection 1 to point to Processor 2
+ connection1.setDestination(processor2);
+
+ //Delete Output Port
+ groupA.removeOutputPort(outputPort);
+
+ //Create another snapshot
+ final VersionedFlowSnapshot version2 = createFlowSnapshot(groupA);
+
+ //Delete connection 2
+ groupA.removeConnection(connection2);
+
+ //Move Input Port to Process Group A
+ moveInputPort(inputPort, groupA);
+
+ //Create another snapshot
+ final VersionedFlowSnapshot version3 = createFlowSnapshot(groupA);
+
+ //Change Process Group version to Version 1
+ groupA.updateFlow(version1, null, false, true, true);
+
+ //Process Group A should have two Processors, 2 Connections, one Output Port and one Process Group with one Input Port
+ assertEquals(2, groupA.getProcessors().size());
+ assertEquals(2, groupA.getConnections().size());
+ assertEquals(connection1.getDestination().getVersionedComponentId(), outputPort.getVersionedComponentId());
+ assertEquals(1, groupA.getOutputPorts().size());
+ assertEquals(1, groupA.getProcessGroups().size());
+ assertEquals(1, groupB.getInputPorts().size());
+
+ //Change Process Group version to Version 2
+ groupA.updateFlow(version2, null, false, true, true);
+
+ //Connection1 destination changed to Processor2 and Output Port deleted
+ assertEquals(connection1.getDestination().getVersionedComponentId(), processor2.getVersionedComponentId());
+ assertTrue(groupA.getOutputPorts().isEmpty());
+ assertTrue(groupB.getOutputPorts().isEmpty());
+
+ //Change Process Group version to Version 3
+ groupA.updateFlow(version3, null, false, true, true);
+
+ //Connection2 should be deleted and Input Port moved to Process Group A
+ assertEquals(1, groupA.getConnections().size());
+ assertEquals(connection1.getVersionedComponentId(), groupA.getConnections().stream().findFirst().get().getVersionedComponentId());
+ assertTrue(groupB.getInputPorts().isEmpty());
+ assertEquals(1, groupA.getInputPorts().size());
+ assertEquals(inputPort.getVersionedComponentId(), groupA.getInputPorts().stream().findFirst().get().getVersionedComponentId());
+ }
+
private ProcessGroup createProcessGroup(final String groupId, final String groupName, final ProcessGroup destination) {
final ProcessGroup group = getFlowController().getFlowManager().createProcessGroup(groupId);
group.setName(groupName);
@@ -458,6 +676,14 @@
port.getProcessGroup().move(snippet, destination);
}
+ private void moveOutputPort(final Port port, final ProcessGroup destination) {
+ final StandardSnippet snippet = new StandardSnippet();
+ snippet.setParentGroupId(port.getProcessGroupIdentifier());
+ snippet.addOutputPorts(Collections.singletonMap(port.getIdentifier(), null));
+
+ port.getProcessGroup().move(snippet, destination);
+ }
+
private Set<FlowDifference> getLocalModifications(final ProcessGroup processGroup, final VersionedFlowSnapshot versionedFlowSnapshot) {
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(getFlowController().getExtensionManager());
@@ -495,6 +721,8 @@
final List<ProcessorNode> processorNodes;
final List<ControllerServiceNode> controllerServiceNodes;
final List<Port> inputPorts;
+ final List<Port> outputPorts;
+ final List<Funnel> funnels;
final List<Connection> connections;
final List<ProcessGroup> processGroups;
final Set<VersionedProcessGroup> versionedProcessGroups;
@@ -503,12 +731,16 @@
processorNodes = processors;
controllerServiceNodes = controllerServices;
inputPorts = Collections.EMPTY_LIST;
+ outputPorts = Collections.EMPTY_LIST;
+ funnels = Collections.EMPTY_LIST;
connections = Collections.EMPTY_LIST;
versionedProcessGroups = Collections.EMPTY_SET;
} else {
processorNodes = new ArrayList<>(group.getProcessors());
controllerServiceNodes = new ArrayList<>(group.getControllerServices(false));
inputPorts = new ArrayList<>(group.getInputPorts());
+ outputPorts = new ArrayList<>(group.getOutputPorts());
+ funnels = new ArrayList<>(group.getFunnels());
connections = new ArrayList<>(group.getConnections());
processGroups = new ArrayList<>(group.getProcessGroups());
@@ -541,6 +773,20 @@
inputPort.setVersionedComponentId(versionedInputPort.getIdentifier());
}
+ final Set<VersionedPort> versionedOutputPorts = new HashSet<>();
+ for (final Port outputPort : outputPorts) {
+ final VersionedPort versionedOutputPort = flowMapper.mapPort(outputPort);
+ versionedOutputPorts.add(versionedOutputPort);
+ outputPort.setVersionedComponentId(versionedOutputPort.getIdentifier());
+ }
+
+ final Set<VersionedFunnel> versionedFunnels = new HashSet<>();
+ for (final Funnel funnel : funnels) {
+ final VersionedFunnel versionedFunnel = flowMapper.mapFunnel(funnel);
+ versionedFunnels.add(versionedFunnel);
+ funnel.setVersionedComponentId(versionedFunnel.getIdentifier());
+ }
+
final Set<VersionedConnection> versionedConnections = new HashSet<>();
for (final Connection connection : connections) {
final VersionedConnection versionedConnection = flowMapper.mapConnection(connection);
@@ -553,6 +799,8 @@
flowContents.setControllerServices(versionedServices);
flowContents.setProcessGroups(versionedProcessGroups);
flowContents.setInputPorts(versionedInputPorts);
+ flowContents.setOutputPorts(versionedOutputPorts);
+ flowContents.setFunnels(versionedFunnels);
flowContents.setConnections(versionedConnections);
final VersionedFlowSnapshot versionedFlowSnapshot = createVersionedFlowSnapshot(snapshotMetadata, bucket, flow, flowContents);