| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.nifi.registry.flow.mapping; |
| |
| import org.apache.commons.lang3.ClassUtils; |
| import org.apache.nifi.bundle.BundleCoordinate; |
| import org.apache.nifi.components.PropertyDescriptor; |
| import org.apache.nifi.components.resource.ResourceCardinality; |
| import org.apache.nifi.components.resource.ResourceDefinition; |
| import org.apache.nifi.connectable.Connectable; |
| import org.apache.nifi.connectable.Connection; |
| import org.apache.nifi.connectable.Funnel; |
| import org.apache.nifi.connectable.Port; |
| import org.apache.nifi.controller.ComponentNode; |
| import org.apache.nifi.controller.ControllerService; |
| import org.apache.nifi.controller.ProcessorNode; |
| import org.apache.nifi.controller.PropertyConfiguration; |
| import org.apache.nifi.controller.ScheduledState; |
| import org.apache.nifi.controller.label.Label; |
| import org.apache.nifi.controller.queue.FlowFileQueue; |
| import org.apache.nifi.controller.service.ControllerServiceNode; |
| import org.apache.nifi.controller.service.ControllerServiceProvider; |
| import org.apache.nifi.groups.ProcessGroup; |
| import org.apache.nifi.groups.RemoteProcessGroup; |
| import org.apache.nifi.nar.ExtensionManager; |
| import org.apache.nifi.parameter.Parameter; |
| import org.apache.nifi.parameter.ParameterContext; |
| import org.apache.nifi.parameter.ParameterDescriptor; |
| import org.apache.nifi.processor.Relationship; |
| import org.apache.nifi.registry.VariableDescriptor; |
| import org.apache.nifi.registry.flow.BatchSize; |
| import org.apache.nifi.registry.flow.Bundle; |
| import org.apache.nifi.registry.flow.ComponentType; |
| import org.apache.nifi.registry.flow.ConnectableComponent; |
| import org.apache.nifi.registry.flow.ConnectableComponentType; |
| import org.apache.nifi.registry.flow.ControllerServiceAPI; |
| import org.apache.nifi.registry.flow.ExternalControllerServiceReference; |
| import org.apache.nifi.registry.flow.FlowRegistry; |
| import org.apache.nifi.registry.flow.FlowRegistryClient; |
| import org.apache.nifi.registry.flow.PortType; |
| import org.apache.nifi.registry.flow.Position; |
| import org.apache.nifi.registry.flow.VersionControlInformation; |
| import org.apache.nifi.registry.flow.VersionedConnection; |
| import org.apache.nifi.registry.flow.VersionedControllerService; |
| import org.apache.nifi.registry.flow.VersionedFlowCoordinates; |
| import org.apache.nifi.registry.flow.VersionedFunnel; |
| import org.apache.nifi.registry.flow.VersionedLabel; |
| import org.apache.nifi.registry.flow.VersionedParameter; |
| import org.apache.nifi.registry.flow.VersionedParameterContext; |
| import org.apache.nifi.registry.flow.VersionedPort; |
| import org.apache.nifi.registry.flow.VersionedProcessGroup; |
| import org.apache.nifi.registry.flow.VersionedProcessor; |
| import org.apache.nifi.registry.flow.VersionedPropertyDescriptor; |
| import org.apache.nifi.registry.flow.VersionedRemoteGroupPort; |
| import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup; |
| import org.apache.nifi.registry.flow.VersionedResourceCardinality; |
| import org.apache.nifi.registry.flow.VersionedResourceDefinition; |
| import org.apache.nifi.registry.flow.VersionedResourceType; |
| import org.apache.nifi.remote.PublicPort; |
| import org.apache.nifi.remote.RemoteGroupPort; |
| |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.BiFunction; |
| import java.util.function.Supplier; |
| import java.util.stream.Collectors; |
| |
| |
| public class NiFiRegistryFlowMapper { |
| |
| private final ExtensionManager extensionManager; |
| |
| // We need to keep a mapping of component id to versionedComponentId as we transform these objects. This way, when |
| // we call #mapConnectable, instead of generating a new UUID for the ConnectableComponent, we can lookup the 'versioned' |
| // identifier based on the component's actual id. We do connections last, so that all components will already have been |
| // created before attempting to create the connection, where the ConnectableDTO is converted. |
| private Map<String, String> versionedComponentIds = new HashMap<>(); |
| |
/**
 * Creates a mapper backed by the given extension manager.
 *
 * @param extensionManager the extension manager, stored as-is; it is consulted when resolving the bundle
 *                         of each controller service API that a mapped service implements
 */
public NiFiRegistryFlowMapper(final ExtensionManager extensionManager) {
    this.extensionManager = extensionManager;
}
| |
| /** |
| * Map the given process group to a versioned process group without any use of an actual flow registry even if the |
| * group is currently versioned in a registry. |
| * |
| * @param group the process group to map |
| * @param serviceProvider the controller service provider to use for mapping |
| * @return a complete versioned process group without any registry related details |
| */ |
| public InstantiatedVersionedProcessGroup mapNonVersionedProcessGroup(final ProcessGroup group, final ControllerServiceProvider serviceProvider) { |
| versionedComponentIds.clear(); |
| |
| // always include descendant flows and do not apply any registry versioning info that may be present in the group |
| return mapGroup(group, serviceProvider, (processGroup, versionedGroup) -> true); |
| } |
| |
| /** |
| * Map the given process group to a versioned process group using the provided registry client. |
| * |
| * @param group the process group to map |
| * @param serviceProvider the controller service provider to use for mapping |
| * @param registryClient the registry client to use when retrieving versioning details |
| * @param mapDescendantVersionedFlows true in order to include descendant flows in the mapped result |
| * @return a complete versioned process group with applicable registry related details |
| */ |
| public InstantiatedVersionedProcessGroup mapProcessGroup(final ProcessGroup group, final ControllerServiceProvider serviceProvider, |
| final FlowRegistryClient registryClient, |
| final boolean mapDescendantVersionedFlows) { |
| versionedComponentIds.clear(); |
| |
| // apply registry versioning according to the lambda below |
| // NOTE: lambda refers to registry client and map descendant boolean which will not change during recursion |
| return mapGroup(group, serviceProvider, (processGroup, versionedGroup) -> { |
| final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation(); |
| if (versionControlInfo != null) { |
| final VersionedFlowCoordinates coordinates = new VersionedFlowCoordinates(); |
| final String registryId = versionControlInfo.getRegistryIdentifier(); |
| final FlowRegistry registry = registryClient.getFlowRegistry(registryId); |
| if (registry == null) { |
| throw new IllegalStateException("Process Group refers to a Flow Registry with ID " + registryId + " but no Flow Registry exists with that ID. Cannot resolve to a URL."); |
| } |
| |
| coordinates.setRegistryUrl(registry.getURL()); |
| coordinates.setBucketId(versionControlInfo.getBucketIdentifier()); |
| coordinates.setFlowId(versionControlInfo.getFlowIdentifier()); |
| coordinates.setVersion(versionControlInfo.getVersion()); |
| versionedGroup.setVersionedFlowCoordinates(coordinates); |
| |
| // We need to register the Port ID -> Versioned Component ID's in our versionedComponentIds member variable for all input & output ports. |
| // Otherwise, we will not be able to lookup the port when connecting to it. |
| for (final Port port : processGroup.getInputPorts()) { |
| getId(port.getVersionedComponentId(), port.getIdentifier()); |
| } |
| for (final Port port : processGroup.getOutputPorts()) { |
| getId(port.getVersionedComponentId(), port.getIdentifier()); |
| } |
| |
| // If the Process Group itself is remotely versioned, then we don't want to include its contents |
| // because the contents are remotely managed and not part of the versioning of this Process Group |
| return mapDescendantVersionedFlows; |
| } |
| return true; |
| }); |
| } |
| |
| private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final ControllerServiceProvider serviceProvider, |
| final BiFunction<ProcessGroup, VersionedProcessGroup, Boolean> applyVersionControlInfo) { |
| final Set<String> allIncludedGroupsIds = group.findAllProcessGroups().stream() |
| .map(ProcessGroup::getIdentifier) |
| .collect(Collectors.toSet()); |
| allIncludedGroupsIds.add(group.getIdentifier()); |
| |
| final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences = new HashMap<>(); |
| final InstantiatedVersionedProcessGroup versionedGroup = |
| mapGroup(group, serviceProvider, applyVersionControlInfo, true, allIncludedGroupsIds, externalControllerServiceReferences); |
| |
| populateReferencedAncestorVariables(group, versionedGroup); |
| |
| return versionedGroup; |
| } |
| |
| private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final ControllerServiceProvider serviceProvider, |
| final BiFunction<ProcessGroup, VersionedProcessGroup, Boolean> applyVersionControlInfo, |
| final boolean topLevel, final Set<String> includedGroupIds, |
| final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences) { |
| |
| final InstantiatedVersionedProcessGroup versionedGroup = new InstantiatedVersionedProcessGroup(group.getIdentifier(), group.getProcessGroupIdentifier()); |
| versionedGroup.setIdentifier(getId(group.getVersionedComponentId(), group.getIdentifier())); |
| versionedGroup.setGroupIdentifier(getGroupId(group.getProcessGroupIdentifier())); |
| versionedGroup.setName(group.getName()); |
| versionedGroup.setComments(group.getComments()); |
| versionedGroup.setPosition(mapPosition(group.getPosition())); |
| versionedGroup.setFlowFileConcurrency(group.getFlowFileConcurrency().name()); |
| versionedGroup.setFlowFileOutboundPolicy(group.getFlowFileOutboundPolicy().name()); |
| versionedGroup.setDefaultFlowFileExpiration(group.getDefaultFlowFileExpiration()); |
| versionedGroup.setDefaultBackPressureObjectThreshold(group.getDefaultBackPressureObjectThreshold()); |
| versionedGroup.setDefaultBackPressureDataSizeThreshold(group.getDefaultBackPressureDataSizeThreshold()); |
| |
| final ParameterContext parameterContext = group.getParameterContext(); |
| versionedGroup.setParameterContextName(parameterContext == null ? null : parameterContext.getName()); |
| |
| // If we are at the 'top level', meaning that the given Process Group is the group that we are creating a VersionedProcessGroup for, |
| // then we don't want to include the RemoteFlowCoordinates; we want to include the group contents. The RemoteFlowCoordinates will be used |
| // only for a child group that is itself version controlled. |
| if (!topLevel) { |
| final boolean mapDescendantVersionedFlows = applyVersionControlInfo.apply(group, versionedGroup); |
| |
| // return here if we do not want to include remotely managed descendant flows |
| if (!mapDescendantVersionedFlows) { |
| return versionedGroup; |
| } |
| } |
| |
| versionedGroup.setControllerServices(group.getControllerServices(false).stream() |
| .map(service -> mapControllerService(service, serviceProvider, includedGroupIds, externalControllerServiceReferences)) |
| .collect(Collectors.toCollection(LinkedHashSet::new))); |
| |
| versionedGroup.setFunnels(group.getFunnels().stream() |
| .map(this::mapFunnel) |
| .collect(Collectors.toCollection(LinkedHashSet::new))); |
| |
| versionedGroup.setInputPorts(group.getInputPorts().stream() |
| .map(this::mapPort) |
| .collect(Collectors.toCollection(LinkedHashSet::new))); |
| |
| versionedGroup.setOutputPorts(group.getOutputPorts().stream() |
| .map(this::mapPort) |
| .collect(Collectors.toCollection(LinkedHashSet::new))); |
| |
| versionedGroup.setLabels(group.getLabels().stream() |
| .map(this::mapLabel) |
| .collect(Collectors.toCollection(LinkedHashSet::new))); |
| |
| versionedGroup.setProcessors(group.getProcessors().stream() |
| .map(processor -> mapProcessor(processor, serviceProvider, includedGroupIds, externalControllerServiceReferences)) |
| .collect(Collectors.toCollection(LinkedHashSet::new))); |
| |
| versionedGroup.setRemoteProcessGroups(group.getRemoteProcessGroups().stream() |
| .map(this::mapRemoteProcessGroup) |
| .collect(Collectors.toCollection(LinkedHashSet::new))); |
| |
| versionedGroup.setProcessGroups(group.getProcessGroups().stream() |
| .map(grp -> mapGroup(grp, serviceProvider, applyVersionControlInfo, false, includedGroupIds, externalControllerServiceReferences)) |
| .collect(Collectors.toCollection(LinkedHashSet::new))); |
| |
| versionedGroup.setConnections(group.getConnections().stream() |
| .map(this::mapConnection) |
| .collect(Collectors.toCollection(LinkedHashSet::new))); |
| |
| versionedGroup.setVariables(group.getVariableRegistry().getVariableMap().entrySet().stream() |
| .collect(Collectors.toMap(entry -> entry.getKey().getName(), Map.Entry::getValue))); |
| |
| if (topLevel) { |
| versionedGroup.setExternalControllerServiceReferences(externalControllerServiceReferences); |
| } |
| |
| return versionedGroup; |
| } |
| |
| private void populateReferencedAncestorVariables(final ProcessGroup group, final VersionedProcessGroup versionedGroup) { |
| final Set<String> ancestorVariableNames = new HashSet<>(); |
| populateVariableNames(group.getParent(), ancestorVariableNames); |
| |
| final Map<String, String> implicitlyDefinedVariables = new HashMap<>(); |
| for (final String variableName : ancestorVariableNames) { |
| final boolean isReferenced = !group.getComponentsAffectedByVariable(variableName).isEmpty(); |
| if (isReferenced) { |
| final String value = group.getVariableRegistry().getVariableValue(variableName); |
| implicitlyDefinedVariables.put(variableName, value); |
| } |
| } |
| |
| if (!implicitlyDefinedVariables.isEmpty()) { |
| // Merge the implicit variables with the explicitly defined variables for the Process Group |
| // and set those as the Versioned Group's variables. |
| if (versionedGroup.getVariables() != null) { |
| implicitlyDefinedVariables.putAll(versionedGroup.getVariables()); |
| } |
| |
| versionedGroup.setVariables(implicitlyDefinedVariables); |
| } |
| } |
| |
| private void populateVariableNames(final ProcessGroup group, final Set<String> variableNames) { |
| if (group == null) { |
| return; |
| } |
| |
| group.getVariableRegistry().getVariableMap().keySet().stream() |
| .map(VariableDescriptor::getName) |
| .forEach(variableNames::add); |
| |
| populateVariableNames(group.getParent(), variableNames); |
| } |
| |
| private String getId(final Optional<String> currentVersionedId, final String componentId) { |
| final String versionedId; |
| if (currentVersionedId.isPresent()) { |
| versionedId = currentVersionedId.get(); |
| } else { |
| versionedId = generateVersionedComponentId(componentId); |
| } |
| |
| versionedComponentIds.put(componentId, versionedId); |
| return versionedId; |
| } |
| |
| /** |
| * Generate a versioned component identifier based on the given component identifier. The result for a given |
| * component identifier is deterministic. |
| * |
| * @param componentId the component identifier to generate a versioned component identifier for |
| * @return a deterministic versioned component identifier |
| */ |
| public static String generateVersionedComponentId(final String componentId) { |
| return UUID.nameUUIDFromBytes(componentId.getBytes(StandardCharsets.UTF_8)).toString(); |
| } |
| |
| private <E extends Exception> String getIdOrThrow(final Optional<String> currentVersionedId, final String componentId, final Supplier<E> exceptionSupplier) throws E { |
| if (currentVersionedId.isPresent()) { |
| return currentVersionedId.get(); |
| } else { |
| final String resolved = versionedComponentIds.get(componentId); |
| if (resolved == null) { |
| throw exceptionSupplier.get(); |
| } |
| |
| return resolved; |
| } |
| } |
| |
| |
| public String getGroupId(final String groupId) { |
| return versionedComponentIds.get(groupId); |
| } |
| |
| public VersionedConnection mapConnection(final Connection connection) { |
| final FlowFileQueue queue = connection.getFlowFileQueue(); |
| |
| final VersionedConnection versionedConnection = new InstantiatedVersionedConnection(connection.getIdentifier(), connection.getProcessGroup().getIdentifier()); |
| versionedConnection.setIdentifier(getId(connection.getVersionedComponentId(), connection.getIdentifier())); |
| versionedConnection.setGroupIdentifier(getGroupId(connection.getProcessGroup().getIdentifier())); |
| versionedConnection.setName(connection.getName()); |
| versionedConnection.setBackPressureDataSizeThreshold(queue.getBackPressureDataSizeThreshold()); |
| versionedConnection.setBackPressureObjectThreshold(queue.getBackPressureObjectThreshold()); |
| versionedConnection.setFlowFileExpiration(queue.getFlowFileExpiration()); |
| versionedConnection.setLabelIndex(connection.getLabelIndex()); |
| versionedConnection.setPrioritizers(queue.getPriorities().stream().map(p -> p.getClass().getName()).collect(Collectors.toList())); |
| versionedConnection.setSelectedRelationships(connection.getRelationships().stream().map(Relationship::getName).collect(Collectors.toSet())); |
| versionedConnection.setzIndex(connection.getZIndex()); |
| |
| final FlowFileQueue flowFileQueue = connection.getFlowFileQueue(); |
| versionedConnection.setLoadBalanceStrategy(flowFileQueue.getLoadBalanceStrategy().name()); |
| versionedConnection.setPartitioningAttribute(flowFileQueue.getPartitioningAttribute()); |
| versionedConnection.setLoadBalanceCompression(flowFileQueue.getLoadBalanceCompression().name()); |
| |
| versionedConnection.setBends(connection.getBendPoints().stream() |
| .map(this::mapPosition) |
| .collect(Collectors.toList())); |
| |
| versionedConnection.setSource(mapConnectable(connection.getSource())); |
| versionedConnection.setDestination(mapConnectable(connection.getDestination())); |
| |
| return versionedConnection; |
| } |
| |
| public ConnectableComponent mapConnectable(final Connectable connectable) { |
| final ConnectableComponent component = new InstantiatedConnectableComponent(connectable.getIdentifier(), connectable.getProcessGroupIdentifier()); |
| |
| final String versionedId = getIdOrThrow(connectable.getVersionedComponentId(), connectable.getIdentifier(), |
| () -> new IllegalArgumentException("Unable to map Connectable Component with identifier " + connectable.getIdentifier() + " to any version-controlled component")); |
| component.setId(versionedId); |
| |
| component.setComments(connectable.getComments()); |
| |
| final String groupId; |
| if (connectable instanceof RemoteGroupPort) { |
| final RemoteGroupPort port = (RemoteGroupPort) connectable; |
| final RemoteProcessGroup rpg = port.getRemoteProcessGroup(); |
| final Optional<String> rpgVersionedId = rpg.getVersionedComponentId(); |
| groupId = getIdOrThrow(rpgVersionedId, rpg.getIdentifier(), |
| () -> new IllegalArgumentException("Unable to find the Versioned Component ID for Remote Process Group that " + connectable + " belongs to")); |
| |
| } else { |
| groupId = getIdOrThrow(connectable.getProcessGroup().getVersionedComponentId(), connectable.getProcessGroupIdentifier(), |
| () -> new IllegalArgumentException("Unable to find the Versioned Component ID for the Process Group that " + connectable + " belongs to")); |
| } |
| |
| component.setGroupId(groupId); |
| |
| component.setName(connectable.getName()); |
| component.setType(ConnectableComponentType.valueOf(connectable.getConnectableType().name())); |
| return component; |
| } |
| |
| public VersionedControllerService mapControllerService(final ControllerServiceNode controllerService, final ControllerServiceProvider serviceProvider, final Set<String> includedGroupIds, |
| final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences) { |
| final VersionedControllerService versionedService = new InstantiatedVersionedControllerService(controllerService.getIdentifier(), controllerService.getProcessGroupIdentifier()); |
| versionedService.setIdentifier(getId(controllerService.getVersionedComponentId(), controllerService.getIdentifier())); |
| versionedService.setGroupIdentifier(getGroupId(controllerService.getProcessGroupIdentifier())); |
| versionedService.setName(controllerService.getName()); |
| versionedService.setAnnotationData(controllerService.getAnnotationData()); |
| versionedService.setBundle(mapBundle(controllerService.getBundleCoordinate())); |
| versionedService.setComments(controllerService.getComments()); |
| |
| versionedService.setControllerServiceApis(mapControllerServiceApis(controllerService)); |
| versionedService.setProperties(mapProperties(controllerService, serviceProvider)); |
| versionedService.setPropertyDescriptors(mapPropertyDescriptors(controllerService, serviceProvider, includedGroupIds, externalControllerServiceReferences)); |
| versionedService.setType(controllerService.getCanonicalClassName()); |
| |
| return versionedService; |
| } |
| |
| private Map<String, String> mapProperties(final ComponentNode component, final ControllerServiceProvider serviceProvider) { |
| final Map<String, String> mapped = new HashMap<>(); |
| |
| component.getProperties().keySet().stream() |
| .filter(property -> isMappable(property, component.getProperty(property))) |
| .forEach(property -> { |
| String value = component.getRawPropertyValue(property); |
| if (value == null) { |
| value = property.getDefaultValue(); |
| } |
| |
| if (value != null && property.getControllerServiceDefinition() != null) { |
| // Property references a Controller Service. Instead of storing the existing value, we want |
| // to store the Versioned Component ID of the service. |
| final ControllerServiceNode controllerService = serviceProvider.getControllerServiceNode(value); |
| if (controllerService != null) { |
| value = getId(controllerService.getVersionedComponentId(), controllerService.getIdentifier()); |
| } |
| } |
| |
| mapped.put(property.getName(), value); |
| }); |
| |
| return mapped; |
| } |
| |
| private boolean isMappable(final PropertyDescriptor propertyDescriptor, final PropertyConfiguration propertyConfiguration) { |
| if (!propertyDescriptor.isSensitive()) { // If the property is not sensitive, it can be mapped. |
| return true; |
| } |
| |
| if (propertyConfiguration == null) { |
| return false; |
| } |
| |
| // Sensitive properties can be mapped if and only if they reference a Parameter. If a sensitive property references a parameter, it cannot contain any other value around it. |
| // For example, for a non-sensitive property, a value of "hello#{param}123" is valid, but for a sensitive property, it is invalid. Only something like "hello123" or "#{param}" is valid. |
| // Thus, we will map sensitive properties only if they reference a parameter. |
| return !propertyConfiguration.getParameterReferences().isEmpty(); |
| } |
| |
| private Map<String, VersionedPropertyDescriptor> mapPropertyDescriptors(final ComponentNode component, final ControllerServiceProvider serviceProvider, final Set<String> includedGroupIds, |
| final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences) { |
| final Map<String, VersionedPropertyDescriptor> descriptors = new HashMap<>(); |
| for (final PropertyDescriptor descriptor : component.getProperties().keySet()) { |
| final VersionedPropertyDescriptor versionedDescriptor = new VersionedPropertyDescriptor(); |
| versionedDescriptor.setName(descriptor.getName()); |
| versionedDescriptor.setDisplayName(descriptor.getDisplayName()); |
| versionedDescriptor.setSensitive(descriptor.isSensitive()); |
| |
| final VersionedResourceDefinition versionedResourceDefinition = mapResourceDefinition(descriptor.getResourceDefinition()); |
| versionedDescriptor.setResourceDefinition(versionedResourceDefinition); |
| |
| final Class<?> referencedServiceType = descriptor.getControllerServiceDefinition(); |
| versionedDescriptor.setIdentifiesControllerService(referencedServiceType != null); |
| |
| if (referencedServiceType != null) { |
| final String value = component.getProperty(descriptor).getRawValue(); |
| if (value != null) { |
| final ControllerServiceNode serviceNode = serviceProvider.getControllerServiceNode(value); |
| if (serviceNode == null) { |
| continue; |
| } |
| |
| final String serviceGroupId = serviceNode.getProcessGroupIdentifier(); |
| if (!includedGroupIds.contains(serviceGroupId)) { |
| final String serviceId = getId(serviceNode.getVersionedComponentId(), serviceNode.getIdentifier()); |
| |
| final ExternalControllerServiceReference controllerServiceReference = new ExternalControllerServiceReference(); |
| controllerServiceReference.setIdentifier(serviceId); |
| controllerServiceReference.setName(serviceNode.getName()); |
| externalControllerServiceReferences.put(serviceId, controllerServiceReference); |
| } |
| } |
| } |
| |
| descriptors.put(descriptor.getName(), versionedDescriptor); |
| } |
| |
| return descriptors; |
| } |
| |
| private VersionedResourceDefinition mapResourceDefinition(final ResourceDefinition resourceDefinition) { |
| if (resourceDefinition == null) { |
| return null; |
| } |
| |
| final ResourceCardinality cardinality = resourceDefinition.getCardinality(); |
| final VersionedResourceCardinality versionedCardinality = VersionedResourceCardinality.valueOf(cardinality.name()); |
| |
| final Set<VersionedResourceType> versionedResourceTypes = new HashSet<>(); |
| resourceDefinition.getResourceTypes().forEach(resourceType -> versionedResourceTypes.add(VersionedResourceType.valueOf(resourceType.name()))); |
| |
| final VersionedResourceDefinition versionedResourceDefinition = new VersionedResourceDefinition(); |
| versionedResourceDefinition.setCardinality(versionedCardinality); |
| versionedResourceDefinition.setResourceTypes(versionedResourceTypes); |
| return versionedResourceDefinition; |
| } |
| |
| private Bundle mapBundle(final BundleCoordinate coordinate) { |
| final Bundle versionedBundle = new Bundle(); |
| versionedBundle.setGroup(coordinate.getGroup()); |
| versionedBundle.setArtifact(coordinate.getId()); |
| versionedBundle.setVersion(coordinate.getVersion()); |
| return versionedBundle; |
| } |
| |
| private List<ControllerServiceAPI> mapControllerServiceApis(final ControllerServiceNode service) { |
| final Class<?> serviceClass = service.getControllerServiceImplementation().getClass(); |
| |
| final Set<Class<?>> serviceApiClasses = new HashSet<>(); |
| // get all of it's interfaces to determine the controller service api's it implements |
| final List<Class<?>> interfaces = ClassUtils.getAllInterfaces(serviceClass); |
| for (final Class<?> i : interfaces) { |
| // add all controller services that's not ControllerService itself |
| if (ControllerService.class.isAssignableFrom(i) && !ControllerService.class.equals(i)) { |
| serviceApiClasses.add(i); |
| } |
| } |
| |
| |
| final List<ControllerServiceAPI> serviceApis = new ArrayList<>(); |
| for (final Class<?> serviceApiClass : serviceApiClasses) { |
| final BundleCoordinate bundleCoordinate = extensionManager.getBundle(serviceApiClass.getClassLoader()).getBundleDetails().getCoordinate(); |
| |
| final ControllerServiceAPI serviceApi = new ControllerServiceAPI(); |
| serviceApi.setType(serviceApiClass.getName()); |
| serviceApi.setBundle(mapBundle(bundleCoordinate)); |
| serviceApis.add(serviceApi); |
| } |
| return serviceApis; |
| } |
| |
| |
| public VersionedFunnel mapFunnel(final Funnel funnel) { |
| final VersionedFunnel versionedFunnel = new InstantiatedVersionedFunnel(funnel.getIdentifier(), funnel.getProcessGroupIdentifier()); |
| versionedFunnel.setIdentifier(getId(funnel.getVersionedComponentId(), funnel.getIdentifier())); |
| versionedFunnel.setGroupIdentifier(getGroupId(funnel.getProcessGroupIdentifier())); |
| versionedFunnel.setPosition(mapPosition(funnel.getPosition())); |
| |
| return versionedFunnel; |
| } |
| |
| public VersionedLabel mapLabel(final Label label) { |
| final VersionedLabel versionedLabel = new InstantiatedVersionedLabel(label.getIdentifier(), label.getProcessGroupIdentifier()); |
| versionedLabel.setIdentifier(getId(label.getVersionedComponentId(), label.getIdentifier())); |
| versionedLabel.setGroupIdentifier(getGroupId(label.getProcessGroupIdentifier())); |
| versionedLabel.setHeight(label.getSize().getHeight()); |
| versionedLabel.setWidth(label.getSize().getWidth()); |
| versionedLabel.setLabel(label.getValue()); |
| versionedLabel.setPosition(mapPosition(label.getPosition())); |
| versionedLabel.setStyle(label.getStyle()); |
| |
| return versionedLabel; |
| } |
| |
| public VersionedPort mapPort(final Port port) { |
| final VersionedPort versionedPort = new InstantiatedVersionedPort(port.getIdentifier(), port.getProcessGroupIdentifier()); |
| versionedPort.setIdentifier(getId(port.getVersionedComponentId(), port.getIdentifier())); |
| versionedPort.setGroupIdentifier(getGroupId(port.getProcessGroupIdentifier())); |
| versionedPort.setComments(port.getComments()); |
| versionedPort.setConcurrentlySchedulableTaskCount(port.getMaxConcurrentTasks()); |
| versionedPort.setName(port.getName()); |
| versionedPort.setPosition(mapPosition(port.getPosition())); |
| versionedPort.setType(PortType.valueOf(port.getConnectableType().name())); |
| |
| if (port instanceof PublicPort) { |
| versionedPort.setAllowRemoteAccess(true); |
| } else { |
| versionedPort.setAllowRemoteAccess(false); |
| } |
| |
| return versionedPort; |
| } |
| |
| public Position mapPosition(final org.apache.nifi.connectable.Position pos) { |
| final Position position = new Position(); |
| position.setX(pos.getX()); |
| position.setY(pos.getY()); |
| return position; |
| } |
| |
| public VersionedProcessor mapProcessor(final ProcessorNode procNode, final ControllerServiceProvider serviceProvider, final Set<String> includedGroupIds, |
| final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences) { |
| final VersionedProcessor processor = new InstantiatedVersionedProcessor(procNode.getIdentifier(), procNode.getProcessGroupIdentifier()); |
| processor.setIdentifier(getId(procNode.getVersionedComponentId(), procNode.getIdentifier())); |
| processor.setGroupIdentifier(getGroupId(procNode.getProcessGroupIdentifier())); |
| processor.setType(procNode.getCanonicalClassName()); |
| processor.setAnnotationData(procNode.getAnnotationData()); |
| processor.setAutoTerminatedRelationships(procNode.getAutoTerminatedRelationships().stream().map(Relationship::getName).collect(Collectors.toSet())); |
| processor.setBulletinLevel(procNode.getBulletinLevel().name()); |
| processor.setBundle(mapBundle(procNode.getBundleCoordinate())); |
| processor.setComments(procNode.getComments()); |
| processor.setConcurrentlySchedulableTaskCount(procNode.getMaxConcurrentTasks()); |
| processor.setExecutionNode(procNode.getExecutionNode().name()); |
| processor.setName(procNode.getName()); |
| processor.setPenaltyDuration(procNode.getPenalizationPeriod()); |
| processor.setPosition(mapPosition(procNode.getPosition())); |
| processor.setProperties(mapProperties(procNode, serviceProvider)); |
| processor.setPropertyDescriptors(mapPropertyDescriptors(procNode, serviceProvider, includedGroupIds, externalControllerServiceReferences)); |
| processor.setRunDurationMillis(procNode.getRunDuration(TimeUnit.MILLISECONDS)); |
| processor.setSchedulingPeriod(procNode.getSchedulingPeriod()); |
| processor.setSchedulingStrategy(procNode.getSchedulingStrategy().name()); |
| processor.setStyle(procNode.getStyle()); |
| processor.setYieldDuration(procNode.getYieldPeriod()); |
| processor.setScheduledState(procNode.getScheduledState() == ScheduledState.DISABLED ? org.apache.nifi.registry.flow.ScheduledState.DISABLED |
| : org.apache.nifi.registry.flow.ScheduledState.ENABLED); |
| |
| return processor; |
| } |
| |
| public VersionedRemoteProcessGroup mapRemoteProcessGroup(final RemoteProcessGroup remoteGroup) { |
| final VersionedRemoteProcessGroup rpg = new InstantiatedVersionedRemoteProcessGroup(remoteGroup.getIdentifier(), remoteGroup.getProcessGroupIdentifier()); |
| rpg.setIdentifier(getId(remoteGroup.getVersionedComponentId(), remoteGroup.getIdentifier())); |
| rpg.setGroupIdentifier(getGroupId(remoteGroup.getProcessGroupIdentifier())); |
| rpg.setComments(remoteGroup.getComments()); |
| rpg.setCommunicationsTimeout(remoteGroup.getCommunicationsTimeout()); |
| rpg.setLocalNetworkInterface(remoteGroup.getNetworkInterface()); |
| rpg.setName(remoteGroup.getName()); |
| rpg.setInputPorts(remoteGroup.getInputPorts().stream() |
| .map(port -> mapRemotePort(port, ComponentType.REMOTE_INPUT_PORT)) |
| .collect(Collectors.toSet())); |
| rpg.setOutputPorts(remoteGroup.getOutputPorts().stream() |
| .map(port -> mapRemotePort(port, ComponentType.REMOTE_OUTPUT_PORT)) |
| .collect(Collectors.toSet())); |
| rpg.setPosition(mapPosition(remoteGroup.getPosition())); |
| rpg.setProxyHost(remoteGroup.getProxyHost()); |
| rpg.setProxyPort(remoteGroup.getProxyPort()); |
| rpg.setProxyUser(remoteGroup.getProxyUser()); |
| rpg.setTargetUri(remoteGroup.getTargetUri()); |
| rpg.setTargetUris(remoteGroup.getTargetUris()); |
| rpg.setTransportProtocol(remoteGroup.getTransportProtocol().name()); |
| rpg.setYieldDuration(remoteGroup.getYieldDuration()); |
| return rpg; |
| } |
| |
| public VersionedRemoteGroupPort mapRemotePort(final RemoteGroupPort remotePort, final ComponentType componentType) { |
| final VersionedRemoteGroupPort port = new InstantiatedVersionedRemoteGroupPort(remotePort.getIdentifier(), remotePort.getRemoteProcessGroup().getIdentifier()); |
| port.setIdentifier(getId(remotePort.getVersionedComponentId(), remotePort.getIdentifier())); |
| port.setGroupIdentifier(getGroupId(remotePort.getRemoteProcessGroup().getIdentifier())); |
| port.setComments(remotePort.getComments()); |
| port.setConcurrentlySchedulableTaskCount(remotePort.getMaxConcurrentTasks()); |
| port.setRemoteGroupId(getGroupId(remotePort.getRemoteProcessGroup().getIdentifier())); |
| port.setName(remotePort.getName()); |
| port.setUseCompression(remotePort.isUseCompression()); |
| port.setBatchSize(mapBatchSettings(remotePort)); |
| port.setTargetId(remotePort.getTargetIdentifier()); |
| port.setComponentType(componentType); |
| return port; |
| } |
| |
| private BatchSize mapBatchSettings(final RemoteGroupPort remotePort) { |
| final BatchSize batchSize = new BatchSize(); |
| batchSize.setCount(remotePort.getBatchCount()); |
| batchSize.setDuration(remotePort.getBatchDuration()); |
| batchSize.setSize(remotePort.getBatchSize()); |
| return batchSize; |
| } |
| |
| public Map<String, VersionedParameterContext> mapParameterContexts(final ProcessGroup processGroup, |
| final boolean mapDescendantVersionedFlows) { |
| // cannot use a set to enforce uniqueness of parameter contexts because VersionedParameterContext in the |
| // registry data model doesn't currently implement hashcode/equals based on context name |
| final Map<String, VersionedParameterContext> parameterContexts = new HashMap<>(); |
| mapParameterContexts(processGroup, mapDescendantVersionedFlows, parameterContexts); |
| return parameterContexts; |
| } |
| |
| private void mapParameterContexts(final ProcessGroup processGroup, final boolean mapDescendantVersionedFlows, |
| final Map<String, VersionedParameterContext> parameterContexts) { |
| final ParameterContext parameterContext = processGroup.getParameterContext(); |
| if (parameterContext != null) { |
| // map this process group's parameter context and add to the collection |
| final Set<VersionedParameter> parameters = parameterContext.getParameters().values().stream() |
| .map(this::mapParameter) |
| .collect(Collectors.toSet()); |
| |
| final VersionedParameterContext versionedContext = new VersionedParameterContext(); |
| versionedContext.setName(parameterContext.getName()); |
| versionedContext.setParameters(parameters); |
| parameterContexts.put(versionedContext.getName(), versionedContext); |
| } |
| |
| for (final ProcessGroup child : processGroup.getProcessGroups()) { |
| // only include child process group parameter contexts if boolean indicator is true or process group is unversioned |
| if (mapDescendantVersionedFlows || child.getVersionControlInformation() == null) { |
| mapParameterContexts(child, mapDescendantVersionedFlows, parameterContexts); |
| } |
| } |
| } |
| |
| private VersionedParameter mapParameter(final Parameter parameter) { |
| if (parameter == null) { |
| return null; |
| } |
| |
| final ParameterDescriptor descriptor = parameter.getDescriptor(); |
| |
| final VersionedParameter versionedParameter = new VersionedParameter(); |
| versionedParameter.setDescription(descriptor.getDescription()); |
| versionedParameter.setName(descriptor.getName()); |
| versionedParameter.setSensitive(descriptor.isSensitive()); |
| versionedParameter.setValue(descriptor.isSensitive() ? null : parameter.getValue()); |
| return versionedParameter; |
| } |
| } |