NIFI-15011 - FlowDifferenceFilters - handle splitRelationship() (#10341)
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
index caf6ca1..436ef9a 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
@@ -18,8 +18,12 @@
import org.apache.nifi.annotation.behavior.DynamicProperties;
import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.DynamicRelationship;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.flow.FlowManager;
@@ -36,10 +40,13 @@
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.flow.VersionedReportingTask;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.registry.flow.diff.DifferenceType;
import org.apache.nifi.registry.flow.diff.FlowDifference;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedConnection;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor;
@@ -96,7 +103,8 @@
|| isStaticPropertyRemoved(difference, flowManager)
|| isControllerServiceCreatedForNewProperty(difference, evaluatedContext)
|| isPropertyParameterizationRename(difference, evaluatedContext)
- || isPropertyRenameWithMatchingValue(difference, evaluatedContext);
+ || isPropertyRenameWithMatchingValue(difference, evaluatedContext)
+ || isSelectedRelationshipChangeForNewRelationship(difference, flowManager);
}
private static boolean isSensitivePropertyDueToGhosting(final FlowDifference difference, final FlowManager flowManager) {
@@ -441,6 +449,97 @@
return true;
}
+ private static boolean isSelectedRelationshipChangeForNewRelationship(final FlowDifference difference, final FlowManager flowManager) {
+ if (difference.getDifferenceType() != DifferenceType.SELECTED_RELATIONSHIPS_CHANGED) {
+ return false;
+ }
+
+ if (!(difference.getComponentA() instanceof VersionedConnection connectionA)) {
+ return false;
+ }
+
+ if (!(difference.getComponentB() instanceof InstantiatedVersionedConnection connectionB)) {
+ return false;
+ }
+
+ final Set<String> selectedA = new HashSet<>(replaceNull(connectionA.getSelectedRelationships(), Collections.emptySet()));
+ final Set<String> selectedB = new HashSet<>(replaceNull(connectionB.getSelectedRelationships(), Collections.emptySet()));
+
+ final Set<String> newlySelected = new HashSet<>(selectedB);
+ newlySelected.removeAll(selectedA);
+ if (newlySelected.isEmpty()) {
+ return false;
+ }
+
+ final Set<String> removedRelationships = new HashSet<>(selectedA);
+ removedRelationships.removeAll(selectedB);
+
+ if (flowManager == null) {
+ return false;
+ }
+
+ final String connectionInstanceId = connectionB.getInstanceIdentifier();
+ final String connectionGroupId = connectionB.getInstanceGroupId();
+ if (connectionInstanceId == null || connectionGroupId == null) {
+ return false;
+ }
+
+ final ProcessGroup processGroup = flowManager.getGroup(connectionGroupId);
+ if (processGroup == null) {
+ return false;
+ }
+
+ final Connection connection = processGroup.getConnection(connectionInstanceId);
+ if (connection == null) {
+ return false;
+ }
+
+ final Connectable source = connection.getSource();
+ if (source == null || source.getConnectableType() != ConnectableType.PROCESSOR) {
+ return false;
+ }
+
+ final ProcessorNode processorNode = flowManager.getProcessorNode(source.getIdentifier());
+ if (processorNode == null) {
+ return false;
+ }
+
+ final Processor processor = processorNode.getProcessor();
+ if (processor != null) {
+ final Class<?> processorClass = processor.getClass();
+ if (processorClass.isAnnotationPresent(DynamicRelationship.class)) {
+ return false;
+ }
+ }
+
+ for (final String relationshipName : newlySelected) {
+ final Relationship relationship = processorNode.getRelationship(relationshipName);
+ if (relationship == null) {
+ return false;
+ }
+
+ if (processorNode.isAutoTerminated(relationship)) {
+ return false;
+ }
+
+ final Set<Connection> relationshipConnections = replaceNull(processorNode.getConnections(relationship), Collections.emptySet());
+ for (final Connection relationshipConnection : relationshipConnections) {
+ if (!relationshipConnection.getIdentifier().equals(connection.getIdentifier())) {
+ return false;
+ }
+ }
+ }
+
+ for (final String removedRelationshipName : removedRelationships) {
+ final Relationship removedRelationship = processorNode.getRelationship(removedRelationshipName);
+ if (removedRelationship != null) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
private static <T> T replaceNull(final T value, final T replacement) {
return value == null ? replacement : value;
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/util/TestFlowDifferenceFilters.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/util/TestFlowDifferenceFilters.java
index cb2430f..c72b412 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/util/TestFlowDifferenceFilters.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/util/TestFlowDifferenceFilters.java
@@ -17,31 +17,40 @@
package org.apache.nifi.util;
import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.DynamicRelationship;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.flow.ComponentType;
import org.apache.nifi.flow.ScheduledState;
+import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.flow.VersionedRemoteGroupPort;
+import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
import org.apache.nifi.registry.flow.diff.DifferenceType;
import org.apache.nifi.registry.flow.diff.FlowDifference;
import org.apache.nifi.registry.flow.diff.StandardFlowDifference;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedConnection;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -357,6 +366,52 @@
}
@Test
+ public void testSelectedRelationshipChangeForNewRelationshipObservedAsEnvironmentalChange() {
+ final FlowManager flowManager = Mockito.mock(FlowManager.class);
+ final ProcessGroup processGroup = Mockito.mock(ProcessGroup.class);
+ final Connection connection = Mockito.mock(Connection.class);
+ final ProcessorNode processorNode = Mockito.mock(ProcessorNode.class);
+
+ final String connectionInstanceId = "connection-instance";
+ final String connectionGroupId = "group-id";
+ final String processorInstanceId = "processor-instance";
+ final String existingRelationshipName = "retry";
+ final String newRelationshipName = "restrictions changed";
+
+ final VersionedConnection connectionA = new VersionedConnection();
+ connectionA.setSelectedRelationships(Set.of(existingRelationshipName));
+
+ final InstantiatedVersionedConnection connectionB = new InstantiatedVersionedConnection(connectionInstanceId, connectionGroupId);
+ connectionB.setSelectedRelationships(Set.of(existingRelationshipName, newRelationshipName));
+
+ final FlowDifference difference = new StandardFlowDifference(
+ DifferenceType.SELECTED_RELATIONSHIPS_CHANGED,
+ connectionA,
+ connectionB,
+ null,
+ Set.of(existingRelationshipName),
+ Set.of(existingRelationshipName, newRelationshipName),
+ "Selected relationships updated");
+
+ final Relationship newRelationship = new Relationship.Builder().name(newRelationshipName).build();
+
+ Mockito.when(flowManager.getGroup(connectionGroupId)).thenReturn(processGroup);
+ Mockito.when(processGroup.getConnection(connectionInstanceId)).thenReturn(connection);
+ Mockito.when(connection.getIdentifier()).thenReturn(connectionInstanceId);
+ Mockito.when(connection.getSource()).thenReturn(processorNode);
+
+ Mockito.when(processorNode.getIdentifier()).thenReturn(processorInstanceId);
+ Mockito.when(processorNode.getConnectableType()).thenReturn(ConnectableType.PROCESSOR);
+ Mockito.when(flowManager.getProcessorNode(processorInstanceId)).thenReturn(processorNode);
+ Mockito.when(processorNode.getRelationship(newRelationshipName)).thenReturn(newRelationship);
+ Mockito.when(processorNode.isAutoTerminated(newRelationship)).thenReturn(false);
+ Mockito.when(processorNode.getProcessor()).thenReturn(new NonDynamicProcessor());
+ Mockito.when(processorNode.getConnections(newRelationship)).thenReturn(new HashSet<>(Set.of(connection)));
+
+ assertTrue(FlowDifferenceFilters.isEnvironmentalChange(difference, null, flowManager));
+ }
+
+ @Test
public void testPropertyRenameWithMatchingValueObservedAsEnvironmentalChange() {
final FlowManager flowManager = Mockito.mock(FlowManager.class);
final ProcessorNode processorNode = Mockito.mock(ProcessorNode.class);
@@ -407,6 +462,154 @@
assertTrue(FlowDifferenceFilters.isEnvironmentalChange(propertyAdded, null, flowManager, context));
}
+ @Test
+ public void testSelectedRelationshipChangeNotEnvironmentalWhenRelationshipUsedElsewhere() {
+ final FlowManager flowManager = Mockito.mock(FlowManager.class);
+ final ProcessGroup processGroup = Mockito.mock(ProcessGroup.class);
+ final Connection connection = Mockito.mock(Connection.class);
+ final Connection otherConnection = Mockito.mock(Connection.class);
+ final ProcessorNode processorNode = Mockito.mock(ProcessorNode.class);
+
+ final String connectionInstanceId = "connection-instance";
+ final String otherConnectionId = "connection-other";
+ final String connectionGroupId = "group-id";
+ final String processorInstanceId = "processor-instance";
+ final String existingRelationshipName = "retry";
+ final String newRelationshipName = "restrictions changed";
+
+ final VersionedConnection connectionA = new VersionedConnection();
+ connectionA.setSelectedRelationships(Set.of(existingRelationshipName));
+
+ final InstantiatedVersionedConnection connectionB = new InstantiatedVersionedConnection(connectionInstanceId, connectionGroupId);
+ connectionB.setSelectedRelationships(Set.of(existingRelationshipName, newRelationshipName));
+
+ final FlowDifference difference = new StandardFlowDifference(
+ DifferenceType.SELECTED_RELATIONSHIPS_CHANGED,
+ connectionA,
+ connectionB,
+ null,
+ Set.of(existingRelationshipName),
+ Set.of(existingRelationshipName, newRelationshipName),
+ "Selected relationships updated");
+
+ final Relationship newRelationship = new Relationship.Builder().name(newRelationshipName).build();
+
+ Mockito.when(flowManager.getGroup(connectionGroupId)).thenReturn(processGroup);
+ Mockito.when(processGroup.getConnection(connectionInstanceId)).thenReturn(connection);
+ Mockito.when(connection.getIdentifier()).thenReturn(connectionInstanceId);
+ Mockito.when(connection.getSource()).thenReturn(processorNode);
+
+ Mockito.when(otherConnection.getIdentifier()).thenReturn(otherConnectionId);
+
+ Mockito.when(processorNode.getIdentifier()).thenReturn(processorInstanceId);
+ Mockito.when(processorNode.getConnectableType()).thenReturn(ConnectableType.PROCESSOR);
+ Mockito.when(flowManager.getProcessorNode(processorInstanceId)).thenReturn(processorNode);
+ Mockito.when(processorNode.getRelationship(newRelationshipName)).thenReturn(newRelationship);
+ Mockito.when(processorNode.isAutoTerminated(newRelationship)).thenReturn(false);
+ Mockito.when(processorNode.getProcessor()).thenReturn(new NonDynamicProcessor());
+ Mockito.when(processorNode.getConnections(newRelationship)).thenReturn(new HashSet<>(Set.of(connection, otherConnection)));
+
+ assertFalse(FlowDifferenceFilters.isEnvironmentalChange(difference, null, flowManager));
+ }
+
+ @Test
+ public void testSelectedRelationshipChangeNotEnvironmentalWhenProcessorHasDynamicRelationships() {
+ final FlowManager flowManager = Mockito.mock(FlowManager.class);
+ final ProcessGroup processGroup = Mockito.mock(ProcessGroup.class);
+ final Connection connection = Mockito.mock(Connection.class);
+ final ProcessorNode processorNode = Mockito.mock(ProcessorNode.class);
+
+ final String connectionInstanceId = "connection-instance";
+ final String connectionGroupId = "group-id";
+ final String processorInstanceId = "processor-instance";
+ final String existingRelationshipName = "retry";
+ final String newRelationshipName = "dynamic";
+
+ final VersionedConnection connectionA = new VersionedConnection();
+ connectionA.setSelectedRelationships(Set.of(existingRelationshipName));
+
+ final InstantiatedVersionedConnection connectionB = new InstantiatedVersionedConnection(connectionInstanceId, connectionGroupId);
+ connectionB.setSelectedRelationships(Set.of(existingRelationshipName, newRelationshipName));
+
+ final FlowDifference difference = new StandardFlowDifference(
+ DifferenceType.SELECTED_RELATIONSHIPS_CHANGED,
+ connectionA,
+ connectionB,
+ null,
+ Set.of(existingRelationshipName),
+ Set.of(existingRelationshipName, newRelationshipName),
+ "Selected relationships updated");
+
+ final Relationship newRelationship = new Relationship.Builder().name(newRelationshipName).build();
+
+ Mockito.when(flowManager.getGroup(connectionGroupId)).thenReturn(processGroup);
+ Mockito.when(processGroup.getConnection(connectionInstanceId)).thenReturn(connection);
+ Mockito.when(connection.getIdentifier()).thenReturn(connectionInstanceId);
+ Mockito.when(connection.getSource()).thenReturn(processorNode);
+
+ Mockito.when(processorNode.getIdentifier()).thenReturn(processorInstanceId);
+ Mockito.when(processorNode.getConnectableType()).thenReturn(ConnectableType.PROCESSOR);
+ Mockito.when(flowManager.getProcessorNode(processorInstanceId)).thenReturn(processorNode);
+ Mockito.when(processorNode.getRelationship(newRelationshipName)).thenReturn(newRelationship);
+ Mockito.when(processorNode.isAutoTerminated(newRelationship)).thenReturn(false);
+ Mockito.when(processorNode.getConnections(newRelationship)).thenReturn(new HashSet<>(Set.of(connection)));
+ Mockito.when(processorNode.getProcessor()).thenReturn(new DynamicRelationshipProcessor());
+
+ assertFalse(FlowDifferenceFilters.isEnvironmentalChange(difference, null, flowManager));
+ }
+
+ @Test
+ public void testSelectedRelationshipChangeWithRemovedRelationshipObservedAsEnvironmentalChange() {
+ final FlowManager flowManager = Mockito.mock(FlowManager.class);
+ final ProcessGroup processGroup = Mockito.mock(ProcessGroup.class);
+ final Connection connection = Mockito.mock(Connection.class);
+ final ProcessorNode processorNode = Mockito.mock(ProcessorNode.class);
+
+ final String connectionInstanceId = "connection-instance";
+ final String connectionGroupId = "group-id";
+ final String processorInstanceId = "processor-instance";
+ final String oldRelationshipName = "old";
+ final String firstNewRelationshipName = "newA";
+ final String secondNewRelationshipName = "newB";
+
+ final VersionedConnection connectionA = new VersionedConnection();
+ connectionA.setSelectedRelationships(Set.of(oldRelationshipName));
+
+ final InstantiatedVersionedConnection connectionB = new InstantiatedVersionedConnection(connectionInstanceId, connectionGroupId);
+ connectionB.setSelectedRelationships(Set.of(firstNewRelationshipName, secondNewRelationshipName));
+
+ final FlowDifference difference = new StandardFlowDifference(
+ DifferenceType.SELECTED_RELATIONSHIPS_CHANGED,
+ connectionA,
+ connectionB,
+ null,
+ Set.of(oldRelationshipName),
+ Set.of(firstNewRelationshipName, secondNewRelationshipName),
+ "Selected relationships updated");
+
+ final Relationship firstNewRelationship = new Relationship.Builder().name(firstNewRelationshipName).build();
+ final Relationship secondNewRelationship = new Relationship.Builder().name(secondNewRelationshipName).build();
+
+ Mockito.when(flowManager.getGroup(connectionGroupId)).thenReturn(processGroup);
+ Mockito.when(processGroup.getConnection(connectionInstanceId)).thenReturn(connection);
+ Mockito.when(connection.getIdentifier()).thenReturn(connectionInstanceId);
+ Mockito.when(connection.getSource()).thenReturn(processorNode);
+
+ Mockito.when(processorNode.getIdentifier()).thenReturn(processorInstanceId);
+ Mockito.when(processorNode.getConnectableType()).thenReturn(ConnectableType.PROCESSOR);
+ Mockito.when(flowManager.getProcessorNode(processorInstanceId)).thenReturn(processorNode);
+ Mockito.when(processorNode.getProcessor()).thenReturn(new NonDynamicProcessor());
+ Mockito.when(processorNode.getRelationship(firstNewRelationshipName)).thenReturn(firstNewRelationship);
+ Mockito.when(processorNode.getRelationship(secondNewRelationshipName)).thenReturn(secondNewRelationship);
+ Mockito.when(processorNode.getRelationship(oldRelationshipName)).thenReturn(null);
+ Mockito.when(processorNode.isAutoTerminated(firstNewRelationship)).thenReturn(false);
+ Mockito.when(processorNode.isAutoTerminated(secondNewRelationship)).thenReturn(false);
+ Mockito.when(processorNode.getConnections(firstNewRelationship)).thenReturn(new HashSet<>(Set.of(connection)));
+ Mockito.when(processorNode.getConnections(secondNewRelationship)).thenReturn(new HashSet<>(Set.of(connection)));
+
+ assertTrue(FlowDifferenceFilters.isEnvironmentalChange(difference, null, flowManager));
+ }
+
@DynamicProperty(name = "Dynamic Property", value = "Value", description = "Allows dynamic properties")
private static class DynamicAnnotationProcessor extends AbstractProcessor {
@Override
@@ -414,4 +617,19 @@
// No-op for testing
}
}
+
+ private static class NonDynamicProcessor extends AbstractProcessor {
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) {
+ // No-op for testing
+ }
+ }
+
+ @DynamicRelationship(name = "dynamic", description = "dynamic")
+ private static class DynamicRelationshipProcessor extends AbstractProcessor {
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) {
+ // No-op for testing
+ }
+ }
}