| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.web.dao.impl; |
| |
| import org.apache.nifi.authorization.Authorizer; |
| import org.apache.nifi.authorization.RequestAction; |
| import org.apache.nifi.authorization.resource.Authorizable; |
| import org.apache.nifi.authorization.resource.DataAuthorizable; |
| import org.apache.nifi.authorization.user.NiFiUser; |
| import org.apache.nifi.authorization.user.NiFiUserUtils; |
| import org.apache.nifi.connectable.Connectable; |
| import org.apache.nifi.connectable.ConnectableType; |
| import org.apache.nifi.connectable.Connection; |
| import org.apache.nifi.controller.queue.LoadBalanceCompression; |
| import org.apache.nifi.controller.queue.LoadBalanceStrategy; |
| import org.apache.nifi.connectable.Position; |
| import org.apache.nifi.controller.FlowController; |
| import org.apache.nifi.controller.exception.ValidationException; |
| import org.apache.nifi.controller.queue.DropFlowFileStatus; |
| import org.apache.nifi.controller.queue.FlowFileQueue; |
| import org.apache.nifi.controller.queue.ListFlowFileStatus; |
| import org.apache.nifi.controller.repository.ContentNotFoundException; |
| import org.apache.nifi.controller.repository.FlowFileRecord; |
| import org.apache.nifi.flowfile.FlowFilePrioritizer; |
| import org.apache.nifi.flowfile.attributes.CoreAttributes; |
| import org.apache.nifi.groups.ProcessGroup; |
| import org.apache.nifi.groups.RemoteProcessGroup; |
| import org.apache.nifi.processor.Relationship; |
| import org.apache.nifi.remote.PublicPort; |
| import org.apache.nifi.remote.RemoteGroupPort; |
| import org.apache.nifi.remote.TransferDirection; |
| import org.apache.nifi.util.FormatUtils; |
| import org.apache.nifi.web.DownloadableContent; |
| import org.apache.nifi.web.ResourceNotFoundException; |
| import org.apache.nifi.web.api.dto.ConnectableDTO; |
| import org.apache.nifi.web.api.dto.ConnectionDTO; |
| import org.apache.nifi.web.api.dto.PositionDTO; |
| import org.apache.nifi.web.dao.ConnectionDAO; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.ws.rs.WebApplicationException; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.regex.Matcher; |
| |
| public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO { |
| |
| private static final Logger logger = LoggerFactory.getLogger(StandardConnectionDAO.class); |
| |
| private FlowController flowController; |
| private Authorizer authorizer; |
| |
| private Connection locateConnection(final String connectionId) { |
| final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup(); |
| final Connection connection = rootGroup.findConnection(connectionId); |
| |
| if (connection == null) { |
| throw new ResourceNotFoundException(String.format("Unable to find connection with id '%s'.", connectionId)); |
| } else { |
| return connection; |
| } |
| } |
| |
| @Override |
| public boolean hasConnection(String id) { |
| final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup(); |
| return rootGroup.findConnection(id) != null; |
| } |
| |
| @Override |
| public Connection getConnection(final String id) { |
| return locateConnection(id); |
| } |
| |
| @Override |
| public Set<Connection> getConnections(final String groupId) { |
| final ProcessGroup group = locateProcessGroup(flowController, groupId); |
| return group.getConnections(); |
| } |
| |
| @Override |
| public DropFlowFileStatus getFlowFileDropRequest(String connectionId, String dropRequestId) { |
| final Connection connection = locateConnection(connectionId); |
| final FlowFileQueue queue = connection.getFlowFileQueue(); |
| |
| final DropFlowFileStatus dropRequest = queue.getDropFlowFileStatus(dropRequestId); |
| if (dropRequest == null) { |
| throw new ResourceNotFoundException(String.format("Unable to find drop request with id '%s'.", dropRequestId)); |
| } |
| |
| return dropRequest; |
| } |
| |
| @Override |
| public ListFlowFileStatus getFlowFileListingRequest(String connectionId, String listingRequestId) { |
| final Connection connection = locateConnection(connectionId); |
| final FlowFileQueue queue = connection.getFlowFileQueue(); |
| |
| final ListFlowFileStatus listRequest = queue.getListFlowFileStatus(listingRequestId); |
| if (listRequest == null) { |
| throw new ResourceNotFoundException(String.format("Unable to find listing request with id '%s'.", listingRequestId)); |
| } |
| |
| return listRequest; |
| } |
| |
| @Override |
| public FlowFileRecord getFlowFile(String id, String flowFileUuid) { |
| try { |
| final Connection connection = locateConnection(id); |
| final FlowFileQueue queue = connection.getFlowFileQueue(); |
| final FlowFileRecord flowFile = queue.getFlowFile(flowFileUuid); |
| |
| if (flowFile == null) { |
| throw new ResourceNotFoundException(String.format("The FlowFile with UUID %s is no longer in the active queue.", flowFileUuid)); |
| } |
| |
| // get the attributes and ensure appropriate access |
| final Map<String, String> attributes = flowFile.getAttributes(); |
| final Authorizable dataAuthorizable = new DataAuthorizable(connection.getSourceAuthorizable()); |
| dataAuthorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser(), attributes); |
| |
| return flowFile; |
| } catch (final IOException ioe) { |
| logger.error(String.format("Unable to get the flowfile (%s) at this time.", flowFileUuid), ioe); |
| throw new IllegalStateException("Unable to get the FlowFile at this time."); |
| } |
| } |
| |
| /** |
| * Configures the specified connection using the specified dto. |
| */ |
| private void configureConnection(Connection connection, ConnectionDTO connectionDTO) { |
| // validate flow file comparators/prioritizers |
| List<FlowFilePrioritizer> newPrioritizers = null; |
| final List<String> prioritizers = connectionDTO.getPrioritizers(); |
| if (isNotNull(prioritizers)) { |
| final List<String> newPrioritizersClasses = new ArrayList<>(prioritizers); |
| newPrioritizers = new ArrayList<>(); |
| for (final String className : newPrioritizersClasses) { |
| try { |
| newPrioritizers.add(flowController.getFlowManager().createPrioritizer(className)); |
| } catch (final ClassNotFoundException | InstantiationException | IllegalAccessException e) { |
| throw new IllegalArgumentException("Unable to set prioritizer " + className + ": " + e); |
| } |
| } |
| } |
| |
| // update connection queue |
| if (isNotNull(connectionDTO.getFlowFileExpiration())) { |
| connection.getFlowFileQueue().setFlowFileExpiration(connectionDTO.getFlowFileExpiration()); |
| } |
| if (isNotNull(connectionDTO.getBackPressureObjectThreshold())) { |
| connection.getFlowFileQueue().setBackPressureObjectThreshold(connectionDTO.getBackPressureObjectThreshold()); |
| } |
| if (isNotNull(connectionDTO.getBackPressureDataSizeThreshold())) { |
| connection.getFlowFileQueue().setBackPressureDataSizeThreshold(connectionDTO.getBackPressureDataSizeThreshold()); |
| } |
| if (isNotNull(newPrioritizers)) { |
| connection.getFlowFileQueue().setPriorities(newPrioritizers); |
| } |
| |
| final String loadBalanceStrategyName = connectionDTO.getLoadBalanceStrategy(); |
| final String loadBalancePartitionAttribute = connectionDTO.getLoadBalancePartitionAttribute(); |
| if (isNotNull(loadBalanceStrategyName)) { |
| final LoadBalanceStrategy loadBalanceStrategy = LoadBalanceStrategy.valueOf(loadBalanceStrategyName); |
| connection.getFlowFileQueue().setLoadBalanceStrategy(loadBalanceStrategy, loadBalancePartitionAttribute); |
| } |
| |
| final String loadBalanceCompressionName = connectionDTO.getLoadBalanceCompression(); |
| if (isNotNull(loadBalanceCompressionName)) { |
| connection.getFlowFileQueue().setLoadBalanceCompression(LoadBalanceCompression.valueOf(loadBalanceCompressionName)); |
| } |
| |
| // update the connection state |
| if (isNotNull(connectionDTO.getBends())) { |
| final List<Position> bendPoints = new ArrayList<>(); |
| for (final PositionDTO bend : connectionDTO.getBends()) { |
| if (bend != null) { |
| bendPoints.add(new Position(bend.getX(), bend.getY())); |
| } |
| } |
| connection.setBendPoints(bendPoints); |
| } |
| if (isNotNull(connectionDTO.getName())) { |
| connection.setName(connectionDTO.getName()); |
| } |
| if (isNotNull(connectionDTO.getLabelIndex())) { |
| connection.setLabelIndex(connectionDTO.getLabelIndex()); |
| } |
| if (isNotNull(connectionDTO.getzIndex())) { |
| connection.setZIndex(connectionDTO.getzIndex()); |
| } |
| } |
| |
| /** |
| * Validates the proposed processor configuration. |
| */ |
| private List<String> validateProposedConfiguration(final String groupId, final ConnectionDTO connectionDTO) { |
| List<String> validationErrors = new ArrayList<>(); |
| |
| if (isNotNull(connectionDTO.getBackPressureObjectThreshold()) && connectionDTO.getBackPressureObjectThreshold() < 0) { |
| validationErrors.add("Max queue size must be a non-negative integer"); |
| } |
| if (isNotNull(connectionDTO.getFlowFileExpiration())) { |
| Matcher expirationMatcher = FormatUtils.TIME_DURATION_PATTERN.matcher(connectionDTO.getFlowFileExpiration()); |
| if (!expirationMatcher.matches()) { |
| validationErrors.add("Flow file expiration is not a valid time duration (ie 30 sec, 5 min)"); |
| } |
| } |
| if (isNotNull(connectionDTO.getLabelIndex())) { |
| if (connectionDTO.getLabelIndex() < 0) { |
| validationErrors.add("The label index must be positive."); |
| } |
| } |
| |
| // validation is required when connecting to a remote process group since each node in a |
| // cluster may or may not be authorized |
| final ConnectableDTO proposedDestination = connectionDTO.getDestination(); |
| if (proposedDestination != null && ConnectableType.REMOTE_INPUT_PORT.name().equals(proposedDestination.getType())) { |
| // the group id must be specified |
| if (proposedDestination.getGroupId() == null) { |
| validationErrors.add("When the destination is a remote input port its group id is required."); |
| return validationErrors; |
| } |
| |
| // attempt to location the proprosed destination |
| final ProcessGroup destinationParentGroup = locateProcessGroup(flowController, groupId); |
| final RemoteProcessGroup remoteProcessGroup = destinationParentGroup.getRemoteProcessGroup(proposedDestination.getGroupId()); |
| if (remoteProcessGroup == null) { |
| validationErrors.add("Unable to find the specified remote process group."); |
| return validationErrors; |
| } |
| |
| // ensure the new destination was found |
| final RemoteGroupPort remoteInputPort = remoteProcessGroup.getInputPort(proposedDestination.getId()); |
| if (remoteInputPort == null) { |
| validationErrors.add("Unable to find the specified destination."); |
| return validationErrors; |
| } |
| } |
| |
| return validationErrors; |
| } |
| |
| @Override |
| public Connection createConnection(final String groupId, final ConnectionDTO connectionDTO) { |
| final ProcessGroup group = locateProcessGroup(flowController, groupId); |
| |
| if (isNotNull(connectionDTO.getParentGroupId()) && !flowController.getFlowManager().areGroupsSame(connectionDTO.getParentGroupId(), groupId)) { |
| throw new IllegalStateException("Cannot specify a different Parent Group ID than the Group to which the Connection is being added"); |
| } |
| |
| // get the source and destination connectables |
| final ConnectableDTO sourceConnectableDTO = connectionDTO.getSource(); |
| final ConnectableDTO destinationConnectableDTO = connectionDTO.getDestination(); |
| |
| // ensure both are specified |
| if (sourceConnectableDTO == null || destinationConnectableDTO == null) { |
| throw new IllegalArgumentException("Both source and destinations must be specified."); |
| } |
| |
| // if the source/destination connectable's group id has not been set, its inferred to be the current group |
| if (sourceConnectableDTO.getGroupId() == null) { |
| sourceConnectableDTO.setGroupId(groupId); |
| } |
| if (destinationConnectableDTO.getGroupId() == null) { |
| destinationConnectableDTO.setGroupId(groupId); |
| } |
| |
| // validate the proposed configuration |
| final List<String> validationErrors = validateProposedConfiguration(groupId, connectionDTO); |
| |
| // ensure there was no validation errors |
| if (!validationErrors.isEmpty()) { |
| throw new ValidationException(validationErrors); |
| } |
| |
| // find the source |
| final Connectable source; |
| if (ConnectableType.REMOTE_OUTPUT_PORT.name().equals(sourceConnectableDTO.getType())) { |
| final ProcessGroup sourceParentGroup = locateProcessGroup(flowController, groupId); |
| final RemoteProcessGroup remoteProcessGroup = sourceParentGroup.getRemoteProcessGroup(sourceConnectableDTO.getGroupId()); |
| source = remoteProcessGroup.getOutputPort(sourceConnectableDTO.getId()); |
| } else { |
| final ProcessGroup sourceGroup = locateProcessGroup(flowController, sourceConnectableDTO.getGroupId()); |
| source = sourceGroup.getConnectable(sourceConnectableDTO.getId()); |
| } |
| |
| // find the destination |
| final Connectable destination; |
| if (ConnectableType.REMOTE_INPUT_PORT.name().equals(destinationConnectableDTO.getType())) { |
| final ProcessGroup destinationParentGroup = locateProcessGroup(flowController, groupId); |
| final RemoteProcessGroup remoteProcessGroup = destinationParentGroup.getRemoteProcessGroup(destinationConnectableDTO.getGroupId()); |
| destination = remoteProcessGroup.getInputPort(destinationConnectableDTO.getId()); |
| } else { |
| final ProcessGroup destinationGroup = locateProcessGroup(flowController, destinationConnectableDTO.getGroupId()); |
| destination = destinationGroup.getConnectable(destinationConnectableDTO.getId()); |
| } |
| |
| // determine the relationships |
| final Set<String> relationships = new HashSet<>(); |
| if (isNotNull(connectionDTO.getSelectedRelationships())) { |
| relationships.addAll(connectionDTO.getSelectedRelationships()); |
| } |
| |
| // create the connection |
| final Connection connection = flowController.createConnection(connectionDTO.getId(), connectionDTO.getName(), source, destination, relationships); |
| |
| // configure the connection |
| configureConnection(connection, connectionDTO); |
| |
| // add the connection to the group |
| group.addConnection(connection); |
| return connection; |
| } |
| |
| @Override |
| public DropFlowFileStatus createFlowFileDropRequest(String id, String dropRequestId) { |
| final Connection connection = locateConnection(id); |
| final FlowFileQueue queue = connection.getFlowFileQueue(); |
| |
| final NiFiUser user = NiFiUserUtils.getNiFiUser(); |
| if (user == null) { |
| throw new WebApplicationException(new Throwable("Unable to access details for current user.")); |
| } |
| |
| return queue.dropFlowFiles(dropRequestId, user.getIdentity()); |
| } |
| |
| @Override |
| public ListFlowFileStatus createFlowFileListingRequest(String id, String listingRequestId) { |
| final Connection connection = locateConnection(id); |
| final FlowFileQueue queue = connection.getFlowFileQueue(); |
| |
| // ensure we can list |
| verifyList(queue); |
| |
| return queue.listFlowFiles(listingRequestId, 100); |
| } |
| |
| @Override |
| public void verifyCreate(String groupId, ConnectionDTO connectionDTO) { |
| // validate the incoming request |
| final List<String> validationErrors = validateProposedConfiguration(groupId, connectionDTO); |
| |
| // ensure there was no validation errors |
| if (!validationErrors.isEmpty()) { |
| throw new ValidationException(validationErrors); |
| } |
| |
| // Ensure that both the source and the destination for the connection exist. |
| // In the case that the source or destination is a port in a Remote Process Group, |
| // this is necessary because the ports can change in the background. It may still be |
| // possible for a port to disappear between the 'verify' stage and the creation stage, |
| // but this prevents the case where some nodes already know about the port while other |
| // nodes in the cluster do not. This is a more common case, as users may try to connect |
| // to the port as soon as the port is created. |
| final ConnectableDTO sourceDto = connectionDTO.getSource(); |
| if (sourceDto == null || sourceDto.getId() == null) { |
| throw new IllegalArgumentException("Cannot create connection without specifying source"); |
| } |
| |
| final ConnectableDTO destinationDto = connectionDTO.getDestination(); |
| if (destinationDto == null || destinationDto.getId() == null) { |
| throw new IllegalArgumentException("Cannot create connection without specifying destination"); |
| } |
| |
| if (ConnectableType.REMOTE_OUTPUT_PORT.name().equals(sourceDto.getType())) { |
| final ProcessGroup sourceParentGroup = locateProcessGroup(flowController, groupId); |
| |
| final RemoteProcessGroup remoteProcessGroup = sourceParentGroup.getRemoteProcessGroup(sourceDto.getGroupId()); |
| if (remoteProcessGroup == null) { |
| throw new IllegalArgumentException("Unable to find the specified remote process group."); |
| } |
| |
| final RemoteGroupPort sourceConnectable = remoteProcessGroup.getOutputPort(sourceDto.getId()); |
| if (sourceConnectable == null) { |
| throw new IllegalArgumentException("The specified source for the connection does not exist"); |
| } else if (!sourceConnectable.getTargetExists()) { |
| throw new IllegalArgumentException("The specified remote output port does not exist."); |
| } |
| } else { |
| final ProcessGroup sourceGroup = locateProcessGroup(flowController, sourceDto.getGroupId()); |
| final Connectable sourceConnectable = sourceGroup.getConnectable(sourceDto.getId()); |
| if (sourceConnectable == null) { |
| throw new IllegalArgumentException("The specified source for the connection does not exist"); |
| } |
| if (sourceConnectable instanceof PublicPort |
| && TransferDirection.SEND.equals(((PublicPort) sourceConnectable).getDirection())) { |
| throw new IllegalArgumentException("The specified source for the connection cannot be connected to local components."); |
| } |
| } |
| |
| if (ConnectableType.REMOTE_INPUT_PORT.name().equals(destinationDto.getType())) { |
| final ProcessGroup destinationParentGroup = locateProcessGroup(flowController, groupId); |
| |
| final RemoteProcessGroup remoteProcessGroup = destinationParentGroup.getRemoteProcessGroup(destinationDto.getGroupId()); |
| if (remoteProcessGroup == null) { |
| throw new IllegalArgumentException("Unable to find the specified remote process group."); |
| } |
| |
| final RemoteGroupPort destinationConnectable = remoteProcessGroup.getInputPort(destinationDto.getId()); |
| if (destinationConnectable == null) { |
| throw new IllegalArgumentException("The specified destination for the connection does not exist"); |
| } else if (!destinationConnectable.getTargetExists()) { |
| throw new IllegalArgumentException("The specified remote input port does not exist."); |
| } |
| } else { |
| final ProcessGroup destinationGroup = locateProcessGroup(flowController, destinationDto.getGroupId()); |
| final Connectable destinationConnectable = destinationGroup.getConnectable(destinationDto.getId()); |
| if (destinationConnectable == null) { |
| throw new IllegalArgumentException("The specified destination for the connection does not exist"); |
| } |
| if (destinationConnectable instanceof PublicPort |
| && TransferDirection.RECEIVE.equals(((PublicPort) destinationConnectable).getDirection())) { |
| throw new IllegalArgumentException("The specified destination for the connection cannot be connected from local components."); |
| } |
| } |
| } |
| |
| private void verifyList(final FlowFileQueue queue) { |
| queue.verifyCanList(); |
| } |
| |
| @Override |
| public void verifyList(String id) { |
| final Connection connection = locateConnection(id); |
| final FlowFileQueue queue = connection.getFlowFileQueue(); |
| verifyList(queue); |
| } |
| |
| @Override |
| public void verifyUpdate(ConnectionDTO connectionDTO) { |
| verifyUpdate(locateConnection(connectionDTO.getId()), connectionDTO); |
| } |
| |
| private void verifyUpdate(final Connection connection, final ConnectionDTO connectionDTO) { |
| // determine what the request is attempting |
| if (isAnyNotNull(connectionDTO.getBackPressureDataSizeThreshold(), |
| connectionDTO.getBackPressureObjectThreshold(), |
| connectionDTO.getDestination(), |
| connectionDTO.getFlowFileExpiration(), |
| connectionDTO.getName(), |
| connectionDTO.getPosition(), |
| connectionDTO.getPrioritizers(), |
| connectionDTO.getSelectedRelationships())) { |
| |
| // validate the incoming request |
| final List<String> validationErrors = validateProposedConfiguration(connection.getProcessGroup().getIdentifier(), connectionDTO); |
| |
| // ensure there was no validation errors |
| if (!validationErrors.isEmpty()) { |
| throw new ValidationException(validationErrors); |
| } |
| |
| // If destination is changing, ensure that current destination is not running. This check is done here, rather than |
| // in the Connection object itself because the Connection object itself does not know which updates are to occur and |
| // we don't want to prevent updating things like the connection name or backpressure just because the destination is running |
| final Connectable destination = connection.getDestination(); |
| if (destination != null && destination.isRunning() && destination.getConnectableType() != ConnectableType.FUNNEL && destination.getConnectableType() != ConnectableType.INPUT_PORT) { |
| throw new ValidationException(Collections.singletonList("Cannot change the destination of connection because the current destination is running")); |
| } |
| |
| // verify that this connection supports modification |
| connection.verifyCanUpdate(); |
| } |
| } |
| |
| @Override |
| public Connection updateConnection(final ConnectionDTO connectionDTO) { |
| final Connection connection = locateConnection(connectionDTO.getId()); |
| final ProcessGroup group = connection.getProcessGroup(); |
| |
| // ensure we can update |
| verifyUpdate(connection, connectionDTO); |
| |
| final Collection<Relationship> newProcessorRelationships = new ArrayList<>(); |
| Connectable newDestination = null; |
| |
| // ensure that the source ID is correct, if specified. |
| final Connectable existingSource = connection.getSource(); |
| if (isNotNull(connectionDTO.getSource()) && !existingSource.getIdentifier().equals(connectionDTO.getSource().getId())) { |
| throw new IllegalStateException("Connection with ID " + connectionDTO.getId() + " has conflicting Source ID"); |
| } |
| |
| // determine any new relationships |
| final Set<String> relationships = connectionDTO.getSelectedRelationships(); |
| if (isNotNull(relationships)) { |
| if (relationships.isEmpty()) { |
| throw new IllegalArgumentException("Cannot remove all relationships from Connection with ID " + connection.getIdentifier() + " -- remove the Connection instead"); |
| } |
| if (existingSource == null) { |
| throw new IllegalArgumentException("Cannot specify new relationships without including the source."); |
| } |
| |
| for (final String relationship : relationships) { |
| final Relationship processorRelationship = existingSource.getRelationship(relationship); |
| if (processorRelationship == null) { |
| throw new IllegalArgumentException("Unable to locate " + relationship + " relationship."); |
| } |
| newProcessorRelationships.add(processorRelationship); |
| } |
| } |
| |
| // determine if the destination changed |
| final ConnectableDTO proposedDestination = connectionDTO.getDestination(); |
| if (proposedDestination != null) { |
| final Connectable currentDestination = connection.getDestination(); |
| |
| // handle remote input port differently |
| if (ConnectableType.REMOTE_INPUT_PORT.name().equals(proposedDestination.getType())) { |
| // the group id must be specified |
| if (proposedDestination.getGroupId() == null) { |
| throw new IllegalArgumentException("When the destination is a remote input port its group id is required."); |
| } |
| |
| // if the current destination is a remote input port |
| boolean isDifferentRemoteProcessGroup = false; |
| if (currentDestination.getConnectableType() == ConnectableType.REMOTE_INPUT_PORT) { |
| RemoteGroupPort remotePort = (RemoteGroupPort) currentDestination; |
| if (!proposedDestination.getGroupId().equals(remotePort.getRemoteProcessGroup().getIdentifier())) { |
| isDifferentRemoteProcessGroup = true; |
| } |
| } |
| |
| // if the destination is changing or the previous destination was a different remote process group |
| if (!proposedDestination.getId().equals(currentDestination.getIdentifier()) || isDifferentRemoteProcessGroup) { |
| final ProcessGroup destinationParentGroup = locateProcessGroup(flowController, group.getIdentifier()); |
| final RemoteProcessGroup remoteProcessGroup = destinationParentGroup.getRemoteProcessGroup(proposedDestination.getGroupId()); |
| |
| // ensure the remote process group was found |
| if (remoteProcessGroup == null) { |
| throw new IllegalArgumentException("Unable to find the specified remote process group."); |
| } |
| |
| final RemoteGroupPort remoteInputPort = remoteProcessGroup.getInputPort(proposedDestination.getId()); |
| |
| // ensure the new destination was found |
| if (remoteInputPort == null) { |
| throw new IllegalArgumentException("Unable to find the specified destination."); |
| } |
| |
| // ensure the remote port actually exists |
| if (!remoteInputPort.getTargetExists()) { |
| throw new IllegalArgumentException("The specified remote input port does not exist."); |
| } else { |
| newDestination = remoteInputPort; |
| } |
| } |
| } else { |
| // if there is a different destination id |
| if (!proposedDestination.getId().equals(currentDestination.getIdentifier())) { |
| // if the destination connectable's group id has not been set, its inferred to be the current group |
| if (proposedDestination.getGroupId() == null) { |
| proposedDestination.setGroupId(group.getIdentifier()); |
| } |
| |
| final ProcessGroup destinationGroup = locateProcessGroup(flowController, proposedDestination.getGroupId()); |
| newDestination = destinationGroup.getConnectable(proposedDestination.getId()); |
| |
| // ensure the new destination was found |
| if (newDestination == null) { |
| throw new IllegalArgumentException("Unable to find the specified destination."); |
| } |
| } |
| } |
| } |
| |
| // configure the connection |
| configureConnection(connection, connectionDTO); |
| group.onComponentModified(); |
| |
| // update the relationships if necessary |
| if (!newProcessorRelationships.isEmpty()) { |
| connection.setRelationships(newProcessorRelationships); |
| } |
| |
| // update the destination if necessary |
| if (isNotNull(newDestination)) { |
| connection.setDestination(newDestination); |
| } |
| |
| return connection; |
| } |
| |
| @Override |
| public void verifyDelete(String id) { |
| final Connection connection = locateConnection(id); |
| connection.verifyCanDelete(); |
| } |
| |
| @Override |
| public void deleteConnection(final String id) { |
| final Connection connection = locateConnection(id); |
| connection.getProcessGroup().removeConnection(connection); |
| } |
| |
| @Override |
| public DropFlowFileStatus deleteFlowFileDropRequest(String connectionId, String dropRequestId) { |
| final Connection connection = locateConnection(connectionId); |
| final FlowFileQueue queue = connection.getFlowFileQueue(); |
| |
| final DropFlowFileStatus dropFlowFileStatus = queue.cancelDropFlowFileRequest(dropRequestId); |
| if (dropFlowFileStatus == null) { |
| throw new ResourceNotFoundException(String.format("Unable to find drop request with id '%s'.", dropRequestId)); |
| } |
| |
| return dropFlowFileStatus; |
| } |
| |
| @Override |
| public ListFlowFileStatus deleteFlowFileListingRequest(String connectionId, String listingRequestId) { |
| final Connection connection = locateConnection(connectionId); |
| final FlowFileQueue queue = connection.getFlowFileQueue(); |
| |
| final ListFlowFileStatus listFlowFileStatus = queue.cancelListFlowFileRequest(listingRequestId); |
| if (listFlowFileStatus == null) { |
| throw new ResourceNotFoundException(String.format("Unable to find listing request with id '%s'.", listingRequestId)); |
| } |
| |
| return listFlowFileStatus; |
| } |
| |
| @Override |
| public DownloadableContent getContent(String id, String flowFileUuid, String requestUri) { |
| try { |
| final NiFiUser user = NiFiUserUtils.getNiFiUser(); |
| |
| final Connection connection = locateConnection(id); |
| final FlowFileQueue queue = connection.getFlowFileQueue(); |
| final FlowFileRecord flowFile = queue.getFlowFile(flowFileUuid); |
| |
| if (flowFile == null) { |
| throw new ResourceNotFoundException(String.format("The FlowFile with UUID %s is no longer in the active queue.", flowFileUuid)); |
| } |
| |
| // get the attributes and ensure appropriate access |
| final Map<String, String> attributes = flowFile.getAttributes(); |
| final Authorizable dataAuthorizable = new DataAuthorizable(connection.getSourceAuthorizable()); |
| dataAuthorizable.authorize(authorizer, RequestAction.READ, user, attributes); |
| |
| // get the filename and fall back to the identifier (should never happen) |
| String filename = attributes.get(CoreAttributes.FILENAME.key()); |
| if (filename == null) { |
| filename = flowFileUuid; |
| } |
| |
| // get the mime-type |
| final String type = attributes.get(CoreAttributes.MIME_TYPE.key()); |
| |
| // get the content |
| final InputStream content = flowController.getContent(flowFile, user.getIdentity(), requestUri); |
| return new DownloadableContent(filename, type, content); |
| } catch (final ContentNotFoundException cnfe) { |
| throw new ResourceNotFoundException("Unable to find the specified content."); |
| } catch (final IOException ioe) { |
| logger.error(String.format("Unable to get the content for flowfile (%s) at this time.", flowFileUuid), ioe); |
| throw new IllegalStateException("Unable to get the content at this time."); |
| } |
| } |
| |
| /* setters */ |
| public void setFlowController(final FlowController flowController) { |
| this.flowController = flowController; |
| } |
| |
| public void setAuthorizer(Authorizer authorizer) { |
| this.authorizer = authorizer; |
| } |
| } |