blob: dbad60ed8b4dddf58d4a982b19eaf4dc66b4e696 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.dao.impl;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.groups.FlowFileConcurrency;
import org.apache.nifi.groups.FlowFileOutboundPolicy;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.StandardVersionControlInformation;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.VariableRegistryDTO;
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
import org.apache.nifi.web.api.entity.ParameterContextReferenceEntity;
import org.apache.nifi.web.api.entity.VariableEntity;
import org.apache.nifi.web.dao.ProcessGroupDAO;
import javax.ws.rs.WebApplicationException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGroupDAO {
private FlowController flowController;
@Override
public ProcessGroup createProcessGroup(String parentGroupId, ProcessGroupDTO processGroup) {
final FlowManager flowManager = flowController.getFlowManager();
if (processGroup.getParentGroupId() != null && !flowManager.areGroupsSame(processGroup.getParentGroupId(), parentGroupId)) {
throw new IllegalArgumentException("Cannot specify a different Parent Group ID than the Group to which the Process Group is being added.");
}
// get the parent group
ProcessGroup parentGroup = locateProcessGroup(flowController, parentGroupId);
// create the process group
ProcessGroup group = flowManager.createProcessGroup(processGroup.getId());
if (processGroup.getName() != null) {
group.setName(processGroup.getName());
}
if (processGroup.getPosition() != null) {
group.setPosition(new Position(processGroup.getPosition().getX(), processGroup.getPosition().getY()));
}
final ParameterContextReferenceEntity parameterContextReference = processGroup.getParameterContext();
if (parameterContextReference != null && parameterContextReference.getId() != null) {
final ParameterContext parameterContext = flowController.getFlowManager().getParameterContextManager().getParameterContext(parameterContextReference.getId());
group.setParameterContext(parameterContext);
}
// add the process group
group.setParent(parentGroup);
parentGroup.addProcessGroup(group);
return group;
}
@Override
public boolean hasProcessGroup(String groupId) {
return flowController.getFlowManager().getGroup(groupId) != null;
}
@Override
public void verifyUpdate(final ProcessGroupDTO processGroup) {
final ParameterContextReferenceEntity parameterContextReference = processGroup.getParameterContext();
if (parameterContextReference == null) {
return;
}
final ParameterContext parameterContext = locateParameterContext(parameterContextReference.getId());
final ProcessGroup group = locateProcessGroup(flowController, processGroup.getId());
group.verifyCanSetParameterContext(parameterContext);
}
private ParameterContext locateParameterContext(final String id) {
final ParameterContext parameterContext;
if (id == null) {
return null;
} else {
parameterContext = flowController.getFlowManager().getParameterContextManager().getParameterContext(id);
if (parameterContext == null) {
throw new IllegalStateException("Cannot update Process Group's Parameter Context because no Parameter Context exists with ID " + id);
}
return parameterContext;
}
}
@Override
public ProcessGroup getProcessGroup(String groupId) {
return locateProcessGroup(flowController, groupId);
}
@Override
public Set<ProcessGroup> getProcessGroups(String parentGroupId) {
ProcessGroup group = locateProcessGroup(flowController, parentGroupId);
return group.getProcessGroups();
}
@Override
public void verifyScheduleComponents(final String groupId, final ScheduledState state,final Set<String> componentIds) {
final ProcessGroup group = locateProcessGroup(flowController, groupId);
final Set<ProcessGroup> validGroups = new HashSet<>();
validGroups.add(group);
validGroups.addAll(group.findAllProcessGroups());
for (final String componentId : componentIds) {
final Connectable connectable = findConnectable(componentId, groupId, validGroups);
if (connectable == null) {
throw new ResourceNotFoundException("Unable to find component with id " + componentId);
}
if (connectable instanceof RemoteGroupPort) {
final RemoteGroupPort remotePort = (RemoteGroupPort) connectable;
if (ScheduledState.RUNNING.equals(state)) {
remotePort.verifyCanStart();
} else {
remotePort.verifyCanStop();
}
continue;
}
// verify as appropriate
if (ScheduledState.RUNNING.equals(state)) {
group.verifyCanStart(connectable);
} else {
group.verifyCanStop(connectable);
}
}
}
@Override
public void verifyEnableComponents(String groupId, ScheduledState state, Set<String> componentIds) {
final ProcessGroup group = locateProcessGroup(flowController, groupId);
final Set<ProcessGroup> validGroups = new HashSet<>();
validGroups.add(group);
validGroups.addAll(group.findAllProcessGroups());
for (final String componentId : componentIds) {
final Connectable connectable = findConnectable(componentId, groupId, validGroups);
if (ScheduledState.STOPPED.equals(state)) {
connectable.verifyCanEnable();
} else if (ScheduledState.DISABLED.equals(state)) {
connectable.verifyCanDisable();
}
}
}
@Override
public void verifyActivateControllerServices(final ControllerServiceState state, final Collection<String> serviceIds) {
final FlowManager flowManager = flowController.getFlowManager();
final Set<ControllerServiceNode> serviceNodes = serviceIds.stream()
.map(flowManager::getControllerServiceNode)
.collect(Collectors.toSet());
for (final ControllerServiceNode serviceNode : serviceNodes) {
if (state == ControllerServiceState.ENABLED) {
serviceNode.verifyCanEnable(serviceNodes);
} else {
serviceNode.verifyCanDisable(serviceNodes);
}
}
}
private Connectable findConnectable(final String componentId, final String groupId, final Set<ProcessGroup> validProcessGroups) {
// Get the component with the given ID and ensure that it belongs to the group that we are looking for.
// We do this, rather than calling ProcessGroup.findLocalConnectable because for any component that is buried several
// layers of Process Groups deep, that method becomes quite a bit more expensive than this method, due to all of the
// Read Locks that must be obtained while recursing through the Process Group's descendant groups.
final Connectable connectable = flowController.getFlowManager().findConnectable(componentId);
if (connectable == null) {
throw new ResourceNotFoundException("Could not find Component with ID " + componentId);
}
final ProcessGroup connectableGroup = connectable.getProcessGroup();
if (!validProcessGroups.contains(connectableGroup)) {
throw new ResourceNotFoundException("Component with ID " + componentId + " does not belong to Process Group " + groupId + " or any of its descendent groups");
}
return connectable;
}
@Override
public void scheduleComponents(final String groupId, final ScheduledState state, final Set<String> componentIds) {
final ProcessGroup group = locateProcessGroup(flowController, groupId);
final Set<ProcessGroup> validGroups = new HashSet<>();
validGroups.add(group);
validGroups.addAll(group.findAllProcessGroups());
for (final String componentId : componentIds) {
final Connectable connectable = findConnectable(componentId, groupId, validGroups);
if (ScheduledState.RUNNING.equals(state)) {
switch (connectable.getConnectableType()) {
case PROCESSOR:
connectable.getProcessGroup().startProcessor((ProcessorNode) connectable, true);
break;
case INPUT_PORT:
connectable.getProcessGroup().startInputPort((Port) connectable);
break;
case OUTPUT_PORT:
connectable.getProcessGroup().startOutputPort((Port) connectable);
break;
case REMOTE_INPUT_PORT:
case REMOTE_OUTPUT_PORT:
final RemoteGroupPort remotePort = (RemoteGroupPort) connectable;
remotePort.getRemoteProcessGroup().startTransmitting(remotePort);
break;
}
} else if (ScheduledState.STOPPED.equals(state)) {
switch (connectable.getConnectableType()) {
case PROCESSOR:
connectable.getProcessGroup().stopProcessor((ProcessorNode) connectable);
break;
case INPUT_PORT:
connectable.getProcessGroup().stopInputPort((Port) connectable);
break;
case OUTPUT_PORT:
connectable.getProcessGroup().stopOutputPort((Port) connectable);
break;
case REMOTE_INPUT_PORT:
case REMOTE_OUTPUT_PORT:
final RemoteGroupPort remotePort = (RemoteGroupPort) connectable;
remotePort.getRemoteProcessGroup().stopTransmitting(remotePort);
break;
}
}
}
}
@Override
public void enableComponents(final String groupId, final ScheduledState state, final Set<String> componentIds) {
final ProcessGroup group = locateProcessGroup(flowController, groupId);
final Set<ProcessGroup> validGroups = new HashSet<>();
validGroups.add(group);
validGroups.addAll(group.findAllProcessGroups());
for (final String componentId : componentIds) {
final Connectable connectable = findConnectable(componentId, groupId, validGroups);
if (ScheduledState.STOPPED.equals(state)) {
switch (connectable.getConnectableType()) {
case PROCESSOR:
connectable.getProcessGroup().enableProcessor((ProcessorNode) connectable);
break;
case INPUT_PORT:
connectable.getProcessGroup().enableInputPort((Port) connectable);
break;
case OUTPUT_PORT:
connectable.getProcessGroup().enableOutputPort((Port) connectable);
break;
}
} else if (ScheduledState.DISABLED.equals(state)) {
switch (connectable.getConnectableType()) {
case PROCESSOR:
connectable.getProcessGroup().disableProcessor((ProcessorNode) connectable);
break;
case INPUT_PORT:
connectable.getProcessGroup().disableInputPort((Port) connectable);
break;
case OUTPUT_PORT:
connectable.getProcessGroup().disableOutputPort((Port) connectable);
break;
}
}
}
}
@Override
public void activateControllerServices(final String groupId, final ControllerServiceState state, final Collection<String> serviceIds) {
final FlowManager flowManager = flowController.getFlowManager();
final List<ControllerServiceNode> serviceNodes = serviceIds.stream()
.map(flowManager::getControllerServiceNode)
.collect(Collectors.toList());
if (state == ControllerServiceState.ENABLED) {
flowController.getControllerServiceProvider().enableControllerServicesAsync(serviceNodes);
} else {
flowController.getControllerServiceProvider().disableControllerServicesAsync(serviceNodes);
}
}
@Override
public ProcessGroup updateProcessGroup(ProcessGroupDTO processGroupDTO) {
final ProcessGroup group = locateProcessGroup(flowController, processGroupDTO.getId());
final String name = processGroupDTO.getName();
final String comments = processGroupDTO.getComments();
final String concurrencyName = processGroupDTO.getFlowfileConcurrency();
final FlowFileConcurrency flowFileConcurrency = concurrencyName == null ? null : FlowFileConcurrency.valueOf(concurrencyName);
final String outboundPolicyName = processGroupDTO.getFlowfileOutboundPolicy();
final FlowFileOutboundPolicy flowFileOutboundPolicy = outboundPolicyName == null ? null : FlowFileOutboundPolicy.valueOf(outboundPolicyName);
final String defaultFlowFileExpiration = processGroupDTO.getDefaultFlowFileExpiration();
final Long defaultBackPressureObjectThreshold = processGroupDTO.getDefaultBackPressureObjectThreshold();
final String defaultBackPressureDataSizeThreshold = processGroupDTO.getDefaultBackPressureDataSizeThreshold();
final ParameterContextReferenceEntity parameterContextReference = processGroupDTO.getParameterContext();
if (parameterContextReference != null) {
final String parameterContextId = parameterContextReference.getId();
if (parameterContextId == null) {
group.setParameterContext(null);
} else {
final ParameterContext parameterContext = flowController.getFlowManager().getParameterContextManager().getParameterContext(parameterContextId);
if (parameterContext == null) {
throw new IllegalStateException("Cannot set Process Group's Parameter Context because no Parameter Context exists with ID " + parameterContextId);
}
group.setParameterContext(parameterContext);
}
}
if (isNotNull(name)) {
group.setName(name);
}
if (isNotNull(processGroupDTO.getPosition())) {
group.setPosition(new Position(processGroupDTO.getPosition().getX(), processGroupDTO.getPosition().getY()));
final ProcessGroup parent = group.getParent();
if (parent != null) {
parent.onComponentModified();
}
}
if (isNotNull(comments)) {
group.setComments(comments);
}
if (flowFileConcurrency != null) {
group.setFlowFileConcurrency(flowFileConcurrency);
}
if (flowFileOutboundPolicy != null) {
group.setFlowFileOutboundPolicy(flowFileOutboundPolicy);
}
if (defaultFlowFileExpiration != null) {
group.setDefaultFlowFileExpiration(processGroupDTO.getDefaultFlowFileExpiration());
}
if (defaultBackPressureObjectThreshold != null) {
group.setDefaultBackPressureObjectThreshold(processGroupDTO.getDefaultBackPressureObjectThreshold());
}
if (defaultBackPressureDataSizeThreshold != null) {
group.setDefaultBackPressureDataSizeThreshold(processGroupDTO.getDefaultBackPressureDataSizeThreshold());
}
group.onComponentModified();
return group;
}
@Override
public ProcessGroup updateVersionControlInformation(final VersionControlInformationDTO versionControlInformation, final Map<String, String> versionedComponentMapping) {
final String groupId = versionControlInformation.getGroupId();
final ProcessGroup group = locateProcessGroup(flowController, groupId);
final String registryId = versionControlInformation.getRegistryId();
final FlowRegistry flowRegistry = flowController.getFlowRegistryClient().getFlowRegistry(registryId);
final String registryName = flowRegistry == null ? registryId : flowRegistry.getName();
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(flowController.getExtensionManager());
final VersionedProcessGroup flowSnapshot = mapper.mapProcessGroup(group, flowController.getControllerServiceProvider(), flowController.getFlowRegistryClient(), false);
final StandardVersionControlInformation vci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation)
.registryName(registryName)
.flowSnapshot(flowSnapshot)
.build();
group.setVersionControlInformation(vci, versionedComponentMapping);
group.onComponentModified();
return group;
}
@Override
public ProcessGroup disconnectVersionControl(final String groupId) {
final ProcessGroup group = locateProcessGroup(flowController, groupId);
group.disconnectVersionControl(true);
group.onComponentModified();
return group;
}
@Override
public ProcessGroup updateProcessGroupFlow(final String groupId, final VersionedFlowSnapshot proposedSnapshot, final VersionControlInformationDTO versionControlInformation,
final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings, final boolean updateDescendantVersionedFlows) {
final ProcessGroup group = locateProcessGroup(flowController, groupId);
group.updateFlow(proposedSnapshot, componentIdSeed, verifyNotModified, updateSettings, updateDescendantVersionedFlows);
group.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize);
// process group being updated may not be versioned
if (versionControlInformation != null) {
final StandardVersionControlInformation svci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation)
.flowSnapshot(proposedSnapshot.getFlowContents())
.build();
group.setVersionControlInformation(svci, Collections.emptyMap());
}
group.onComponentModified();
return group;
}
@Override
public ProcessGroup updateVariableRegistry(final VariableRegistryDTO variableRegistry) {
final ProcessGroup group = locateProcessGroup(flowController, variableRegistry.getProcessGroupId());
if (group == null) {
throw new ResourceNotFoundException("Could not find Process Group with ID " + variableRegistry.getProcessGroupId());
}
final Map<String, String> variableMap = new HashMap<>();
variableRegistry.getVariables().stream() // have to use forEach here instead of using Collectors.toMap because value may be null
.map(VariableEntity::getVariable)
.forEach(var -> variableMap.put(var.getName(), var.getValue()));
group.setVariables(variableMap);
group.onComponentModified();
return group;
}
@Override
public void verifyDelete(String groupId) {
ProcessGroup group = locateProcessGroup(flowController, groupId);
group.verifyCanDelete();
}
@Override
public void verifyDeleteFlowRegistry(String registryId) {
final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
final VersionControlInformation versionControlInformation = rootGroup.getVersionControlInformation();
if (versionControlInformation != null && versionControlInformation.getRegistryIdentifier().equals(registryId)) {
throw new IllegalStateException("The Registry cannot be removed because a Process Group currently under version control is tracking to it.");
}
final Set<VersionControlInformation> trackedVersionControlInformation = rootGroup.findAllProcessGroups().stream()
.map(group -> group.getVersionControlInformation())
.filter(Objects::nonNull)
.filter(vci -> vci.getRegistryIdentifier().equals(registryId))
.collect(Collectors.toSet());
if (!trackedVersionControlInformation.isEmpty()) {
throw new IllegalStateException("The Registry cannot be removed because a Process Group currently under version control is tracking to it.");
}
}
@Override
public DropFlowFileStatus createDropAllFlowFilesRequest(String processGroupId, String dropRequestId) {
ProcessGroup processGroup = locateProcessGroup(flowController, processGroupId);
final NiFiUser user = NiFiUserUtils.getNiFiUser();
if (user == null) {
throw new WebApplicationException(new Throwable("Unable to access details for current user."));
}
return processGroup.dropAllFlowFiles(dropRequestId, user.getIdentity());
}
@Override
public DropFlowFileStatus getDropAllFlowFilesRequest(String processGroupId, String dropRequestId) {
ProcessGroup processGroup = locateProcessGroup(flowController, processGroupId);
return processGroup.getDropAllFlowFilesStatus(dropRequestId);
}
@Override
public DropFlowFileStatus deleteDropAllFlowFilesRequest(String processGroupId, String dropRequestId) {
ProcessGroup processGroup = locateProcessGroup(flowController, processGroupId);
return processGroup.cancelDropAllFlowFiles(dropRequestId);
}
@Override
public void deleteProcessGroup(String processGroupId) {
// get the group
ProcessGroup group = locateProcessGroup(flowController, processGroupId);
ProcessGroup parentGroup = group.getParent();
// ensure this isn't the root group
if (parentGroup == null) {
throw new IllegalArgumentException("The Root Group cannot be removed");
}
// remove the group
parentGroup.removeProcessGroup(group);
}
public void setFlowController(FlowController flowController) {
this.flowController = flowController;
}
}