NIFI-15011 - FlowDifferenceFilters - handle splitRelationship() (#10341)

diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
index caf6ca1..436ef9a 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
@@ -18,8 +18,12 @@
 
 import org.apache.nifi.annotation.behavior.DynamicProperties;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.DynamicRelationship;
 import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.ComponentNode;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.flow.FlowManager;
@@ -36,10 +40,13 @@
 import org.apache.nifi.flow.VersionedProcessGroup;
 import org.apache.nifi.flow.VersionedProcessor;
 import org.apache.nifi.flow.VersionedReportingTask;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.registry.flow.diff.DifferenceType;
 import org.apache.nifi.registry.flow.diff.FlowDifference;
 import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedConnection;
 import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService;
 import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor;
 
@@ -96,7 +103,8 @@
             || isStaticPropertyRemoved(difference, flowManager)
             || isControllerServiceCreatedForNewProperty(difference, evaluatedContext)
             || isPropertyParameterizationRename(difference, evaluatedContext)
-            || isPropertyRenameWithMatchingValue(difference, evaluatedContext);
+            || isPropertyRenameWithMatchingValue(difference, evaluatedContext)
+            || isSelectedRelationshipChangeForNewRelationship(difference, flowManager);
     }
 
     private static boolean isSensitivePropertyDueToGhosting(final FlowDifference difference, final FlowManager flowManager) {
@@ -441,6 +449,97 @@
         return true;
     }
 
+    private static boolean isSelectedRelationshipChangeForNewRelationship(final FlowDifference difference, final FlowManager flowManager) {
+        if (difference.getDifferenceType() != DifferenceType.SELECTED_RELATIONSHIPS_CHANGED) {
+            return false;
+        }
+
+        if (!(difference.getComponentA() instanceof VersionedConnection connectionA)) {
+            return false;
+        }
+
+        if (!(difference.getComponentB() instanceof InstantiatedVersionedConnection connectionB)) {
+            return false;
+        }
+
+        final Set<String> selectedA = new HashSet<>(replaceNull(connectionA.getSelectedRelationships(), Collections.emptySet()));
+        final Set<String> selectedB = new HashSet<>(replaceNull(connectionB.getSelectedRelationships(), Collections.emptySet()));
+
+        final Set<String> newlySelected = new HashSet<>(selectedB);
+        newlySelected.removeAll(selectedA);
+        if (newlySelected.isEmpty()) {
+            return false;
+        }
+
+        final Set<String> removedRelationships = new HashSet<>(selectedA);
+        removedRelationships.removeAll(selectedB);
+
+        if (flowManager == null) {
+            return false;
+        }
+
+        final String connectionInstanceId = connectionB.getInstanceIdentifier();
+        final String connectionGroupId = connectionB.getInstanceGroupId();
+        if (connectionInstanceId == null || connectionGroupId == null) {
+            return false;
+        }
+
+        final ProcessGroup processGroup = flowManager.getGroup(connectionGroupId);
+        if (processGroup == null) {
+            return false;
+        }
+
+        final Connection connection = processGroup.getConnection(connectionInstanceId);
+        if (connection == null) {
+            return false;
+        }
+
+        final Connectable source = connection.getSource();
+        if (source == null || source.getConnectableType() != ConnectableType.PROCESSOR) {
+            return false;
+        }
+
+        final ProcessorNode processorNode = flowManager.getProcessorNode(source.getIdentifier());
+        if (processorNode == null) {
+            return false;
+        }
+
+        final Processor processor = processorNode.getProcessor();
+        if (processor != null) {
+            final Class<?> processorClass = processor.getClass();
+            if (processorClass.isAnnotationPresent(DynamicRelationship.class)) {
+                return false;
+            }
+        }
+
+        for (final String relationshipName : newlySelected) {
+            final Relationship relationship = processorNode.getRelationship(relationshipName);
+            if (relationship == null) {
+                return false;
+            }
+
+            if (processorNode.isAutoTerminated(relationship)) {
+                return false;
+            }
+
+            final Set<Connection> relationshipConnections = replaceNull(processorNode.getConnections(relationship), Collections.emptySet());
+            for (final Connection relationshipConnection : relationshipConnections) {
+                if (!relationshipConnection.getIdentifier().equals(connection.getIdentifier())) {
+                    return false;
+                }
+            }
+        }
+
+        for (final String removedRelationshipName : removedRelationships) {
+            final Relationship removedRelationship = processorNode.getRelationship(removedRelationshipName);
+            if (removedRelationship != null) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
     private static <T> T replaceNull(final T value, final T replacement) {
         return value == null ? replacement : value;
     }
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/util/TestFlowDifferenceFilters.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/util/TestFlowDifferenceFilters.java
index cb2430f..c72b412 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/util/TestFlowDifferenceFilters.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/util/TestFlowDifferenceFilters.java
@@ -17,31 +17,40 @@
 package org.apache.nifi.util;
 
 import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.DynamicRelationship;
 import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.flow.ComponentType;
 import org.apache.nifi.flow.ScheduledState;
+import org.apache.nifi.flow.VersionedConnection;
 import org.apache.nifi.flow.VersionedControllerService;
 import org.apache.nifi.flow.VersionedPort;
 import org.apache.nifi.flow.VersionedProcessor;
 import org.apache.nifi.flow.VersionedRemoteGroupPort;
+import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.registry.flow.diff.DifferenceType;
 import org.apache.nifi.registry.flow.diff.FlowDifference;
 import org.apache.nifi.registry.flow.diff.StandardFlowDifference;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedConnection;
 import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService;
 import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -357,6 +366,52 @@
     }
 
     @Test
+    public void testSelectedRelationshipChangeForNewRelationshipObservedAsEnvironmentalChange() {
+        final FlowManager flowManager = Mockito.mock(FlowManager.class);
+        final ProcessGroup processGroup = Mockito.mock(ProcessGroup.class);
+        final Connection connection = Mockito.mock(Connection.class);
+        final ProcessorNode processorNode = Mockito.mock(ProcessorNode.class);
+
+        final String connectionInstanceId = "connection-instance";
+        final String connectionGroupId = "group-id";
+        final String processorInstanceId = "processor-instance";
+        final String existingRelationshipName = "retry";
+        final String newRelationshipName = "restrictions changed";
+
+        final VersionedConnection connectionA = new VersionedConnection();
+        connectionA.setSelectedRelationships(Set.of(existingRelationshipName));
+
+        final InstantiatedVersionedConnection connectionB = new InstantiatedVersionedConnection(connectionInstanceId, connectionGroupId);
+        connectionB.setSelectedRelationships(Set.of(existingRelationshipName, newRelationshipName));
+
+        final FlowDifference difference = new StandardFlowDifference(
+                DifferenceType.SELECTED_RELATIONSHIPS_CHANGED,
+                connectionA,
+                connectionB,
+                null,
+                Set.of(existingRelationshipName),
+                Set.of(existingRelationshipName, newRelationshipName),
+                "Selected relationships updated");
+
+        final Relationship newRelationship = new Relationship.Builder().name(newRelationshipName).build();
+
+        Mockito.when(flowManager.getGroup(connectionGroupId)).thenReturn(processGroup);
+        Mockito.when(processGroup.getConnection(connectionInstanceId)).thenReturn(connection);
+        Mockito.when(connection.getIdentifier()).thenReturn(connectionInstanceId);
+        Mockito.when(connection.getSource()).thenReturn(processorNode);
+
+        Mockito.when(processorNode.getIdentifier()).thenReturn(processorInstanceId);
+        Mockito.when(processorNode.getConnectableType()).thenReturn(ConnectableType.PROCESSOR);
+        Mockito.when(flowManager.getProcessorNode(processorInstanceId)).thenReturn(processorNode);
+        Mockito.when(processorNode.getRelationship(newRelationshipName)).thenReturn(newRelationship);
+        Mockito.when(processorNode.isAutoTerminated(newRelationship)).thenReturn(false);
+        Mockito.when(processorNode.getProcessor()).thenReturn(new NonDynamicProcessor());
+        Mockito.when(processorNode.getConnections(newRelationship)).thenReturn(new HashSet<>(Set.of(connection)));
+
+        assertTrue(FlowDifferenceFilters.isEnvironmentalChange(difference, null, flowManager));
+    }
+
+    @Test
     public void testPropertyRenameWithMatchingValueObservedAsEnvironmentalChange() {
         final FlowManager flowManager = Mockito.mock(FlowManager.class);
         final ProcessorNode processorNode = Mockito.mock(ProcessorNode.class);
@@ -407,6 +462,154 @@
         assertTrue(FlowDifferenceFilters.isEnvironmentalChange(propertyAdded, null, flowManager, context));
     }
 
+    @Test
+    public void testSelectedRelationshipChangeNotEnvironmentalWhenRelationshipUsedElsewhere() {
+        final FlowManager flowManager = Mockito.mock(FlowManager.class);
+        final ProcessGroup processGroup = Mockito.mock(ProcessGroup.class);
+        final Connection connection = Mockito.mock(Connection.class);
+        final Connection otherConnection = Mockito.mock(Connection.class);
+        final ProcessorNode processorNode = Mockito.mock(ProcessorNode.class);
+
+        final String connectionInstanceId = "connection-instance";
+        final String otherConnectionId = "connection-other";
+        final String connectionGroupId = "group-id";
+        final String processorInstanceId = "processor-instance";
+        final String existingRelationshipName = "retry";
+        final String newRelationshipName = "restrictions changed";
+
+        final VersionedConnection connectionA = new VersionedConnection();
+        connectionA.setSelectedRelationships(Set.of(existingRelationshipName));
+
+        final InstantiatedVersionedConnection connectionB = new InstantiatedVersionedConnection(connectionInstanceId, connectionGroupId);
+        connectionB.setSelectedRelationships(Set.of(existingRelationshipName, newRelationshipName));
+
+        final FlowDifference difference = new StandardFlowDifference(
+                DifferenceType.SELECTED_RELATIONSHIPS_CHANGED,
+                connectionA,
+                connectionB,
+                null,
+                Set.of(existingRelationshipName),
+                Set.of(existingRelationshipName, newRelationshipName),
+                "Selected relationships updated");
+
+        final Relationship newRelationship = new Relationship.Builder().name(newRelationshipName).build();
+
+        Mockito.when(flowManager.getGroup(connectionGroupId)).thenReturn(processGroup);
+        Mockito.when(processGroup.getConnection(connectionInstanceId)).thenReturn(connection);
+        Mockito.when(connection.getIdentifier()).thenReturn(connectionInstanceId);
+        Mockito.when(connection.getSource()).thenReturn(processorNode);
+
+        Mockito.when(otherConnection.getIdentifier()).thenReturn(otherConnectionId);
+
+        Mockito.when(processorNode.getIdentifier()).thenReturn(processorInstanceId);
+        Mockito.when(processorNode.getConnectableType()).thenReturn(ConnectableType.PROCESSOR);
+        Mockito.when(flowManager.getProcessorNode(processorInstanceId)).thenReturn(processorNode);
+        Mockito.when(processorNode.getRelationship(newRelationshipName)).thenReturn(newRelationship);
+        Mockito.when(processorNode.isAutoTerminated(newRelationship)).thenReturn(false);
+        Mockito.when(processorNode.getProcessor()).thenReturn(new NonDynamicProcessor());
+        Mockito.when(processorNode.getConnections(newRelationship)).thenReturn(new HashSet<>(Set.of(connection, otherConnection)));
+
+        assertFalse(FlowDifferenceFilters.isEnvironmentalChange(difference, null, flowManager));
+    }
+
+    @Test
+    public void testSelectedRelationshipChangeNotEnvironmentalWhenProcessorHasDynamicRelationships() {
+        final FlowManager flowManager = Mockito.mock(FlowManager.class);
+        final ProcessGroup processGroup = Mockito.mock(ProcessGroup.class);
+        final Connection connection = Mockito.mock(Connection.class);
+        final ProcessorNode processorNode = Mockito.mock(ProcessorNode.class);
+
+        final String connectionInstanceId = "connection-instance";
+        final String connectionGroupId = "group-id";
+        final String processorInstanceId = "processor-instance";
+        final String existingRelationshipName = "retry";
+        final String newRelationshipName = "dynamic";
+
+        final VersionedConnection connectionA = new VersionedConnection();
+        connectionA.setSelectedRelationships(Set.of(existingRelationshipName));
+
+        final InstantiatedVersionedConnection connectionB = new InstantiatedVersionedConnection(connectionInstanceId, connectionGroupId);
+        connectionB.setSelectedRelationships(Set.of(existingRelationshipName, newRelationshipName));
+
+        final FlowDifference difference = new StandardFlowDifference(
+                DifferenceType.SELECTED_RELATIONSHIPS_CHANGED,
+                connectionA,
+                connectionB,
+                null,
+                Set.of(existingRelationshipName),
+                Set.of(existingRelationshipName, newRelationshipName),
+                "Selected relationships updated");
+
+        final Relationship newRelationship = new Relationship.Builder().name(newRelationshipName).build();
+
+        Mockito.when(flowManager.getGroup(connectionGroupId)).thenReturn(processGroup);
+        Mockito.when(processGroup.getConnection(connectionInstanceId)).thenReturn(connection);
+        Mockito.when(connection.getIdentifier()).thenReturn(connectionInstanceId);
+        Mockito.when(connection.getSource()).thenReturn(processorNode);
+
+        Mockito.when(processorNode.getIdentifier()).thenReturn(processorInstanceId);
+        Mockito.when(processorNode.getConnectableType()).thenReturn(ConnectableType.PROCESSOR);
+        Mockito.when(flowManager.getProcessorNode(processorInstanceId)).thenReturn(processorNode);
+        Mockito.when(processorNode.getRelationship(newRelationshipName)).thenReturn(newRelationship);
+        Mockito.when(processorNode.isAutoTerminated(newRelationship)).thenReturn(false);
+        Mockito.when(processorNode.getConnections(newRelationship)).thenReturn(new HashSet<>(Set.of(connection)));
+        Mockito.when(processorNode.getProcessor()).thenReturn(new DynamicRelationshipProcessor());
+
+        assertFalse(FlowDifferenceFilters.isEnvironmentalChange(difference, null, flowManager));
+    }
+
+    @Test
+    public void testSelectedRelationshipChangeWithRemovedRelationshipObservedAsEnvironmentalChange() {
+        final FlowManager flowManager = Mockito.mock(FlowManager.class);
+        final ProcessGroup processGroup = Mockito.mock(ProcessGroup.class);
+        final Connection connection = Mockito.mock(Connection.class);
+        final ProcessorNode processorNode = Mockito.mock(ProcessorNode.class);
+
+        final String connectionInstanceId = "connection-instance";
+        final String connectionGroupId = "group-id";
+        final String processorInstanceId = "processor-instance";
+        final String oldRelationshipName = "old";
+        final String firstNewRelationshipName = "newA";
+        final String secondNewRelationshipName = "newB";
+
+        final VersionedConnection connectionA = new VersionedConnection();
+        connectionA.setSelectedRelationships(Set.of(oldRelationshipName));
+
+        final InstantiatedVersionedConnection connectionB = new InstantiatedVersionedConnection(connectionInstanceId, connectionGroupId);
+        connectionB.setSelectedRelationships(Set.of(firstNewRelationshipName, secondNewRelationshipName));
+
+        final FlowDifference difference = new StandardFlowDifference(
+                DifferenceType.SELECTED_RELATIONSHIPS_CHANGED,
+                connectionA,
+                connectionB,
+                null,
+                Set.of(oldRelationshipName),
+                Set.of(firstNewRelationshipName, secondNewRelationshipName),
+                "Selected relationships updated");
+
+        final Relationship firstNewRelationship = new Relationship.Builder().name(firstNewRelationshipName).build();
+        final Relationship secondNewRelationship = new Relationship.Builder().name(secondNewRelationshipName).build();
+
+        Mockito.when(flowManager.getGroup(connectionGroupId)).thenReturn(processGroup);
+        Mockito.when(processGroup.getConnection(connectionInstanceId)).thenReturn(connection);
+        Mockito.when(connection.getIdentifier()).thenReturn(connectionInstanceId);
+        Mockito.when(connection.getSource()).thenReturn(processorNode);
+
+        Mockito.when(processorNode.getIdentifier()).thenReturn(processorInstanceId);
+        Mockito.when(processorNode.getConnectableType()).thenReturn(ConnectableType.PROCESSOR);
+        Mockito.when(flowManager.getProcessorNode(processorInstanceId)).thenReturn(processorNode);
+        Mockito.when(processorNode.getProcessor()).thenReturn(new NonDynamicProcessor());
+        Mockito.when(processorNode.getRelationship(firstNewRelationshipName)).thenReturn(firstNewRelationship);
+        Mockito.when(processorNode.getRelationship(secondNewRelationshipName)).thenReturn(secondNewRelationship);
+        Mockito.when(processorNode.getRelationship(oldRelationshipName)).thenReturn(null);
+        Mockito.when(processorNode.isAutoTerminated(firstNewRelationship)).thenReturn(false);
+        Mockito.when(processorNode.isAutoTerminated(secondNewRelationship)).thenReturn(false);
+        Mockito.when(processorNode.getConnections(firstNewRelationship)).thenReturn(new HashSet<>(Set.of(connection)));
+        Mockito.when(processorNode.getConnections(secondNewRelationship)).thenReturn(new HashSet<>(Set.of(connection)));
+
+        assertTrue(FlowDifferenceFilters.isEnvironmentalChange(difference, null, flowManager));
+    }
+
     @DynamicProperty(name = "Dynamic Property", value = "Value", description = "Allows dynamic properties")
     private static class DynamicAnnotationProcessor extends AbstractProcessor {
         @Override
@@ -414,4 +617,19 @@
             // No-op for testing
         }
     }
+
+    private static class NonDynamicProcessor extends AbstractProcessor {
+        @Override
+        public void onTrigger(final ProcessContext context, final ProcessSession session) {
+            // No-op for testing
+        }
+    }
+
+    @DynamicRelationship(name = "dynamic", description = "dynamic")
+    private static class DynamicRelationshipProcessor extends AbstractProcessor {
+        @Override
+        public void onTrigger(final ProcessContext context, final ProcessSession session) {
+            // No-op for testing
+        }
+    }
 }