blob: c44afb8a802df6e4b302009712e0589d88b5f09b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.flow.mapping;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.nifi.authorization.resource.ComponentAuthorizable;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.connectable.Positionable;
import org.apache.nifi.connectable.Size;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.PropertyConfiguration;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.groups.FlowFileConcurrency;
import org.apache.nifi.groups.FlowFileOutboundPolicy;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.parameter.Parameter;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.registry.flow.ComponentType;
import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.PortType;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedConnection;
import org.apache.nifi.registry.flow.VersionedControllerService;
import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
import org.apache.nifi.registry.flow.VersionedFunnel;
import org.apache.nifi.registry.flow.VersionedLabel;
import org.apache.nifi.registry.flow.VersionedParameter;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.registry.flow.VersionedPort;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.VersionedProcessor;
import org.apache.nifi.registry.flow.VersionedPropertyDescriptor;
import org.apache.nifi.registry.flow.VersionedRemoteGroupPort;
import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class NiFiRegistryFlowMapperTest {
@Mock
private ExtensionManager extensionManager;
@Mock
private ControllerServiceProvider controllerServiceProvider;
@Mock
private FlowRegistryClient flowRegistryClient;
private NiFiRegistryFlowMapper flowMapper = new NiFiRegistryFlowMapper(extensionManager);
private int counter = 1;
@Before
public void setup() {
final FlowRegistry flowRegistry = mock(FlowRegistry.class);
when(flowRegistryClient.getFlowRegistry(anyString())).thenReturn(flowRegistry);
when(flowRegistry.getURL()).thenReturn("url");
}
/**
* Test mapping versioned process group's parameter contexts excluding descendant versioned process groups
*/
@Test
public void testMapParameterContextsExcludingVersionedDescendants() {
final ProcessGroup innerInnerProcessGroup =
prepareProcessGroupWithParameterContext(Collections.emptyList(),
true, true);
final ProcessGroup innerProcessGroup =
prepareProcessGroupWithParameterContext(Lists.newArrayList(innerInnerProcessGroup),
true, false);
final ProcessGroup processGroup =
prepareProcessGroupWithParameterContext(Lists.newArrayList(innerProcessGroup),
false, false);
// first nesting should be traversed because child is not version controlled, but deeper nesting should be ignored
// because map versioned descendants indicator is false
final Map<String, VersionedParameterContext> versionedParameterContexts =
flowMapper.mapParameterContexts(processGroup, false);
// verify single parameter context
assertEquals(1, versionedParameterContexts.size());
final String expectedName = innerProcessGroup.getParameterContext().getName();
verifyParameterContext(innerProcessGroup.getParameterContext(), versionedParameterContexts.get(expectedName));
}
/**
* Test mapping nested process group's parameter contexts
*/
@Test
public void testMapNestedParameterContexts() {
final ProcessGroup innerInnerProcessGroup =
prepareProcessGroupWithParameterContext(Collections.emptyList(),
true, true);
final ProcessGroup innerProcessGroup =
prepareProcessGroupWithParameterContext(Lists.newArrayList(innerInnerProcessGroup),
false, true);
final ProcessGroup processGroup =
prepareProcessGroupWithParameterContext(Lists.newArrayList(innerProcessGroup),
true, true);
// include nested parameter contexts even though they are version controlled because map descendant indicator is true
final Map<String, VersionedParameterContext> versionedParameterContexts =
flowMapper.mapParameterContexts(processGroup, true);
// verify parameter contexts
assertEquals(2, versionedParameterContexts.size());
final String expectedName1 = processGroup.getParameterContext().getName();
final String expectedName2 = innerInnerProcessGroup.getParameterContext().getName();
verifyParameterContext(processGroup.getParameterContext(), versionedParameterContexts.get(expectedName1));
verifyParameterContext(innerInnerProcessGroup.getParameterContext(), versionedParameterContexts.get(expectedName2));
}
/**
* Test mapping a versioned ProcessGroup model to a versioned VersionedProcessGroup excluding descendant versioned flows.
* VersionControlInformation should be mapped to the versioned inner process group instead of the group contents
*/
@Test
public void testMapVersionedProcessGroupsExcludingVersionedDescendants() {
// prepare a versioned process group with a nested versioned process group, each with 1 processor
final ProcessGroup innerProcessGroup =
prepareProcessGroup(1,false, false, false,
false, false,null,
false, true, Collections.emptyList());
final ProcessGroup processGroup =
prepareProcessGroup(1,false,false, false,
false, false, null,
false, true, Lists.newArrayList(innerProcessGroup));
final List<ProcessGroup> allProcessGroups = Lists.newArrayList(innerProcessGroup);
when(processGroup.findAllProcessGroups()).thenReturn(allProcessGroups);
// perform the mapping, excluding descendant versioned flows
final InstantiatedVersionedProcessGroup versionedProcessGroup =
flowMapper.mapProcessGroup(processGroup, controllerServiceProvider, flowRegistryClient,
false);
final VersionedProcessGroup innerVersionedProcessGroup =
versionedProcessGroup.getProcessGroups().iterator().next();
// verify root versioned process group contents only
verifyVersionedProcessGroup(processGroup, versionedProcessGroup,false,false);
// verify versioned descendant is present with VersionControlInfo only
verifyVersionedProcessGroup(innerProcessGroup, innerVersionedProcessGroup,true,false);
}
/**
* Test mapping a versioned ProcessGroup model to a non-versioned VersionedProcessGroup. Version info is ignored.
* Most elements are exercised here... including labels, ports, processors, connections, funnels, nested process groups,
* remote process groups, congtroller services, external controller service references and variable registries.
*/
@Test
public void testMapNonVersionedProcessGroups() {
// create a controller service with a different process group id so it's treated as external
final ControllerServiceNode externalControllerServiceNode = prepareControllerService(UUID.randomUUID().toString());
// prepare a process group with nested process groups
final ProcessGroup innerInnerProcessGroup =
prepareProcessGroup(0,false, true, false,
true, false,null,
true, false, Collections.emptyList());
final ProcessGroup innerProcessGroup =
prepareProcessGroup(1,true, false, false,
true, true, externalControllerServiceNode,
true, true, Lists.newArrayList(innerInnerProcessGroup));
final ProcessGroup processGroup =
prepareProcessGroup(2,false,false, true,
false, true, null,
false, true, Lists.newArrayList(innerProcessGroup));
final List<ProcessGroup> allProcessGroups = Lists.newArrayList(innerProcessGroup, innerInnerProcessGroup);
when(processGroup.findAllProcessGroups()).thenReturn(allProcessGroups);
// perform the mapping
final InstantiatedVersionedProcessGroup versionedProcessGroup =
flowMapper.mapNonVersionedProcessGroup(processGroup, controllerServiceProvider);
// recursively verify versioned process group contents
verifyVersionedProcessGroup(processGroup, versionedProcessGroup, false,true);
// verify external controller service reference
final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences =
versionedProcessGroup.getExternalControllerServiceReferences();
final String expectedExternalControllerServiceReferenceKey = flowMapper.getGroupId(externalControllerServiceNode.getIdentifier());
final ExternalControllerServiceReference externalControllerServiceReference =
externalControllerServiceReferences.get(expectedExternalControllerServiceReferenceKey);
assertNotNull(externalControllerServiceReference);
assertEquals(expectedExternalControllerServiceReferenceKey, externalControllerServiceReference.getIdentifier());
assertEquals(externalControllerServiceNode.getName(), externalControllerServiceReference.getName());
}
private ProcessGroup prepareProcessGroupWithParameterContext(final List<ProcessGroup> childProcessGroups,
final boolean includeParameterContext,
final boolean isVersionControlled) {
final ProcessGroup processGroup = mock(ProcessGroup.class);
if (includeParameterContext) {
final ParameterContext parameterContext = mock(ParameterContext.class);
when(processGroup.getParameterContext()).thenReturn(parameterContext);
when(parameterContext.getName()).thenReturn("context" + (counter++));
final Map<ParameterDescriptor, Parameter> parametersMap = Maps.newHashMap();
when(parameterContext.getParameters()).thenReturn(parametersMap);
addParameter(parametersMap, "value" + (counter++), false);
addParameter(parametersMap, "value" + (counter++), true);
addParameter(parametersMap, null, true);
}
if (isVersionControlled) {
when(processGroup.getVersionControlInformation()).thenReturn(mock(VersionControlInformation.class));
}
when(processGroup.getProcessGroups()).thenReturn(Sets.newLinkedHashSet(childProcessGroups));
return processGroup;
}
private void addParameter(final Map<ParameterDescriptor, Parameter> parametersMap, final String value, final boolean isSensitive) {
final ParameterDescriptor parameterDescriptor =
new ParameterDescriptor.Builder().name("param" + (counter++)).description("description" + (counter++)).sensitive(isSensitive).build();
final Parameter parameter = mock(Parameter.class);
when(parameter.getDescriptor()).thenReturn(parameterDescriptor);
when(parameter.getValue()).thenReturn(value);
parametersMap.put(parameterDescriptor, parameter);
}
private void verifyParameterContext(final ParameterContext parameterContext, final VersionedParameterContext versionedParameterContext) {
assertEquals(parameterContext.getName(), versionedParameterContext.getName());
final Collection<Parameter> parameters = parameterContext.getParameters().values();
final Set<VersionedParameter> versionedParameters = versionedParameterContext.getParameters();
// parameter order is not deterministic - use unique names to map up matching parameters
final Iterator<Parameter> parametersIterator = parameters.iterator();
while (parametersIterator.hasNext()) {
final Parameter parameter = parametersIterator.next();
final Iterator<VersionedParameter> versionedParameterIterator = versionedParameters.iterator();
while (versionedParameterIterator.hasNext()) {
final VersionedParameter versionedParameter = versionedParameterIterator.next();
if (versionedParameter.getName().equals(parameter.getDescriptor().getName())) {
verifyParameter(versionedParameter, parameter);
versionedParameterIterator.remove();
break;
}
}
}
assertTrue("Failed to match parameters by unique name", versionedParameters.isEmpty());
}
private void verifyParameter(final VersionedParameter versionedParameter, final Parameter parameter) {
final ParameterDescriptor parameterDescriptor = parameter.getDescriptor();
assertEquals(parameterDescriptor.getName(), versionedParameter.getName());
assertEquals(parameterDescriptor.getDescription(), versionedParameter.getDescription());
assertEquals(parameterDescriptor.isSensitive(), versionedParameter.isSensitive());
if (parameterDescriptor.isSensitive()) {
// verify parameter value is null for sensitive parameters
assertNull(versionedParameter.getValue());
} else {
assertEquals(parameter.getValue(), versionedParameter.getValue());
}
}
private ProcessGroup prepareProcessGroup(final int numProcessors, final boolean includeFunnel,final boolean includePorts,
final boolean includeLabels, final boolean includeVariableRegistry,
final boolean includeControllerService,
final ControllerServiceNode externalControllerServiceNode,
final boolean includeRemoteProcessGroup, final boolean includeVersionControlInfo,
final List<ProcessGroup> childProcessGroups) {
final String processGroupId = UUID.randomUUID().toString();
final ProcessGroup processGroup = mock(ProcessGroup.class);
when(processGroup.getIdentifier()).thenReturn(processGroupId);
when(processGroup.getProcessGroupIdentifier()).thenReturn(processGroupId);
when(processGroup.getName()).thenReturn("group"+(counter++));
when(processGroup.getComments()).thenReturn("comments"+(counter++));
when(processGroup.getPosition()).thenReturn(new Position(counter++, counter++));
final ParameterContext parameterContext = mock(ParameterContext.class);
when(processGroup.getParameterContext()).thenReturn(parameterContext);
when(parameterContext.getName()).thenReturn("context"+(counter++));
when(processGroup.getFlowFileConcurrency()).thenReturn(FlowFileConcurrency.UNBOUNDED);
when(processGroup.getFlowFileOutboundPolicy()).thenReturn(FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE);
// prep funnels
final Set<Funnel> funnels = Sets.newHashSet();
if (includeFunnel) {
funnels.add(prepareFunnel(processGroupId));
}
when(processGroup.getFunnels()).thenReturn(funnels);
// prep ports
final Set<Port> inputPorts = Sets.newHashSet();
final Set<Port> outputPorts = Sets.newHashSet();
if (includePorts) {
inputPorts.add(preparePort(processGroupId, PortType.INPUT_PORT));
outputPorts.add(preparePort(processGroupId, PortType.OUTPUT_PORT));
}
when(processGroup.getInputPorts()).thenReturn(inputPorts);
when(processGroup.getOutputPorts()).thenReturn(outputPorts);
// prep labels
final Set<Label> labels = Sets.newHashSet();
if (includeLabels) {
labels.add(prepareLabel(processGroupId));
}
when(processGroup.getLabels()).thenReturn(labels);
// prep connections and processors
final Set<ProcessorNode> processorNodes = Sets.newLinkedHashSet();
final Set<Connection> connections = Sets.newHashSet();
if (numProcessors == 2) {
// 2 processors connected together
final ProcessorNode processorNode1 = prepareProcessor(processGroup, externalControllerServiceNode);
final ProcessorNode processorNode2 = prepareProcessor(processGroup, externalControllerServiceNode);
processorNodes.add(processorNode1);
processorNodes.add(processorNode2);
connections.add(prepareConnection(processorNode1, processorNode2, processGroup));
} else if (numProcessors == 1) {
// a processor connected to itself
final ProcessorNode processorNode1 = prepareProcessor(processGroup, externalControllerServiceNode);
processorNodes.add(processorNode1);
connections.add(prepareConnection(processorNode1, processorNode1, processGroup));
}
when(processGroup.getProcessors()).thenReturn(processorNodes);
when(processGroup.getConnections()).thenReturn(connections);
// prep controller services
final Set<ControllerServiceNode> controllerServiceNodes = Sets.newHashSet();
if (includeControllerService) {
controllerServiceNodes.add(prepareControllerService(processGroupId));
}
when(processGroup.getControllerServices(false)).thenReturn(controllerServiceNodes);
// prep variable registry
final ComponentVariableRegistry componentVariableRegistry = mock(ComponentVariableRegistry.class);
when(processGroup.getVariableRegistry()).thenReturn(componentVariableRegistry);
final Map<VariableDescriptor, String> registryVariableMap = Maps.newHashMap();
if (includeVariableRegistry) {
registryVariableMap.putAll(prepareVariableRegistry());
}
when(componentVariableRegistry.getVariableMap()).thenReturn(registryVariableMap);
// prepare remote process group
final Set<RemoteProcessGroup> remoteProcessGroups = Sets.newHashSet();
if (includeRemoteProcessGroup) {
remoteProcessGroups.add(prepareRemoteProcessGroup(processGroupId));
}
when(processGroup.getRemoteProcessGroups()).thenReturn(remoteProcessGroups);
// prep version control info
if (includeVersionControlInfo) {
final VersionControlInformation versionControlInformation = prepareVersionControlInfo();
when(processGroup.getVersionControlInformation()).thenReturn(versionControlInformation);
}
// prep nested process groups
when(processGroup.getProcessGroups()).thenReturn(Sets.newLinkedHashSet(childProcessGroups));
return processGroup;
}
private Funnel prepareFunnel(final String processGroupId) {
final Funnel funnel = mock(Funnel.class);
prepareComponentAuthorizable(funnel, processGroupId);
preparePositionable(funnel);
return funnel;
}
private Port preparePort(final String processGroupId, final PortType portType) {
final Port port = mock(Port.class);
prepareComponentAuthorizable(port, processGroupId);
preparePositionable(port);
prepareConnectable(port, ConnectableType.valueOf(portType.name()));
return port;
}
private Label prepareLabel(final String processGroupId) {
final Label label = mock(Label.class);
prepareComponentAuthorizable(label, processGroupId);
preparePositionable(label);
when(label.getSize()).thenReturn(new Size(counter++, counter++));
return label;
}
private ProcessorNode prepareProcessor(final ProcessGroup processGroup, final ControllerServiceNode externalControllerServiceNode) {
final ProcessorNode processorNode = mock(ProcessorNode.class);
prepareComponentAuthorizable(processorNode, processGroup.getIdentifier());
preparePositionable(processorNode);
prepareConnectable(processorNode, ConnectableType.PROCESSOR);
when(processorNode.getProcessGroup()).thenReturn(processGroup);
when(processorNode.getAutoTerminatedRelationships()).thenReturn(Collections.emptySet());
when(processorNode.getBulletinLevel()).thenReturn(LogLevel.INFO);
when(processorNode.getExecutionNode()).thenReturn(ExecutionNode.ALL);
when(processorNode.getSchedulingStrategy()).thenReturn(SchedulingStrategy.TIMER_DRIVEN);
when(processorNode.getBundleCoordinate()).thenReturn(mock(BundleCoordinate.class));
final String rawPropertyValue = "propValue";
final PropertyDescriptor.Builder propertyDescriptorBuilder =
new PropertyDescriptor.Builder().name("propName").sensitive(false).displayName("displayName");
if (externalControllerServiceNode != null) {
propertyDescriptorBuilder.identifiesControllerService(ControllerService.class);
when(controllerServiceProvider.getControllerServiceNode(rawPropertyValue)).thenReturn(externalControllerServiceNode);
}
final PropertyDescriptor propertyDescriptor = propertyDescriptorBuilder.build();
final PropertyConfiguration propertyConfiguration = mock(PropertyConfiguration.class);
final Map<PropertyDescriptor, PropertyConfiguration> properties = Maps.newHashMap();
properties.put(propertyDescriptor, propertyConfiguration);
when(processorNode.getProperties()).thenReturn(properties);
when(processorNode.getProperty(propertyDescriptor)).thenReturn(propertyConfiguration);
when(propertyConfiguration.getRawValue()).thenReturn(rawPropertyValue);
return processorNode;
}
private Connection prepareConnection(final ProcessorNode sourceProcessorNode, final ProcessorNode destProcessorNode,
final ProcessGroup processGroup) {
final Connection connection = mock(Connection.class);
when(connection.getIdentifier()).thenReturn(UUID.randomUUID().toString());
when(connection.getProcessGroup()).thenReturn(processGroup);
when(connection.getBendPoints()).thenReturn(Lists.newArrayList(new Position(counter++, counter++)));
when(connection.getRelationships()).thenReturn(Lists.newArrayList());
final FlowFileQueue flowFileQueue = mock(FlowFileQueue.class);
when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
when(flowFileQueue.getPriorities()).thenReturn(Collections.emptyList());
when(flowFileQueue.getLoadBalanceStrategy()).thenReturn(LoadBalanceStrategy.DO_NOT_LOAD_BALANCE);
when(flowFileQueue.getLoadBalanceCompression()).thenReturn(LoadBalanceCompression.DO_NOT_COMPRESS);
when(sourceProcessorNode.getConnectableType()).thenReturn(ConnectableType.PROCESSOR);
when(connection.getSource()).thenReturn(sourceProcessorNode);
when(destProcessorNode.getConnectableType()).thenReturn(ConnectableType.PROCESSOR);
when(connection.getDestination()).thenReturn(destProcessorNode);
return connection;
}
private Map<VariableDescriptor, String> prepareVariableRegistry() {
final VariableDescriptor variableDescriptor =
new VariableDescriptor.Builder("variable"+(counter++)).build();
final Map<VariableDescriptor, String> variableRegistryMap = Maps.newHashMap();
variableRegistryMap.put(variableDescriptor, "value"+(counter++));
return variableRegistryMap;
}
private ControllerServiceNode prepareControllerService(final String processGroupId) {
final ControllerServiceNode controllerServiceNode = mock(ControllerServiceNode.class);
prepareComponentAuthorizable(controllerServiceNode, processGroupId);
when(controllerServiceNode.getName()).thenReturn("service" + (counter++));
when(controllerServiceNode.getBundleCoordinate()).thenReturn(mock(BundleCoordinate.class));
when(controllerServiceNode.getControllerServiceImplementation()).thenReturn(mock(ControllerService.class));
when(controllerServiceNode.getProperties()).thenReturn(Collections.emptyMap());
return controllerServiceNode;
}
private RemoteProcessGroup prepareRemoteProcessGroup(final String processGroupId) {
final RemoteProcessGroup remoteProcessGroup = mock(RemoteProcessGroup.class);
prepareComponentAuthorizable(remoteProcessGroup, processGroupId);
preparePositionable(remoteProcessGroup);
when(remoteProcessGroup.getName()).thenReturn("remote" + (counter++));
when(remoteProcessGroup.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.HTTP);
final RemoteGroupPort remoteGroupInputPort = prepareRemoteGroupPort(remoteProcessGroup);
when(remoteProcessGroup.getInputPorts()).thenReturn(Sets.newHashSet(remoteGroupInputPort));
final RemoteGroupPort remoteGroupOutputPort = prepareRemoteGroupPort(remoteProcessGroup);
when(remoteProcessGroup.getOutputPorts()).thenReturn(Sets.newHashSet(remoteGroupOutputPort));
return remoteProcessGroup;
}
private RemoteGroupPort prepareRemoteGroupPort(final RemoteProcessGroup remoteProcessGroup) {
final RemoteGroupPort remoteGroupPort = mock(RemoteGroupPort.class);
prepareComponentAuthorizable(remoteGroupPort, remoteProcessGroup.getIdentifier());
when(remoteGroupPort.getName()).thenReturn("remotePort" + (counter++));
when(remoteGroupPort.getRemoteProcessGroup()).thenReturn(remoteProcessGroup);
return remoteGroupPort;
}
private VersionControlInformation prepareVersionControlInfo() {
final VersionControlInformation versionControlInformation = mock(VersionControlInformation.class);
when(versionControlInformation.getRegistryIdentifier()).thenReturn(UUID.randomUUID().toString());
when(versionControlInformation.getBucketIdentifier()).thenReturn(UUID.randomUUID().toString());
when(versionControlInformation.getFlowIdentifier()).thenReturn(UUID.randomUUID().toString());
when(versionControlInformation.getVersion()).thenReturn(counter++);
return versionControlInformation;
}
private void prepareComponentAuthorizable(final ComponentAuthorizable authorizable, final String processGroupId) {
when(authorizable.getIdentifier()).thenReturn(UUID.randomUUID().toString());
when(authorizable.getProcessGroupIdentifier()).thenReturn(processGroupId);
}
private void preparePositionable(final Positionable positionable) {
when(positionable.getPosition()).thenReturn(new Position(counter++, counter++));
}
private void prepareConnectable(final Connectable connectable, final ConnectableType connectableType) {
when(connectable.getName()).thenReturn("connectable" + (counter++));
when(connectable.getConnectableType()).thenReturn(connectableType);
}
/**
* Verify the given VersionedProcessGroup was correctly mapped from the given ProcessGroup, including child groups
* when boolean indicator is true. If expectVersionControlInfo boolean indicator is true, the mapped group should
* only contain VersionControlInfo, otherwise it should instead contain its full contents.
*/
private void verifyVersionedProcessGroup(final ProcessGroup processGroup, final VersionedProcessGroup versionedProcessGroup,
final boolean expectVersionControlInfo, final boolean verifyChildProcessGroups) {
final String expectedGroupIdentifier = flowMapper.getGroupId(processGroup.getProcessGroupIdentifier());
// verify process group fields
assertEquals(processGroup.getName(), versionedProcessGroup.getName());
assertEquals(flowMapper.getGroupId(processGroup.getIdentifier()), versionedProcessGroup.getIdentifier());
assertEquals(expectedGroupIdentifier, versionedProcessGroup.getGroupIdentifier());
assertEquals(processGroup.getComments(), versionedProcessGroup.getComments());
assertEquals(processGroup.getPosition().getX(), versionedProcessGroup.getPosition().getX(), 0);
assertEquals(processGroup.getPosition().getY(), versionedProcessGroup.getPosition().getY(), 0);
assertEquals(processGroup.getFlowFileConcurrency().name(), versionedProcessGroup.getFlowFileConcurrency());
assertEquals(processGroup.getFlowFileOutboundPolicy().name(), versionedProcessGroup.getFlowFileOutboundPolicy());
final String expectedParameterContextName =
(processGroup.getParameterContext() != null ? processGroup.getParameterContext().getName() : null);
assertEquals(expectedParameterContextName, versionedProcessGroup.getParameterContextName());
// verify either version control info or full group contents
if (expectVersionControlInfo) {
final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation();
final VersionedFlowCoordinates versionedFlowCoordinates = versionedProcessGroup.getVersionedFlowCoordinates();
assertNotNull(versionedFlowCoordinates.getRegistryUrl());
assertEquals(versionControlInfo.getBucketIdentifier(), versionedFlowCoordinates.getBucketId());
assertEquals(versionControlInfo.getFlowIdentifier(), versionedFlowCoordinates.getFlowId());
assertEquals(versionControlInfo.getVersion(), versionedFlowCoordinates.getVersion());
} else {
assertNull(versionedProcessGroup.getVersionedFlowCoordinates());
// verify funnels
final Set<Funnel> funnels = processGroup.getFunnels();
final Set<VersionedFunnel> versionedFunnels = versionedProcessGroup.getFunnels();
if (funnels.isEmpty()) {
assertTrue(versionedFunnels.isEmpty());
} else {
final Funnel funnel = funnels.iterator().next();
final VersionedFunnel versionedFunnel = versionedFunnels.iterator().next();
assertEquals(flowMapper.getGroupId(funnel.getIdentifier()), versionedFunnel.getIdentifier());
assertEquals(expectedGroupIdentifier, versionedFunnel.getGroupIdentifier());
assertEquals(funnel.getPosition().getX(), versionedFunnel.getPosition().getX(), 0);
assertEquals(funnel.getPosition().getY(), versionedFunnel.getPosition().getY(), 0);
}
// verify ports
verifyPorts(processGroup.getInputPorts(), versionedProcessGroup.getInputPorts(), PortType.INPUT_PORT, expectedGroupIdentifier);
verifyPorts(processGroup.getOutputPorts(), versionedProcessGroup.getOutputPorts(), PortType.OUTPUT_PORT, expectedGroupIdentifier);
// verify labels
final Set<Label> labels = processGroup.getLabels();
final Set<VersionedLabel> versionedLabels = versionedProcessGroup.getLabels();
if (labels.isEmpty()) {
assertTrue(versionedLabels.isEmpty());
} else {
final Label label = labels.iterator().next();
final VersionedLabel versionedLabel = versionedLabels.iterator().next();
assertEquals(flowMapper.getGroupId(label.getIdentifier()), versionedLabel.getIdentifier());
assertEquals(expectedGroupIdentifier, versionedLabel.getGroupIdentifier());
assertEquals(label.getPosition().getX(), versionedLabel.getPosition().getX(), 0);
assertEquals(label.getPosition().getY(), versionedLabel.getPosition().getY(), 0);
assertEquals(label.getSize().getHeight(), versionedLabel.getHeight(), 0);
assertEquals(label.getSize().getWidth(), versionedLabel.getWidth(), 0);
}
// verify processors
final Collection<ProcessorNode> processorNodes = processGroup.getProcessors();
final Set<VersionedProcessor> versionedProcessors = versionedProcessGroup.getProcessors();
// first verify the number of processors matches
assertEquals(processorNodes.size(), versionedProcessors.size());
// processor order is not deterministic - use unique names to map up matching processors
final Iterator<ProcessorNode> processorNodesIterator = processorNodes.iterator();
while (processorNodesIterator.hasNext()) {
final ProcessorNode processorNode = processorNodesIterator.next();
final Iterator<VersionedProcessor> versionedProcessorIterator = versionedProcessors.iterator();
while (versionedProcessorIterator.hasNext()) {
final VersionedProcessor versionedProcessor = versionedProcessorIterator.next();
if (versionedProcessor.getName().equals(processorNode.getName())) {
verifyProcessor(versionedProcessor, processorNode, expectedGroupIdentifier);
versionedProcessorIterator.remove();
break;
}
}
}
assertTrue("Failed to match processors by unique name", versionedProcessors.isEmpty());
// verify connections
final Set<Connection> connections = processGroup.getConnections();
final Set<VersionedConnection> versionedConnections = versionedProcessGroup.getConnections();
if (connections.isEmpty()) {
assertTrue(versionedConnections.isEmpty());
} else {
final Connection connection = connections.iterator().next();
final VersionedConnection versionedConnection = versionedConnections.iterator().next();
assertEquals(connection.getName(), versionedConnection.getName());
assertEquals(flowMapper.getGroupId(connection.getIdentifier()), versionedConnection.getIdentifier());
assertEquals(expectedGroupIdentifier, versionedConnection.getGroupIdentifier());
assertEquals(connection.getBendPoints().get(0).getX(), versionedConnection.getBends().get(0).getX(), 0);
assertEquals(connection.getBendPoints().get(0).getY(), versionedConnection.getBends().get(0).getY(), 0);
assertEquals(flowMapper.getGroupId(connection.getSource().getIdentifier()), versionedConnection.getSource().getId());
assertEquals(expectedGroupIdentifier, versionedConnection.getSource().getGroupId());
assertEquals(connection.getSource().getName(), versionedConnection.getSource().getName());
assertEquals(connection.getSource().getConnectableType().name(), versionedConnection.getSource().getType().name());
assertEquals(flowMapper.getGroupId(connection.getDestination().getIdentifier()), versionedConnection.getDestination().getId());
assertEquals(expectedGroupIdentifier, versionedConnection.getDestination().getGroupId());
assertEquals(connection.getDestination().getName(), versionedConnection.getDestination().getName());
assertEquals(connection.getDestination().getConnectableType().name(), versionedConnection.getDestination().getType().name());
}
// verify controller services
final Set<ControllerServiceNode> controllerServiceNodes = processGroup.getControllerServices(false);
final Set<VersionedControllerService> versionedControllerServices = versionedProcessGroup.getControllerServices();
if (controllerServiceNodes.isEmpty()) {
assertTrue(versionedControllerServices.isEmpty());
} else {
final ControllerServiceNode controllerServiceNode = controllerServiceNodes.iterator().next();
final VersionedControllerService versionedControllerService = versionedControllerServices.iterator().next();
assertEquals(controllerServiceNode.getName(), versionedControllerService.getName());
assertEquals(flowMapper.getGroupId(controllerServiceNode.getIdentifier()), versionedControllerService.getIdentifier());
assertEquals(expectedGroupIdentifier, versionedControllerService.getGroupIdentifier());
}
// verify variables
final Map<VariableDescriptor, String> variableRegistryMap = processGroup.getVariableRegistry().getVariableMap();
final Map<String, String> versionedVariableMap = versionedProcessGroup.getVariables();
if (variableRegistryMap.isEmpty()) {
assertTrue(versionedVariableMap.isEmpty());
} else {
final VariableDescriptor variableRegistryKey = variableRegistryMap.keySet().iterator().next();
assertEquals(variableRegistryMap.get(variableRegistryKey), versionedVariableMap.get(variableRegistryKey.getName()));
}
// verify remote process group(s)
final Set<RemoteProcessGroup> remoteProcessGroups = processGroup.getRemoteProcessGroups();
final Set<VersionedRemoteProcessGroup> versionedRemoteProcessGroups = versionedProcessGroup.getRemoteProcessGroups();
if (remoteProcessGroups.isEmpty()) {
assertTrue(versionedRemoteProcessGroups.isEmpty());
} else {
final RemoteProcessGroup remoteProcessGroup = remoteProcessGroups.iterator().next();
final VersionedRemoteProcessGroup versionedRemoteProcessGroup = versionedRemoteProcessGroups.iterator().next();
assertEquals(remoteProcessGroup.getName(), versionedRemoteProcessGroup.getName());
assertEquals(flowMapper.getGroupId(remoteProcessGroup.getIdentifier()), versionedRemoteProcessGroup.getIdentifier());
assertEquals(expectedGroupIdentifier, versionedRemoteProcessGroup.getGroupIdentifier());
assertEquals(remoteProcessGroup.getPosition().getX(), versionedRemoteProcessGroup.getPosition().getX(), 0);
assertEquals(remoteProcessGroup.getPosition().getY(), versionedRemoteProcessGroup.getPosition().getY(), 0);
// verify remote ports
final String expectedPortGroupIdentifier = flowMapper.getGroupId(remoteProcessGroup.getIdentifier());
verifyRemotePorts(remoteProcessGroup.getInputPorts(), versionedRemoteProcessGroup.getInputPorts(),
ComponentType.REMOTE_INPUT_PORT, expectedPortGroupIdentifier);
verifyRemotePorts(remoteProcessGroup.getOutputPorts(), versionedRemoteProcessGroup.getOutputPorts(),
ComponentType.REMOTE_OUTPUT_PORT, expectedPortGroupIdentifier);
}
if (verifyChildProcessGroups) {
// recurse to verify inner process group(s)
final Set<ProcessGroup> innerProcessGroups = processGroup.getProcessGroups();
final Set<VersionedProcessGroup> innerVersionedProcessGroups = versionedProcessGroup.getProcessGroups();
if (innerProcessGroups.isEmpty()) {
assertTrue(innerVersionedProcessGroups.isEmpty());
} else {
final ProcessGroup innerProcessGroup = innerProcessGroups.iterator().next();
final VersionedProcessGroup innerVersionedProcessGroup = innerVersionedProcessGroups.iterator().next();
verifyVersionedProcessGroup(innerProcessGroup, innerVersionedProcessGroup, expectVersionControlInfo,
verifyChildProcessGroups);
}
}
}
}
private void verifyPorts(final Set<Port> ports, final Set<VersionedPort> versionedPorts, final PortType portType,
final String expectedGroupIdentifier) {
if (ports.isEmpty()) {
assertTrue(versionedPorts.isEmpty());
} else {
final Port port = ports.iterator().next();
final VersionedPort versionedPort = versionedPorts.iterator().next();
assertEquals(flowMapper.getGroupId(port.getIdentifier()), versionedPort.getIdentifier());
assertEquals(expectedGroupIdentifier, versionedPort.getGroupIdentifier());
assertEquals(port.getPosition().getX(), versionedPort.getPosition().getX(), 0);
assertEquals(port.getPosition().getY(), versionedPort.getPosition().getY(), 0);
assertEquals(port.getName(), versionedPort.getName());
assertEquals(portType, versionedPort.getType());
}
}
private void verifyRemotePorts(final Set<RemoteGroupPort> remotePorts,
final Set<VersionedRemoteGroupPort> versionedRemotePorts,
final ComponentType componentType, final String expectedPortGroupIdentifier) {
if (remotePorts.isEmpty()) {
assertTrue(versionedRemotePorts.isEmpty());
} else {
final RemoteGroupPort remotePort = remotePorts.iterator().next();
final VersionedRemoteGroupPort versionedRemotePort = versionedRemotePorts.iterator().next();
assertEquals(flowMapper.getGroupId(remotePort.getIdentifier()), versionedRemotePort.getIdentifier());
assertEquals(expectedPortGroupIdentifier, versionedRemotePort.getGroupIdentifier());
assertEquals(remotePort.getName(), versionedRemotePort.getName());
assertEquals(componentType, versionedRemotePort.getComponentType());
}
}
private void verifyProcessor(final VersionedProcessor versionedProcessor, final ProcessorNode processorNode,
final String expectedGroupIdentifier) {
assertEquals(processorNode.getName(), versionedProcessor.getName());
assertEquals(flowMapper.getGroupId(processorNode.getIdentifier()), versionedProcessor.getIdentifier());
assertEquals(expectedGroupIdentifier, versionedProcessor.getGroupIdentifier());
assertEquals(processorNode.getPosition().getX(), versionedProcessor.getPosition().getX(), 0);
assertEquals(processorNode.getPosition().getY(), versionedProcessor.getPosition().getY(), 0);
final PropertyDescriptor propertyDescriptor = processorNode.getProperties().keySet().iterator().next();
final VersionedPropertyDescriptor versionedPropertyDescriptor =
versionedProcessor.getPropertyDescriptors().get(propertyDescriptor.getName());
assertTrue(versionedProcessor.getProperties().containsKey(propertyDescriptor.getName()));
assertNotNull(versionedPropertyDescriptor);
assertEquals(propertyDescriptor.getName(), versionedPropertyDescriptor.getName());
assertEquals(propertyDescriptor.getDisplayName(), versionedPropertyDescriptor.getDisplayName());
assertEquals(propertyDescriptor.isSensitive(), versionedPropertyDescriptor.isSensitive());
}
}