NIFI-9229 Flow upgrade not possible if a Output Port changes to a funnel (#5402)

* NIFI-9229 Flow upgrade not possible if a Output Port changes to a funnel
* NIFI-9229 Addressing review comments modified log message and added comments
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index f933cf7..9c8d264 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -158,6 +158,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -4293,16 +4294,30 @@
         //As Input Port (IP1) originally belonged to PGA the new connection would be incorrectly linked to the old Input Port
         //instead of the one being in PGB, so it needs to be removed first before updating the connections.
 
-        for (final String removedVersionedId : inputPortsRemoved) {
+        Iterator<String> inputPortsRemovedIterator = inputPortsRemoved.iterator();
+        while (inputPortsRemovedIterator.hasNext()) {
+            final String removedVersionedId = inputPortsRemovedIterator.next();
             final Port port = inputPortsByVersionedId.get(removedVersionedId);
             LOG.info("Removing {} from {}", port, group);
-            group.removeInputPort(port);
+            try {
+                group.removeInputPort(port);
+                inputPortsRemovedIterator.remove();
+            } catch (IllegalStateException e) {
+                LOG.info("Removing {} from {} not possible at the moment, will try again after updated the connections.", port, group);
+            }
         }
 
-        for (final String removedVersionedId : outputPortsRemoved) {
+        Iterator<String> outputPortsRemovedIterator = outputPortsRemoved.iterator();
+        while (outputPortsRemovedIterator.hasNext()) {
+            final String removedVersionedId = outputPortsRemovedIterator.next();
             final Port port = outputPortsByVersionedId.get(removedVersionedId);
             LOG.info("Removing {} from {}", port, group);
-            group.removeOutputPort(port);
+            try {
+                group.removeOutputPort(port);
+                outputPortsRemovedIterator.remove();
+            } catch (IllegalStateException e) {
+                LOG.info("Removing {} from {} not possible at the moment, will try again after updated the connections.", port, group);
+            }
         }
 
         // Add and update Connections
@@ -4343,6 +4358,20 @@
             group.removeFunnel(funnel);
         }
 
+        //Removing remaining input ports
+        for (final String removedVersionedId : inputPortsRemoved) {
+            final Port port = inputPortsByVersionedId.get(removedVersionedId);
+            LOG.info("Removing {} from {}", port, group);
+            group.removeInputPort(port);
+        }
+
+        //Removing remaining output ports
+        for (final String removedVersionedId : outputPortsRemoved) {
+            final Port port = outputPortsByVersionedId.get(removedVersionedId);
+            LOG.info("Removing {} from {}", port, group);
+            group.removeOutputPort(port);
+        }
+
         // Now that all input/output ports have been removed, we should be able to update
         // all ports to the final name that was proposed in the new flow version.
         for (final Map.Entry<Port, String> portAndFinalName : proposedPortFinalNames.entrySet()) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
index c4a00b3..cdf7a65 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
@@ -20,12 +20,14 @@
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
 import org.apache.nifi.connectable.Port;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.StandardSnippet;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedFunnel;
 import org.apache.nifi.flow.VersionedPort;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.integration.DirectInjectionExtensionManager;
@@ -443,6 +445,222 @@
         assertTrue(groupA.getInputPorts().isEmpty());
     }
 
+    @Test
+    public void testUpdateFlowWithOutputPortChangedToFunnelInAConnection() {
+        //Testing use case NIFI-9229
+        //Create Process Group
+        final ProcessGroup group = createProcessGroup("p-group-id", "P Group", getRootGroup());
+
+        //Create Processor under Process Group
+        final ProcessorNode processor = createProcessorNode(GenerateProcessor.class, group);
+
+        //Add Output Port to Process Group
+        final Port port = getFlowController().getFlowManager().createLocalOutputPort("output-port-id", "Output Port");
+        group.addOutputPort(port);
+
+        //Create Connection between Processor and Input Port
+        final Connection connection = connect(group, processor, port, processor.getRelationships());
+
+        //Create a snapshot
+        final VersionedFlowSnapshot version1 = createFlowSnapshot(group);
+
+        //Create Funnel under Process Group
+        Funnel funnel = getFlowController().getFlowManager().createFunnel("funnel-id");
+        group.addFunnel(funnel);
+
+        //Modify connection's destination from Output Port to Funnel
+        connection.setDestination(funnel);
+
+        //Delete Output Port
+        group.removeOutputPort(port);
+
+        //Create another snapshot
+        final VersionedFlowSnapshot version2 = createFlowSnapshot(group);
+
+        //Change Process Group version to Version 1
+        group.updateFlow(version1, null, false, true, true);
+
+        //Process Group should have only one Output Port, One Processor and One connection
+        assertEquals(1, group.getProcessors().size());
+        assertEquals(processor.getVersionedComponentId(), group.getProcessors().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(1, group.getConnections().size());
+        assertEquals(connection.getVersionedComponentId(), group.getConnections().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(1, group.getOutputPorts().size());
+        assertEquals(port.getVersionedComponentId(), group.getOutputPorts().stream().findFirst().get().getVersionedComponentId());
+        assertTrue(group.getFunnels().isEmpty());
+        assertEquals(connection.getDestination().getVersionedComponentId(), port.getVersionedComponentId());
+
+        //Change Process Group version to Version 2
+        group.updateFlow(version2, null, false, true, true);
+
+        //Process Group should have a Funnel, a Processor, a Connection and no Output Ports
+        assertTrue(group.getOutputPorts().isEmpty());
+        assertEquals(1, group.getProcessors().size());
+        assertEquals(processor.getVersionedComponentId(), group.getProcessors().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(1, group.getConnections().size());
+        assertEquals(connection.getVersionedComponentId(), group.getConnections().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(1, group.getFunnels().size());
+        assertEquals(funnel.getVersionedComponentId(), group.getFunnels().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(connection.getDestination().getVersionedComponentId(), funnel.getVersionedComponentId());
+    }
+
+    @Test
+    public void testUpdateFlowWithModifyingConnectionDeletingAndMovingPort() {
+        //Create Process Group A
+        final ProcessGroup groupA = createProcessGroup("group-a-id", "Group A", getRootGroup());
+
+        //Create Process Group B under Process Group A
+        final ProcessGroup groupB = createProcessGroup("group-b-id", "Group B", groupA);
+
+        //Add Input port under Process Group B
+        final Port inputPort = getFlowController().getFlowManager().createLocalInputPort("input-port-id", "Input Port");
+        groupB.addInputPort(inputPort);
+
+        //Add Processor 1 under Process Group A
+        final ProcessorNode processor1 = createProcessorNode(GenerateProcessor.class, groupA);
+
+        //Add Processor 2 under Process Group A
+        final ProcessorNode processor2 = createProcessorNode(GenerateProcessor.class, groupA);
+
+        //Add Output Port under Process Group A
+        final Port outputPort = getFlowController().getFlowManager().createLocalOutputPort("output-port-id", "Output Port");
+        groupA.addOutputPort(outputPort);
+
+        //Connect Processor 1 and Output Port as Connection 1
+        final Connection connection1 = connect(groupA, processor1, outputPort, processor1.getRelationships());
+
+        //Connect Processor 1 and Input Port as Connection 2
+        final Connection connection2 = connect(groupA, processor1, inputPort, processor1.getRelationships());
+
+        //Create a snapshot
+        final VersionedFlowSnapshot version1 = createFlowSnapshot(groupA);
+
+        //Modify Connection 1 to point to Processor 2
+        connection1.setDestination(processor2);
+
+        //Move Output Port to Process Group B
+        moveOutputPort(outputPort, groupB);
+
+        //Create another snapshot
+        final VersionedFlowSnapshot version2 = createFlowSnapshot(groupA);
+
+        //Delete connection 2
+        groupA.removeConnection(connection2);
+
+        //Delete Input Port
+        groupB.removeInputPort(inputPort);
+
+        //Create another snapshot
+        final VersionedFlowSnapshot version3 = createFlowSnapshot(groupA);
+
+        //Change Process Group version to Version 1
+        groupA.updateFlow(version1, null, false, true, true);
+
+        //Process Group A should have two Processors, 2 Connections, one Output Port and one Process Group with one Input Port
+        assertEquals(2, groupA.getProcessors().size());
+        assertEquals(2, groupA.getConnections().size());
+        assertEquals(connection1.getDestination().getVersionedComponentId(), outputPort.getVersionedComponentId());
+        assertEquals(1, groupA.getOutputPorts().size());
+        assertEquals(1, groupA.getProcessGroups().size());
+        assertEquals(1, groupB.getInputPorts().size());
+
+        //Change Process Group version to Version 2
+        groupA.updateFlow(version2, null, false, true, true);
+
+        //Connection1 destination changed to Processor2 and Output Port moved to Process Group B
+        assertTrue(groupA.getOutputPorts().isEmpty());
+        assertEquals(connection1.getDestination().getVersionedComponentId(), processor2.getVersionedComponentId());
+        assertEquals(1, groupB.getOutputPorts().size());
+        assertEquals(outputPort.getVersionedComponentId(), groupB.getOutputPorts().stream().findFirst().get().getVersionedComponentId());
+
+        //Change Process Group version to Version 3
+        groupA.updateFlow(version3, null, false, true, true);
+
+        //Connection2 and Input Port should be deleted
+        assertEquals(1, groupA.getConnections().size());
+        assertEquals(connection1.getVersionedComponentId(), groupA.getConnections().stream().findFirst().get().getVersionedComponentId());
+        assertTrue(groupB.getInputPorts().isEmpty());
+    }
+
+    @Test
+    public void testUpdateFlowWithDeletingConnectionDeletingAndMovingPort() {
+        //Create Process Group A
+        final ProcessGroup groupA = createProcessGroup("group-a-id", "Group A", getRootGroup());
+
+        //Create Process Group B under Process Group A
+        final ProcessGroup groupB = createProcessGroup("group-b-id", "Group B", groupA);
+
+        //Add Input port under Process Group B
+        final Port inputPort = getFlowController().getFlowManager().createLocalInputPort("input-port-id", "Input Port");
+        groupB.addInputPort(inputPort);
+
+        //Add Processor 1 under Process Group A
+        final ProcessorNode processor1 = createProcessorNode(GenerateProcessor.class, groupA);
+
+        //Add Processor 2 under Process Group A
+        final ProcessorNode processor2 = createProcessorNode(GenerateProcessor.class, groupA);
+
+        //Add Output Port under Process Group A
+        final Port outputPort = getFlowController().getFlowManager().createLocalOutputPort("output-port-id", "Output Port");
+        groupA.addOutputPort(outputPort);
+
+        //Connect Processor 1 and Output Port as Connection 1
+        final Connection connection1 = connect(groupA, processor1, outputPort, processor1.getRelationships());
+
+        //Connect Processor 1 and Input Port as Connection 2
+        final Connection connection2 = connect(groupA, processor1, inputPort, processor1.getRelationships());
+
+        //Create a snapshot
+        final VersionedFlowSnapshot version1 = createFlowSnapshot(groupA);
+
+        //Modify Connection 1 to point to Processor 2
+        connection1.setDestination(processor2);
+
+        //Delete Output Port
+        groupA.removeOutputPort(outputPort);
+
+        //Create another snapshot
+        final VersionedFlowSnapshot version2 = createFlowSnapshot(groupA);
+
+        //Delete connection 2
+        groupA.removeConnection(connection2);
+
+        //Move Input Port to Process Group A
+        moveInputPort(inputPort, groupA);
+
+        //Create another snapshot
+        final VersionedFlowSnapshot version3 = createFlowSnapshot(groupA);
+
+        //Change Process Group version to Version 1
+        groupA.updateFlow(version1, null, false, true, true);
+
+        //Process Group A should have two Processors, 2 Connections, one Output Port and one Process Group with one Input Port
+        assertEquals(2, groupA.getProcessors().size());
+        assertEquals(2, groupA.getConnections().size());
+        assertEquals(connection1.getDestination().getVersionedComponentId(), outputPort.getVersionedComponentId());
+        assertEquals(1, groupA.getOutputPorts().size());
+        assertEquals(1, groupA.getProcessGroups().size());
+        assertEquals(1, groupB.getInputPorts().size());
+
+        //Change Process Group version to Version 2
+        groupA.updateFlow(version2, null, false, true, true);
+
+        //Connection1 destination changed to Processor2 and Output Port deleted
+        assertEquals(connection1.getDestination().getVersionedComponentId(), processor2.getVersionedComponentId());
+        assertTrue(groupA.getOutputPorts().isEmpty());
+        assertTrue(groupB.getOutputPorts().isEmpty());
+
+        //Change Process Group version to Version 3
+        groupA.updateFlow(version3, null, false, true, true);
+
+        //Connection2 should be deleted and Input Port moved to Process Group A
+        assertEquals(1, groupA.getConnections().size());
+        assertEquals(connection1.getVersionedComponentId(), groupA.getConnections().stream().findFirst().get().getVersionedComponentId());
+        assertTrue(groupB.getInputPorts().isEmpty());
+        assertEquals(1, groupA.getInputPorts().size());
+        assertEquals(inputPort.getVersionedComponentId(), groupA.getInputPorts().stream().findFirst().get().getVersionedComponentId());
+    }
+
     private ProcessGroup createProcessGroup(final String groupId, final String groupName, final ProcessGroup destination) {
         final ProcessGroup group = getFlowController().getFlowManager().createProcessGroup(groupId);
         group.setName(groupName);
@@ -458,6 +676,14 @@
         port.getProcessGroup().move(snippet, destination);
     }
 
+    private void moveOutputPort(final Port port, final ProcessGroup destination) {
+        final StandardSnippet snippet = new StandardSnippet();
+        snippet.setParentGroupId(port.getProcessGroupIdentifier());
+        snippet.addOutputPorts(Collections.singletonMap(port.getIdentifier(), null));
+
+        port.getProcessGroup().move(snippet, destination);
+    }
+
 
     private Set<FlowDifference> getLocalModifications(final ProcessGroup processGroup, final VersionedFlowSnapshot versionedFlowSnapshot) {
         final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(getFlowController().getExtensionManager());
@@ -495,6 +721,8 @@
         final List<ProcessorNode> processorNodes;
         final List<ControllerServiceNode> controllerServiceNodes;
         final List<Port> inputPorts;
+        final List<Port> outputPorts;
+        final List<Funnel> funnels;
         final List<Connection> connections;
         final List<ProcessGroup> processGroups;
         final Set<VersionedProcessGroup> versionedProcessGroups;
@@ -503,12 +731,16 @@
             processorNodes = processors;
             controllerServiceNodes = controllerServices;
             inputPorts = Collections.EMPTY_LIST;
+            outputPorts = Collections.EMPTY_LIST;
+            funnels = Collections.EMPTY_LIST;
             connections = Collections.EMPTY_LIST;
             versionedProcessGroups = Collections.EMPTY_SET;
         } else {
             processorNodes = new ArrayList<>(group.getProcessors());
             controllerServiceNodes = new ArrayList<>(group.getControllerServices(false));
             inputPorts = new ArrayList<>(group.getInputPorts());
+            outputPorts = new ArrayList<>(group.getOutputPorts());
+            funnels = new ArrayList<>(group.getFunnels());
             connections = new ArrayList<>(group.getConnections());
             processGroups = new ArrayList<>(group.getProcessGroups());
 
@@ -541,6 +773,20 @@
             inputPort.setVersionedComponentId(versionedInputPort.getIdentifier());
         }
 
+        final Set<VersionedPort> versionedOutputPorts = new HashSet<>();
+        for (final Port outputPort : outputPorts) {
+            final VersionedPort versionedOutputPort = flowMapper.mapPort(outputPort);
+            versionedOutputPorts.add(versionedOutputPort);
+            outputPort.setVersionedComponentId(versionedOutputPort.getIdentifier());
+        }
+
+        final Set<VersionedFunnel> versionedFunnels = new HashSet<>();
+        for (final Funnel funnel : funnels) {
+            final VersionedFunnel versionedFunnel = flowMapper.mapFunnel(funnel);
+            versionedFunnels.add(versionedFunnel);
+            funnel.setVersionedComponentId(versionedFunnel.getIdentifier());
+        }
+
         final Set<VersionedConnection> versionedConnections = new HashSet<>();
         for (final Connection connection : connections) {
             final VersionedConnection versionedConnection = flowMapper.mapConnection(connection);
@@ -553,6 +799,8 @@
         flowContents.setControllerServices(versionedServices);
         flowContents.setProcessGroups(versionedProcessGroups);
         flowContents.setInputPorts(versionedInputPorts);
+        flowContents.setOutputPorts(versionedOutputPorts);
+        flowContents.setFunnels(versionedFunnels);
         flowContents.setConnections(versionedConnections);
 
         final VersionedFlowSnapshot versionedFlowSnapshot = createVersionedFlowSnapshot(snapshotMetadata, bucket, flow, flowContents);