| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.web.api; |
| |
| import com.fasterxml.jackson.annotation.JsonInclude; |
| import com.fasterxml.jackson.databind.DeserializationFeature; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; |
| import io.swagger.annotations.Api; |
| import io.swagger.annotations.ApiImplicitParam; |
| import io.swagger.annotations.ApiImplicitParams; |
| import io.swagger.annotations.ApiOperation; |
| import io.swagger.annotations.ApiParam; |
| import io.swagger.annotations.ApiResponse; |
| import io.swagger.annotations.ApiResponses; |
| import io.swagger.annotations.Authorization; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Consumer; |
| import java.util.function.Function; |
| import java.util.stream.Collectors; |
| import javax.servlet.http.HttpServletRequest; |
| import javax.ws.rs.Consumes; |
| import javax.ws.rs.DELETE; |
| import javax.ws.rs.DefaultValue; |
| import javax.ws.rs.GET; |
| import javax.ws.rs.HttpMethod; |
| import javax.ws.rs.POST; |
| import javax.ws.rs.PUT; |
| import javax.ws.rs.Path; |
| import javax.ws.rs.PathParam; |
| import javax.ws.rs.Produces; |
| import javax.ws.rs.QueryParam; |
| import javax.ws.rs.core.Context; |
| import javax.ws.rs.core.HttpHeaders; |
| import javax.ws.rs.core.MediaType; |
| import javax.ws.rs.core.MultivaluedHashMap; |
| import javax.ws.rs.core.MultivaluedMap; |
| import javax.ws.rs.core.Response; |
| import javax.ws.rs.core.Response.Status; |
| import javax.ws.rs.core.UriBuilder; |
| import javax.ws.rs.core.UriInfo; |
| import javax.xml.bind.JAXBContext; |
| import javax.xml.bind.JAXBElement; |
| import javax.xml.bind.JAXBException; |
| import javax.xml.bind.Unmarshaller; |
| import javax.xml.stream.XMLStreamReader; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.commons.text.StringEscapeUtils; |
| import org.apache.nifi.authorization.AuthorizableLookup; |
| import org.apache.nifi.authorization.AuthorizeAccess; |
| import org.apache.nifi.authorization.AuthorizeControllerServiceReference; |
| import org.apache.nifi.authorization.AuthorizeParameterReference; |
| import org.apache.nifi.authorization.ComponentAuthorizable; |
| import org.apache.nifi.authorization.ConnectionAuthorizable; |
| import org.apache.nifi.authorization.ProcessGroupAuthorizable; |
| import org.apache.nifi.authorization.RequestAction; |
| import org.apache.nifi.authorization.SnippetAuthorizable; |
| import org.apache.nifi.authorization.TemplateContentsAuthorizable; |
| import org.apache.nifi.authorization.resource.Authorizable; |
| import org.apache.nifi.authorization.user.NiFiUser; |
| import org.apache.nifi.authorization.user.NiFiUserDetails; |
| import org.apache.nifi.authorization.user.NiFiUserUtils; |
| import org.apache.nifi.bundle.BundleCoordinate; |
| import org.apache.nifi.cluster.manager.NodeResponse; |
| import org.apache.nifi.components.ConfigurableComponent; |
| import org.apache.nifi.connectable.ConnectableType; |
| import org.apache.nifi.controller.ScheduledState; |
| import org.apache.nifi.controller.serialization.FlowEncodingVersion; |
| import org.apache.nifi.controller.service.ControllerServiceState; |
| import org.apache.nifi.parameter.ParameterContext; |
| import org.apache.nifi.registry.bucket.Bucket; |
| import org.apache.nifi.registry.client.NiFiRegistryException; |
| import org.apache.nifi.registry.flow.FlowRegistryUtils; |
| import org.apache.nifi.registry.flow.VersionedFlow; |
| import org.apache.nifi.registry.flow.VersionedFlowSnapshot; |
| import org.apache.nifi.registry.flow.VersionedFlowState; |
| import org.apache.nifi.registry.flow.VersionedParameterContext; |
| import org.apache.nifi.registry.flow.VersionedProcessGroup; |
| import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest; |
| import org.apache.nifi.registry.variable.VariableRegistryUpdateStep; |
| import org.apache.nifi.remote.util.SiteToSiteRestApiClient; |
| import org.apache.nifi.security.xml.XmlUtils; |
| import org.apache.nifi.web.ResourceNotFoundException; |
| import org.apache.nifi.web.Revision; |
| import org.apache.nifi.web.api.dto.AffectedComponentDTO; |
| import org.apache.nifi.web.api.dto.BundleDTO; |
| import org.apache.nifi.web.api.dto.ConnectionDTO; |
| import org.apache.nifi.web.api.dto.ControllerServiceDTO; |
| import org.apache.nifi.web.api.dto.DropRequestDTO; |
| import org.apache.nifi.web.api.dto.FlowSnippetDTO; |
| import org.apache.nifi.web.api.dto.PortDTO; |
| import org.apache.nifi.web.api.dto.PositionDTO; |
| import org.apache.nifi.web.api.dto.ProcessGroupDTO; |
| import org.apache.nifi.web.api.dto.ProcessGroupReplaceRequestDTO; |
| import org.apache.nifi.web.api.dto.ProcessorConfigDTO; |
| import org.apache.nifi.web.api.dto.ProcessorDTO; |
| import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; |
| import org.apache.nifi.web.api.dto.RevisionDTO; |
| import org.apache.nifi.web.api.dto.TemplateDTO; |
| import org.apache.nifi.web.api.dto.VariableRegistryDTO; |
| import org.apache.nifi.web.api.dto.VersionControlInformationDTO; |
| import org.apache.nifi.web.api.dto.flow.FlowDTO; |
| import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; |
| import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity; |
| import org.apache.nifi.web.api.entity.AffectedComponentEntity; |
| import org.apache.nifi.web.api.entity.ConnectionEntity; |
| import org.apache.nifi.web.api.entity.ConnectionsEntity; |
| import org.apache.nifi.web.api.entity.ControllerServiceEntity; |
| import org.apache.nifi.web.api.entity.ControllerServicesEntity; |
| import org.apache.nifi.web.api.entity.CopySnippetRequestEntity; |
| import org.apache.nifi.web.api.entity.CreateTemplateRequestEntity; |
| import org.apache.nifi.web.api.entity.DropRequestEntity; |
| import org.apache.nifi.web.api.entity.Entity; |
| import org.apache.nifi.web.api.entity.FlowComparisonEntity; |
| import org.apache.nifi.web.api.entity.FlowEntity; |
| import org.apache.nifi.web.api.entity.FunnelEntity; |
| import org.apache.nifi.web.api.entity.FunnelsEntity; |
| import org.apache.nifi.web.api.entity.InputPortsEntity; |
| import org.apache.nifi.web.api.entity.InstantiateTemplateRequestEntity; |
| import org.apache.nifi.web.api.entity.LabelEntity; |
| import org.apache.nifi.web.api.entity.LabelsEntity; |
| import org.apache.nifi.web.api.entity.OutputPortsEntity; |
| import org.apache.nifi.web.api.entity.ParameterContextReferenceEntity; |
| import org.apache.nifi.web.api.entity.PortEntity; |
| import org.apache.nifi.web.api.entity.ProcessGroupEntity; |
| import org.apache.nifi.web.api.entity.ProcessGroupImportEntity; |
| import org.apache.nifi.web.api.entity.ProcessGroupReplaceRequestEntity; |
| import org.apache.nifi.web.api.entity.ProcessGroupUploadEntity; |
| import org.apache.nifi.web.api.entity.ProcessGroupsEntity; |
| import org.apache.nifi.web.api.entity.ProcessorEntity; |
| import org.apache.nifi.web.api.entity.ProcessorsEntity; |
| import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; |
| import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity; |
| import org.apache.nifi.web.api.entity.ScheduleComponentsEntity; |
| import org.apache.nifi.web.api.entity.TemplateEntity; |
| import org.apache.nifi.web.api.entity.VariableRegistryEntity; |
| import org.apache.nifi.web.api.entity.VariableRegistryUpdateRequestEntity; |
| import org.apache.nifi.web.api.request.ClientIdParameter; |
| import org.apache.nifi.web.api.request.LongParameter; |
| import org.apache.nifi.web.security.token.NiFiAuthenticationToken; |
| import org.apache.nifi.web.util.Pause; |
| import org.glassfish.jersey.media.multipart.FormDataParam; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.springframework.security.core.Authentication; |
| import org.springframework.security.core.context.SecurityContextHolder; |
| |
| /** |
| * RESTful endpoint for managing a Group. |
| */ |
| @Path("/process-groups") |
| @Api( |
| value = "/process-groups", |
| description = "Endpoint for managing a Process Group." |
| ) |
| public class ProcessGroupResource extends FlowUpdateResource<ProcessGroupImportEntity, ProcessGroupReplaceRequestEntity> { |
| |
| private static final Logger logger = LoggerFactory.getLogger(ProcessGroupResource.class); |
| |
| private ProcessorResource processorResource; |
| private InputPortResource inputPortResource; |
| private OutputPortResource outputPortResource; |
| private FunnelResource funnelResource; |
| private LabelResource labelResource; |
| private RemoteProcessGroupResource remoteProcessGroupResource; |
| private ConnectionResource connectionResource; |
| private TemplateResource templateResource; |
| private ControllerServiceResource controllerServiceResource; |
| |
| private final ConcurrentMap<String, VariableRegistryUpdateRequest> varRegistryUpdateRequests = new ConcurrentHashMap<>(); |
| private static final int MAX_VARIABLE_REGISTRY_UPDATE_REQUESTS = 100; |
| private static final long VARIABLE_REGISTRY_UPDATE_REQUEST_EXPIRATION = TimeUnit.MINUTES.toMillis(1L); |
| private final ExecutorService variableRegistryThreadPool = new ThreadPoolExecutor(1, 50, 5L, TimeUnit.SECONDS, |
| new ArrayBlockingQueue<Runnable>(MAX_VARIABLE_REGISTRY_UPDATE_REQUESTS), |
| new ThreadFactory() { |
| @Override |
| public Thread newThread(final Runnable r) { |
| final Thread thread = Executors.defaultThreadFactory().newThread(r); |
| thread.setName("Variable Registry Update Thread"); |
| thread.setDaemon(true); |
| return thread; |
| } |
| }); |
| private static final ObjectMapper MAPPER = new ObjectMapper(); |
| static { |
| MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL); |
| MAPPER.setDefaultPropertyInclusion(JsonInclude.Value.construct(JsonInclude.Include.NON_NULL, JsonInclude.Include.NON_NULL)); |
| MAPPER.setAnnotationIntrospector(new JaxbAnnotationIntrospector(MAPPER.getTypeFactory())); |
| MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); |
| } |
| |
| /** |
| * Populates the remaining fields in the specified process groups. |
| * |
| * @param processGroupEntities groups |
| * @return group dto |
| */ |
| public Set<ProcessGroupEntity> populateRemainingProcessGroupEntitiesContent(Set<ProcessGroupEntity> processGroupEntities) { |
| for (ProcessGroupEntity processGroupEntity : processGroupEntities) { |
| populateRemainingProcessGroupEntityContent(processGroupEntity); |
| } |
| return processGroupEntities; |
| } |
| |
| /** |
| * Populates the remaining fields in the specified process group. |
| * |
| * @param processGroupEntity group |
| * @return group dto |
| */ |
| public ProcessGroupEntity populateRemainingProcessGroupEntityContent(ProcessGroupEntity processGroupEntity) { |
| processGroupEntity.setUri(generateResourceUri("process-groups", processGroupEntity.getId())); |
| return processGroupEntity; |
| } |
| |
| |
| /** |
| * Populates the remaining content of the specified snippet. |
| */ |
| private FlowDTO populateRemainingSnippetContent(FlowDTO flow) { |
| processorResource.populateRemainingProcessorEntitiesContent(flow.getProcessors()); |
| connectionResource.populateRemainingConnectionEntitiesContent(flow.getConnections()); |
| inputPortResource.populateRemainingInputPortEntitiesContent(flow.getInputPorts()); |
| outputPortResource.populateRemainingOutputPortEntitiesContent(flow.getOutputPorts()); |
| remoteProcessGroupResource.populateRemainingRemoteProcessGroupEntitiesContent(flow.getRemoteProcessGroups()); |
| funnelResource.populateRemainingFunnelEntitiesContent(flow.getFunnels()); |
| labelResource.populateRemainingLabelEntitiesContent(flow.getLabels()); |
| |
| // go through each process group child and populate its uri |
| if (flow.getProcessGroups() != null) { |
| populateRemainingProcessGroupEntitiesContent(flow.getProcessGroups()); |
| } |
| |
| return flow; |
| } |
| |
| /** |
| * Retrieves the contents of the specified group. |
| * |
| * @param groupId The id of the process group. |
| * @return A processGroupEntity. |
| */ |
| @GET |
| @Consumes(MediaType.WILDCARD) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}") |
| @ApiOperation( |
| value = "Gets a process group", |
| response = ProcessGroupEntity.class, |
| authorizations = { |
| @Authorization(value = "Read - /process-groups/{uuid}") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response getProcessGroup( |
| @ApiParam( |
| value = "The process group id.", |
| required = false |
| ) |
| @PathParam("id") final String groupId) { |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.GET); |
| } |
| |
| // authorize access |
| serviceFacade.authorizeAccess(lookup -> { |
| final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); |
| processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); |
| }); |
| |
| // get this process group contents |
| final ProcessGroupEntity entity = serviceFacade.getProcessGroup(groupId); |
| populateRemainingProcessGroupEntityContent(entity); |
| |
| if (entity.getComponent() != null) { |
| entity.getComponent().setContents(null); |
| } |
| |
| return generateOkResponse(entity).build(); |
| } |
| |
| /** |
| * Retrieves the specified group as a versioned flow snapshot for download. |
| * |
| * @param groupId The id of the process group |
| * @return A processGroupEntity. |
| */ |
| @GET |
| @Consumes(MediaType.WILDCARD) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/download") |
| @ApiOperation( |
| value = "Gets a process group for download", |
| response = String.class, |
| authorizations = { |
| @Authorization(value = "Read - /process-groups/{uuid}") |
| } |
| ) |
| @ApiResponses(value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| }) |
| public Response exportProcessGroup(@ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId) { |
| // authorize access |
| serviceFacade.authorizeAccess(lookup -> { |
| // ensure access to process groups (nested), encapsulated controller services and referenced parameter contexts |
| final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId); |
| authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, |
| false, true, false, true); |
| }); |
| |
| // get the versioned flow |
| final VersionedFlowSnapshot currentVersionedFlowSnapshot = serviceFacade.getCurrentFlowSnapshotByGroupId(groupId); |
| |
| // determine the name of the attachment - possible issues with spaces in file names |
| final VersionedProcessGroup currentVersionedProcessGroup = currentVersionedFlowSnapshot.getFlowContents(); |
| final String flowName = currentVersionedProcessGroup.getName(); |
| final String filename = flowName.replaceAll("\\s", "_") + ".json"; |
| |
| return generateOkResponse(currentVersionedFlowSnapshot).header(HttpHeaders.CONTENT_DISPOSITION, String.format("attachment; filename=\"%s\"", filename)).build(); |
| } |
| |
| /** |
| * Retrieves a list of local modifications to the Process Group since it was last synchronized with the Flow Registry |
| * |
| * @param groupId The id of the process group. |
| * @return A processGroupEntity. |
| */ |
| @GET |
| @Consumes(MediaType.WILDCARD) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/local-modifications") |
| @ApiOperation( |
| value = "Gets a list of local modifications to the Process Group since it was last synchronized with the Flow Registry", |
| response = FlowComparisonEntity.class, |
| authorizations = { |
| @Authorization(value = "Read - /process-groups/{uuid}"), |
| @Authorization(value = "Read - /{component-type}/{uuid} - For all encapsulated components") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response getLocalModifications( |
| @ApiParam( |
| value = "The process group id.", |
| required = false |
| ) |
| @PathParam("id") final String groupId) throws IOException, NiFiRegistryException { |
| |
| // authorize access |
| serviceFacade.authorizeAccess(lookup -> { |
| final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId); |
| authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, false, false, true, false, false); |
| }); |
| |
| final FlowComparisonEntity entity = serviceFacade.getLocalModifications(groupId); |
| return generateOkResponse(entity).build(); |
| } |
| |
| |
| /** |
| * Retrieves the Variable Registry for the group with the given ID |
| * |
| * @param groupId the ID of the Process Group |
| * @return the Variable Registry for the group |
| */ |
| @GET |
| @Consumes(MediaType.WILDCARD) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/variable-registry") |
| @ApiOperation(value = "Gets a process group's variable registry", |
| response = VariableRegistryEntity.class, |
| notes = NON_GUARANTEED_ENDPOINT, |
| authorizations = { |
| @Authorization(value = "Read - /process-groups/{uuid}") |
| }) |
| @ApiResponses(value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| }) |
| public Response getVariableRegistry( |
| @ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId, |
| @ApiParam(value = "Whether or not to include ancestor groups", required = false) @QueryParam("includeAncestorGroups") @DefaultValue("true") final boolean includeAncestorGroups) { |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.GET); |
| } |
| |
| // authorize access |
| serviceFacade.authorizeAccess(lookup -> { |
| final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); |
| processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); |
| }); |
| |
| // get this process group's variable registry |
| final VariableRegistryEntity entity = serviceFacade.getVariableRegistry(groupId, includeAncestorGroups); |
| return generateOkResponse(entity).build(); |
| } |
| |
| /** |
| * Updates the specified process group. |
| * |
| * @param httpServletRequest request |
| * @param id The id of the process group. |
| * @param requestProcessGroupEntity A processGroupEntity. |
| * @return A processGroupEntity. |
| */ |
| @PUT |
| @Consumes(MediaType.APPLICATION_JSON) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}") |
| @ApiOperation( |
| value = "Updates a process group", |
| response = ProcessGroupEntity.class, |
| authorizations = { |
| @Authorization(value = "Write - /process-groups/{uuid}") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response updateProcessGroup( |
| @Context final HttpServletRequest httpServletRequest, |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") final String id, |
| @ApiParam( |
| value = "The process group configuration details.", |
| required = true |
| ) final ProcessGroupEntity requestProcessGroupEntity) { |
| |
| if (requestProcessGroupEntity == null || requestProcessGroupEntity.getComponent() == null) { |
| throw new IllegalArgumentException("Process group details must be specified."); |
| } |
| |
| if (requestProcessGroupEntity.getRevision() == null) { |
| throw new IllegalArgumentException("Revision must be specified."); |
| } |
| |
| // ensure the same id is being used |
| final ProcessGroupDTO requestProcessGroupDTO = requestProcessGroupEntity.getComponent(); |
| if (!id.equals(requestProcessGroupDTO.getId())) { |
| throw new IllegalArgumentException(String.format("The process group id (%s) in the request body does " |
| + "not equal the process group id of the requested resource (%s).", requestProcessGroupDTO.getId(), id)); |
| } |
| |
| final PositionDTO proposedPosition = requestProcessGroupDTO.getPosition(); |
| if (proposedPosition != null) { |
| if (proposedPosition.getX() == null || proposedPosition.getY() == null) { |
| throw new IllegalArgumentException("The x and y coordinate of the proposed position must be specified."); |
| } |
| } |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.PUT, requestProcessGroupEntity); |
| } else if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(requestProcessGroupEntity.isDisconnectedNodeAcknowledged()); |
| } |
| |
| // handle expects request (usually from the cluster manager) |
| final Revision requestRevision = getRevision(requestProcessGroupEntity, id); |
| return withWriteLock( |
| serviceFacade, |
| requestProcessGroupEntity, |
| requestRevision, |
| lookup -> { |
| final NiFiUser user = NiFiUserUtils.getNiFiUser(); |
| |
| Authorizable authorizable = lookup.getProcessGroup(id).getAuthorizable(); |
| authorizable.authorize(authorizer, RequestAction.WRITE, user); |
| |
| // Ensure that user has READ permission on current Parameter Context (if any) because user is un-binding. |
| final ParameterContextReferenceEntity referencedParamContext = requestProcessGroupDTO.getParameterContext(); |
| if (referencedParamContext != null) { |
| // Lookup the current Parameter Context and determine whether or not the Parameter Context is changing |
| final String groupId = requestProcessGroupDTO.getId(); |
| final ProcessGroupEntity currentGroupEntity = serviceFacade.getProcessGroup(groupId); |
| final ProcessGroupDTO groupDto = currentGroupEntity.getComponent(); |
| final ParameterContextReferenceEntity currentParamContext = groupDto.getParameterContext(); |
| final String currentParamContextId = currentParamContext == null ? null : currentParamContext.getId(); |
| final boolean parameterContextChanging = !Objects.equals(referencedParamContext.getId(), currentParamContextId); |
| |
| // If Parameter Context is changing... |
| if (parameterContextChanging) { |
| // In order to bind to a Parameter Context, the user must have the READ policy to that Parameter Context. |
| if (referencedParamContext.getId() != null) { |
| lookup.getParameterContext(referencedParamContext.getId()).authorize(authorizer, RequestAction.READ, user); |
| } |
| |
| // If currently referencing a Parameter Context, we must authorize that the user has READ permissions on the Parameter Context in order to un-bind to it. |
| if (currentParamContextId != null) { |
| lookup.getParameterContext(currentParamContextId).authorize(authorizer, RequestAction.READ, user); |
| } |
| |
| // Because the user will be changing the behavior of any component in this group that is currently referencing any Parameter, we must ensure that the user has |
| // both READ and WRITE policies for each of those components. |
| for (final AffectedComponentEntity affectedComponentEntity : serviceFacade.getProcessorsReferencingParameter(groupId)) { |
| final Authorizable processorAuthorizable = lookup.getProcessor(affectedComponentEntity.getId()).getAuthorizable(); |
| processorAuthorizable.authorize(authorizer, RequestAction.READ, user); |
| processorAuthorizable.authorize(authorizer, RequestAction.WRITE, user); |
| } |
| |
| for (final AffectedComponentEntity affectedComponentEntity : serviceFacade.getControllerServicesReferencingParameter(groupId)) { |
| final Authorizable serviceAuthorizable = lookup.getControllerService(affectedComponentEntity.getId()).getAuthorizable(); |
| serviceAuthorizable.authorize(authorizer, RequestAction.READ, user); |
| serviceAuthorizable.authorize(authorizer, RequestAction.WRITE, user); |
| } |
| } |
| } |
| }, |
| () -> serviceFacade.verifyUpdateProcessGroup(requestProcessGroupDTO), |
| (revision, processGroupEntity) -> { |
| // update the process group |
| final ProcessGroupEntity entity = serviceFacade.updateProcessGroup(revision, processGroupEntity.getComponent()); |
| populateRemainingProcessGroupEntityContent(entity); |
| |
| // prune response as necessary |
| if (entity.getComponent() != null) { |
| entity.getComponent().setContents(null); |
| } |
| |
| return generateOkResponse(entity).build(); |
| } |
| ); |
| } |
| |
| |
| @GET |
| @Consumes(MediaType.WILDCARD) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{groupId}/variable-registry/update-requests/{updateId}") |
| @ApiOperation(value = "Gets a process group's variable registry", |
| response = VariableRegistryUpdateRequestEntity.class, |
| notes = NON_GUARANTEED_ENDPOINT, |
| authorizations = { |
| @Authorization(value = "Read - /process-groups/{uuid}") |
| }) |
| @ApiResponses(value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| }) |
| public Response getVariableRegistryUpdateRequest( |
| @ApiParam(value = "The process group id.", required = true) @PathParam("groupId") final String groupId, |
| @ApiParam(value = "The ID of the Variable Registry Update Request", required = true) @PathParam("updateId") final String updateId) { |
| |
| if (groupId == null || updateId == null) { |
| throw new IllegalArgumentException("Group ID and Update ID must both be specified."); |
| } |
| |
| final NiFiUser user = NiFiUserUtils.getNiFiUser(); |
| |
| // authorize access |
| serviceFacade.authorizeAccess(lookup -> { |
| final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); |
| processGroup.authorize(authorizer, RequestAction.READ, user); |
| }); |
| |
| final VariableRegistryUpdateRequest request = varRegistryUpdateRequests.get(updateId); |
| if (request == null) { |
| throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId); |
| } |
| |
| if (!groupId.equals(request.getProcessGroupId())) { |
| throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId + " for Process Group with identifier " + groupId); |
| } |
| |
| if (!user.equals(request.getUser())) { |
| throw new IllegalArgumentException("Only the user that submitted the update request can retrieve it."); |
| } |
| |
| final VariableRegistryUpdateRequestEntity entity = new VariableRegistryUpdateRequestEntity(); |
| entity.setRequest(dtoFactory.createVariableRegistryUpdateRequestDto(request)); |
| entity.setProcessGroupRevision(request.getProcessGroupRevision()); |
| entity.getRequest().setUri(generateResourceUri("process-groups", groupId, "variable-registry", updateId)); |
| return generateOkResponse(entity).build(); |
| } |
| |
| |
| @DELETE |
| @Consumes(MediaType.WILDCARD) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{groupId}/variable-registry/update-requests/{updateId}") |
| @ApiOperation(value = "Deletes an update request for a process group's variable registry. If the request is not yet complete, it will automatically be cancelled.", |
| response = VariableRegistryUpdateRequestEntity.class, |
| notes = NON_GUARANTEED_ENDPOINT, |
| authorizations = { |
| @Authorization(value = "Read - /process-groups/{uuid}") |
| }) |
| @ApiResponses(value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| }) |
| public Response deleteVariableRegistryUpdateRequest( |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("groupId") final String groupId, |
| @ApiParam( |
| value = "The ID of the Variable Registry Update Request", |
| required = true) |
| @PathParam("updateId") final String updateId, |
| @ApiParam( |
| value = "Acknowledges that this node is disconnected to allow for mutable requests to proceed.", |
| required = false |
| ) |
| @QueryParam(DISCONNECTED_NODE_ACKNOWLEDGED) @DefaultValue("false") final Boolean disconnectedNodeAcknowledged) { |
| |
| if (groupId == null || updateId == null) { |
| throw new IllegalArgumentException("Group ID and Update ID must both be specified."); |
| } |
| |
| if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(disconnectedNodeAcknowledged); |
| } |
| |
| final NiFiUser user = NiFiUserUtils.getNiFiUser(); |
| |
| // authorize access |
| serviceFacade.authorizeAccess(lookup -> { |
| final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); |
| processGroup.authorize(authorizer, RequestAction.READ, user); |
| processGroup.authorize(authorizer, RequestAction.WRITE, user); |
| }); |
| |
| final VariableRegistryUpdateRequest request = varRegistryUpdateRequests.remove(updateId); |
| if (request == null) { |
| throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId); |
| } |
| |
| if (!groupId.equals(request.getProcessGroupId())) { |
| throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId + " for Process Group with identifier " + groupId); |
| } |
| |
| if (!user.equals(request.getUser())) { |
| throw new IllegalArgumentException("Only the user that submitted the update request can remove it."); |
| } |
| |
| request.cancel(); |
| |
| final VariableRegistryUpdateRequestEntity entity = new VariableRegistryUpdateRequestEntity(); |
| entity.setRequest(dtoFactory.createVariableRegistryUpdateRequestDto(request)); |
| entity.setProcessGroupRevision(request.getProcessGroupRevision()); |
| entity.getRequest().setUri(generateResourceUri("process-groups", groupId, "variable-registry", updateId)); |
| return generateOkResponse(entity).build(); |
| } |
| |
| |
| @PUT |
| @Consumes(MediaType.APPLICATION_JSON) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/variable-registry") |
| @ApiOperation(value = "Updates the contents of a Process Group's variable Registry", response = VariableRegistryEntity.class, notes = NON_GUARANTEED_ENDPOINT, authorizations = { |
| @Authorization(value = "Write - /process-groups/{uuid}") |
| }) |
| @ApiResponses(value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| }) |
| public Response updateVariableRegistry( |
| @Context final HttpServletRequest httpServletRequest, |
| @ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId, |
| @ApiParam(value = "The variable registry configuration details.", required = true) final VariableRegistryEntity requestVariableRegistryEntity) { |
| |
| if (requestVariableRegistryEntity == null || requestVariableRegistryEntity.getVariableRegistry() == null) { |
| throw new IllegalArgumentException("Variable Registry details must be specified."); |
| } |
| |
| if (requestVariableRegistryEntity.getProcessGroupRevision() == null) { |
| throw new IllegalArgumentException("Process Group Revision must be specified."); |
| } |
| |
| // ensure the same id is being used |
| final VariableRegistryDTO requestRegistryDto = requestVariableRegistryEntity.getVariableRegistry(); |
| if (!groupId.equals(requestRegistryDto.getProcessGroupId())) { |
| throw new IllegalArgumentException(String.format("The process group id (%s) in the request body does " |
| + "not equal the process group id of the requested resource (%s).", requestRegistryDto.getProcessGroupId(), groupId)); |
| } |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.PUT, requestVariableRegistryEntity); |
| } else if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(requestVariableRegistryEntity.isDisconnectedNodeAcknowledged()); |
| } |
| |
| // handle expects request (usually from the cluster manager) |
| final Revision requestRevision = getRevision(requestVariableRegistryEntity.getProcessGroupRevision(), groupId); |
| return withWriteLock( |
| serviceFacade, |
| requestVariableRegistryEntity, |
| requestRevision, |
| lookup -> { |
| Authorizable authorizable = lookup.getProcessGroup(groupId).getAuthorizable(); |
| authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); |
| }, |
| null, |
| (revision, variableRegistryEntity) -> { |
| final VariableRegistryDTO variableRegistry = variableRegistryEntity.getVariableRegistry(); |
| |
| // update the process group |
| final VariableRegistryEntity entity = serviceFacade.updateVariableRegistry(revision, variableRegistry); |
| return generateOkResponse(entity).build(); |
| }); |
| } |
| |
| |
| /** |
| * Updates the variable registry for the specified process group. |
| * |
| * @param httpServletRequest request |
| * @param groupId The id of the process group. |
| * @param requestVariableRegistryEntity the Variable Registry Entity |
| * @return A Variable Registry Entry. |
| */ |
| @POST |
| @Consumes(MediaType.APPLICATION_JSON) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/variable-registry/update-requests") |
| @ApiOperation(value = "Submits a request to update a process group's variable registry", |
| response = VariableRegistryUpdateRequestEntity.class, |
| notes = NON_GUARANTEED_ENDPOINT, |
| authorizations = { |
| @Authorization(value = "Write - /process-groups/{uuid}") |
| }) |
| @ApiResponses(value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| }) |
| public Response submitUpdateVariableRegistryRequest( |
| @Context final HttpServletRequest httpServletRequest, |
| @ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId, |
| @ApiParam(value = "The variable registry configuration details.", required = true) final VariableRegistryEntity requestVariableRegistryEntity) { |
| |
| if (requestVariableRegistryEntity == null || requestVariableRegistryEntity.getVariableRegistry() == null) { |
| throw new IllegalArgumentException("Variable Registry details must be specified."); |
| } |
| |
| if (requestVariableRegistryEntity.getProcessGroupRevision() == null) { |
| throw new IllegalArgumentException("Process Group Revision must be specified."); |
| } |
| |
| if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(requestVariableRegistryEntity.isDisconnectedNodeAcknowledged()); |
| } |
| |
| // In order to update variables in a variable registry, we have to perform the following steps: |
| // 1. Determine Affected Components (this includes any Processors and Controller Services and any components that reference an affected Controller Service). |
| // 1a. Determine ID's of components |
| // 1b. Determine Revision's of associated components |
| // 2. Stop All Active Affected Processors |
| // 3. Disable All Active Affected Controller Services |
| // 4. Update the Variables |
| // 5. Re-Enable all previously Active Affected Controller Services (services only, not dependent components) |
| // 6. Re-Enable all previously Active Processors that Depended on the Controller Services |
| |
| // Determine the affected components (and their associated revisions) |
| final VariableRegistryEntity computedEntity = serviceFacade.populateAffectedComponents(requestVariableRegistryEntity.getVariableRegistry()); |
| final VariableRegistryDTO computedRegistryDto = computedEntity.getVariableRegistry(); |
| if (computedRegistryDto == null) { |
| throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId)); |
| } |
| |
| final Set<AffectedComponentEntity> allAffectedComponents = serviceFacade.getComponentsAffectedByVariableRegistryUpdate(requestVariableRegistryEntity.getVariableRegistry()); |
| final Set<AffectedComponentDTO> activeAffectedComponents = serviceFacade.getActiveComponentsAffectedByVariableRegistryUpdate(requestVariableRegistryEntity.getVariableRegistry()); |
| |
| final Map<String, List<AffectedComponentDTO>> activeAffectedComponentsByType = activeAffectedComponents.stream() |
| .collect(Collectors.groupingBy(comp -> comp.getReferenceType())); |
| |
| final List<AffectedComponentDTO> activeAffectedProcessors = activeAffectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR); |
| final List<AffectedComponentDTO> activeAffectedServices = activeAffectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE); |
| |
| final NiFiUser user = NiFiUserUtils.getNiFiUser(); |
| |
| // define access authorize for execution below |
| final AuthorizeAccess authorizeAccess = lookup -> { |
| final Authorizable groupAuthorizable = lookup.getProcessGroup(groupId).getAuthorizable(); |
| groupAuthorizable.authorize(authorizer, RequestAction.WRITE, user); |
| |
| // For every component that is affected, the user must have READ permissions and WRITE permissions |
| // (because this action requires stopping the component). |
| if (activeAffectedProcessors != null) { |
| for (final AffectedComponentDTO activeAffectedComponent : activeAffectedProcessors) { |
| final Authorizable authorizable = lookup.getProcessor(activeAffectedComponent.getId()).getAuthorizable(); |
| authorizable.authorize(authorizer, RequestAction.READ, user); |
| authorizable.authorize(authorizer, RequestAction.WRITE, user); |
| } |
| } |
| |
| if (activeAffectedServices != null) { |
| for (final AffectedComponentDTO activeAffectedComponent : activeAffectedServices) { |
| final Authorizable authorizable = lookup.getControllerService(activeAffectedComponent.getId()).getAuthorizable(); |
| authorizable.authorize(authorizer, RequestAction.READ, user); |
| authorizable.authorize(authorizer, RequestAction.WRITE, user); |
| } |
| } |
| }; |
| |
| if (isReplicateRequest()) { |
| // authorize access |
| serviceFacade.authorizeAccess(authorizeAccess); |
| |
| // update the variable registry |
| final VariableRegistryUpdateRequest updateRequest = createVariableRegistryUpdateRequest(groupId, allAffectedComponents, user); |
| updateRequest.getIdentifyRelevantComponentsStep().setComplete(true); |
| final URI originalUri = getAbsolutePath(); |
| |
| // Submit the task to be run in the background |
| final Runnable taskWrapper = () -> { |
| try { |
| // set the user authentication token |
| final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(user)); |
| SecurityContextHolder.getContext().setAuthentication(authentication); |
| |
| updateVariableRegistryReplicated(groupId, originalUri, activeAffectedProcessors, activeAffectedServices, updateRequest, requestVariableRegistryEntity); |
| |
| // ensure the request is marked complete |
| updateRequest.setComplete(true); |
| } catch (final Exception e) { |
| logger.error("Failed to update variable registry", e); |
| |
| updateRequest.setComplete(true); |
| updateRequest.setFailureReason("An unexpected error has occurred: " + e); |
| } finally { |
| // clear the authentication token |
| SecurityContextHolder.getContext().setAuthentication(null); |
| } |
| }; |
| |
| variableRegistryThreadPool.submit(taskWrapper); |
| |
| final VariableRegistryUpdateRequestEntity responseEntity = new VariableRegistryUpdateRequestEntity(); |
| responseEntity.setRequest(dtoFactory.createVariableRegistryUpdateRequestDto(updateRequest)); |
| responseEntity.setProcessGroupRevision(updateRequest.getProcessGroupRevision()); |
| responseEntity.getRequest().setUri(generateResourceUri("process-groups", groupId, "variable-registry", "update-requests", updateRequest.getRequestId())); |
| |
| final URI location = URI.create(responseEntity.getRequest().getUri()); |
| return Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build(); |
| } |
| |
| final UpdateVariableRegistryRequestWrapper requestWrapper = |
| new UpdateVariableRegistryRequestWrapper(allAffectedComponents, activeAffectedProcessors, activeAffectedServices, requestVariableRegistryEntity); |
| |
| final Revision requestRevision = getRevision(requestVariableRegistryEntity.getProcessGroupRevision(), groupId); |
| return withWriteLock( |
| serviceFacade, |
| requestWrapper, |
| requestRevision, |
| authorizeAccess, |
| null, |
| (revision, wrapper) -> |
| updateVariableRegistryLocal(groupId, wrapper.getAllAffectedComponents(), wrapper.getActiveAffectedProcessors(), |
| wrapper.getActiveAffectedServices(), user, revision, wrapper.getVariableRegistryEntity()) |
| ); |
| } |
| |
| private Pause createPause(final VariableRegistryUpdateRequest updateRequest) { |
| return new Pause() { |
| @Override |
| public boolean pause() { |
| if (updateRequest.isComplete()) { |
| return false; |
| } |
| |
| try { |
| Thread.sleep(500); |
| } catch (final InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| return false; |
| } |
| |
| return !updateRequest.isComplete(); |
| } |
| }; |
| } |
| |
| private void updateVariableRegistryReplicated(final String groupId, final URI originalUri, final Collection<AffectedComponentDTO> affectedProcessors, |
| final Collection<AffectedComponentDTO> affectedServices, final VariableRegistryUpdateRequest updateRequest, |
| final VariableRegistryEntity requestEntity) throws InterruptedException, IOException { |
| final Pause pause = createPause(updateRequest); |
| |
| // stop processors |
| if (affectedProcessors != null) { |
| logger.info("In order to update Variable Registry for Process Group with ID {}, replicating request to stop {} affected Processors", groupId, affectedProcessors.size()); |
| scheduleProcessors(groupId, originalUri, updateRequest, pause, affectedProcessors, ScheduledState.STOPPED, updateRequest.getStopProcessorsStep()); |
| } else { |
| logger.info("In order to update Variable Registry for Process Group with ID {}, no Processors are affected.", groupId); |
| updateRequest.getStopProcessorsStep().setComplete(true); |
| } |
| |
| // disable controller services |
| if (affectedServices != null) { |
| logger.info("In order to update Variable Registry for Process Group with ID {}, replicating request to stop {} affected Controller Services", groupId, affectedServices.size()); |
| activateControllerServices(groupId, originalUri, updateRequest, pause, affectedServices, ControllerServiceState.DISABLED, updateRequest.getDisableServicesStep()); |
| } else { |
| logger.info("In order to update Variable Registry for Process Group with ID {}, no Controller Services are affected.", groupId); |
| updateRequest.getDisableServicesStep().setComplete(true); |
| } |
| |
| // apply updates |
| logger.info("In order to update Variable Registry for Process Group with ID {}, replicating request to apply updates to variable registry", groupId); |
| applyVariableRegistryUpdate(groupId, originalUri, updateRequest, requestEntity); |
| |
| // re-enable controller services |
| if (affectedServices != null) { |
| logger.info("In order to update Variable Registry for Process Group with ID {}, replicating request to re-enable {} affected services", groupId, affectedServices.size()); |
| activateControllerServices(groupId, originalUri, updateRequest, pause, affectedServices, ControllerServiceState.ENABLED, updateRequest.getEnableServicesStep()); |
| } else { |
| logger.info("In order to update Variable Registry for Process Group with ID {}, no Controller Services are affected.", groupId); |
| updateRequest.getEnableServicesStep().setComplete(true); |
| } |
| |
| // restart processors |
| if (affectedProcessors != null) { |
| logger.info("In order to update Variable Registry for Process Group with ID {}, replicating request to restart {} affected processors", groupId, affectedProcessors.size()); |
| scheduleProcessors(groupId, originalUri, updateRequest, pause, affectedProcessors, ScheduledState.RUNNING, updateRequest.getStartProcessorsStep()); |
| } else { |
| logger.info("In order to update Variable Registry for Process Group with ID {}, no Processors are affected.", groupId); |
| updateRequest.getStartProcessorsStep().setComplete(true); |
| } |
| } |
| |
| /** |
| * Periodically polls the process group with the given ID, waiting for all processors whose ID's are given to have the given Scheduled State. |
| * |
| * @param groupId the ID of the Process Group to poll |
| * @param processorIds the ID of all Processors whose state should be equal to the given desired state |
| * @param desiredState the desired state for all processors with the ID's given |
| * @param pause the Pause that can be used to wait between polling |
| * @return <code>true</code> if successful, <code>false</code> if unable to wait for processors to reach the desired state |
| */ |
| private boolean waitForProcessorStatus(final URI originalUri, final String groupId, final Set<String> processorIds, final ScheduledState desiredState, |
| final VariableRegistryUpdateRequest updateRequest, final Pause pause) throws InterruptedException { |
| URI groupUri; |
| try { |
| groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), |
| originalUri.getPort(), "/nifi-api/process-groups/" + groupId + "/processors", "includeDescendantGroups=true", originalUri.getFragment()); |
| } catch (URISyntaxException e) { |
| throw new RuntimeException(e); |
| } |
| |
| final Map<String, String> headers = new HashMap<>(); |
| final MultivaluedMap<String, String> requestEntity = new MultivaluedHashMap<>(); |
| |
| boolean continuePolling = true; |
| while (continuePolling) { |
| |
| // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves. |
| final NodeResponse clusterResponse; |
| if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { |
| clusterResponse = getRequestReplicator().replicate(HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse(); |
| } else { |
| clusterResponse = getRequestReplicator().forwardToCoordinator( |
| getClusterCoordinatorNode(), HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse(); |
| } |
| |
| if (clusterResponse.getStatus() != Status.OK.getStatusCode()) { |
| return false; |
| } |
| |
| final ProcessorsEntity processorsEntity = getResponseEntity(clusterResponse, ProcessorsEntity.class); |
| final Set<ProcessorEntity> processorEntities = processorsEntity.getProcessors(); |
| |
| if (isProcessorActionComplete(processorEntities, updateRequest, processorIds, desiredState)) { |
| logger.debug("All {} processors of interest now have the desired state of {}", processorIds.size(), desiredState); |
| return true; |
| } |
| |
| // Not all of the processors are in the desired state. Pause for a bit and poll again. |
| continuePolling = pause.pause(); |
| } |
| |
| return false; |
| } |
| |
| /** |
| * Periodically polls the process group with the given ID, waiting for all processors whose ID's are given to have the given Scheduled State. |
| * |
| * @param groupId the ID of the Process Group to poll |
| * @param processorIds the ID of all Processors whose state should be equal to the given desired state |
| * @param desiredState the desired state for all processors with the ID's given |
| * @param pause the Pause that can be used to wait between polling |
| * @return <code>true</code> if successful, <code>false</code> if unable to wait for processors to reach the desired state |
| */ |
| private boolean waitForLocalProcessor(final String groupId, final Set<String> processorIds, final ScheduledState desiredState, |
| final VariableRegistryUpdateRequest updateRequest, final Pause pause) { |
| |
| boolean continuePolling = true; |
| while (continuePolling) { |
| final Set<ProcessorEntity> processorEntities = serviceFacade.getProcessors(groupId, true); |
| |
| if (isProcessorActionComplete(processorEntities, updateRequest, processorIds, desiredState)) { |
| logger.debug("All {} processors of interest now have the desired state of {}", processorIds.size(), desiredState); |
| return true; |
| } |
| |
| // Not all of the processors are in the desired state. Pause for a bit and poll again. |
| continuePolling = pause.pause(); |
| } |
| |
| return false; |
| } |
| |
| private boolean isProcessorActionComplete(final Set<ProcessorEntity> processorEntities, final VariableRegistryUpdateRequest updateRequest, |
| final Set<String> processorIds, final ScheduledState desiredState) { |
| |
| final String desiredStateName = desiredState.name(); |
| |
| // update the affected processors |
| processorEntities.stream() |
| .filter(entity -> updateRequest.getAffectedComponents().containsKey(entity.getId())) |
| .forEach(entity -> { |
| final AffectedComponentEntity affectedComponentEntity = updateRequest.getAffectedComponents().get(entity.getId()); |
| affectedComponentEntity.setRevision(entity.getRevision()); |
| |
| // only consider update this component if the user had permissions to it |
| if (Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) { |
| final AffectedComponentDTO affectedComponent = affectedComponentEntity.getComponent(); |
| affectedComponent.setState(entity.getStatus().getAggregateSnapshot().getRunStatus()); |
| affectedComponent.setActiveThreadCount(entity.getStatus().getAggregateSnapshot().getActiveThreadCount()); |
| |
| if (Boolean.TRUE.equals(entity.getPermissions().getCanRead())) { |
| affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors()); |
| } |
| } |
| }); |
| |
| final boolean allProcessorsMatch = processorEntities.stream() |
| .filter(entity -> processorIds.contains(entity.getId())) |
| .allMatch(entity -> { |
| final ProcessorStatusDTO status = entity.getStatus(); |
| |
| final String runStatus = status.getAggregateSnapshot().getRunStatus(); |
| final boolean stateMatches = desiredStateName.equalsIgnoreCase(runStatus); |
| if (!stateMatches) { |
| return false; |
| } |
| |
| return desiredState != ScheduledState.STOPPED || status.getAggregateSnapshot().getActiveThreadCount() == 0; |
| }); |
| |
| return allProcessorsMatch; |
| } |
| |
| /** |
| * Updates the affected controller services in the specified updateRequest with the serviceEntities. |
| * |
| * @param serviceEntities service entities |
| * @param updateRequest update request |
| */ |
| private void updateAffectedControllerServices(final Set<ControllerServiceEntity> serviceEntities, final VariableRegistryUpdateRequest updateRequest) { |
| // update the affected components |
| serviceEntities.stream() |
| .filter(entity -> updateRequest.getAffectedComponents().containsKey(entity.getId())) |
| .forEach(entity -> { |
| final AffectedComponentEntity affectedComponentEntity = updateRequest.getAffectedComponents().get(entity.getId()); |
| affectedComponentEntity.setRevision(entity.getRevision()); |
| |
| // only consider update this component if the user had permissions to it |
| if (Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) { |
| final AffectedComponentDTO affectedComponent = affectedComponentEntity.getComponent(); |
| affectedComponent.setState(entity.getComponent().getState()); |
| |
| if (Boolean.TRUE.equals(entity.getPermissions().getCanRead())) { |
| affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors()); |
| } |
| } |
| }); |
| } |
| |
| /** |
| * Periodically polls the process group with the given ID, waiting for all controller services whose ID's are given to have the given Controller Service State. |
| * |
| * @param groupId the ID of the Process Group to poll |
| * @param serviceIds the ID of all Controller Services whose state should be equal to the given desired state |
| * @param desiredState the desired state for all services with the ID's given |
| * @param pause the Pause that can be used to wait between polling |
| * @return <code>true</code> if successful, <code>false</code> if unable to wait for services to reach the desired state |
| */ |
| private boolean waitForControllerServiceStatus(final URI originalUri, final String groupId, final Set<String> serviceIds, |
| final ControllerServiceState desiredState, final VariableRegistryUpdateRequest updateRequest, final Pause pause) throws InterruptedException { |
| |
| URI groupUri; |
| try { |
| groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), |
| originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services", "includeAncestorGroups=false&includeDescendantGroups=true", originalUri.getFragment()); |
| } catch (URISyntaxException e) { |
| throw new RuntimeException(e); |
| } |
| |
| final Map<String, String> headers = new HashMap<>(); |
| final MultivaluedMap<String, String> requestEntity = new MultivaluedHashMap<>(); |
| |
| boolean continuePolling = true; |
| while (continuePolling) { |
| |
| // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves. |
| final NodeResponse clusterResponse; |
| if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { |
| clusterResponse = getRequestReplicator().replicate(HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse(); |
| } else { |
| clusterResponse = getRequestReplicator().forwardToCoordinator( |
| getClusterCoordinatorNode(), HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse(); |
| } |
| |
| if (clusterResponse.getStatus() != Status.OK.getStatusCode()) { |
| return false; |
| } |
| |
| final ControllerServicesEntity controllerServicesEntity = getResponseEntity(clusterResponse, ControllerServicesEntity.class); |
| final Set<ControllerServiceEntity> serviceEntities = controllerServicesEntity.getControllerServices(); |
| |
| // update the affected controller services |
| updateAffectedControllerServices(serviceEntities, updateRequest); |
| |
| final String desiredStateName = desiredState.name(); |
| final boolean allServicesMatch = serviceEntities.stream() |
| .map(entity -> entity.getComponent()) |
| .filter(service -> serviceIds.contains(service.getId())) |
| .map(service -> service.getState()) |
| .allMatch(state -> state.equals(desiredStateName)); |
| |
| if (allServicesMatch) { |
| logger.debug("All {} controller services of interest now have the desired state of {}", serviceIds.size(), desiredState); |
| return true; |
| } |
| |
| // Not all of the processors are in the desired state. Pause for a bit and poll again. |
| continuePolling = pause.pause(); |
| } |
| |
| return false; |
| } |
| |
| |
| /** |
| * Periodically polls the process group with the given ID, waiting for all controller services whose ID's are given to have the given Controller Service State. |
| * |
| * @param groupId the ID of the Process Group to poll |
| * @param serviceIds the ID of all Controller Services whose state should be equal to the given desired state |
| * @param desiredState the desired state for all services with the ID's given |
| * @param pause the Pause that can be used to wait between polling |
| * @return <code>true</code> if successful, <code>false</code> if unable to wait for services to reach the desired state |
| */ |
| private boolean waitForLocalControllerServiceStatus(final String groupId, final Set<String> serviceIds, final ControllerServiceState desiredState, |
| final VariableRegistryUpdateRequest updateRequest, final Pause pause) { |
| |
| boolean continuePolling = true; |
| while (continuePolling) { |
| final Set<ControllerServiceEntity> serviceEntities = serviceFacade.getControllerServices(groupId, false, true); |
| |
| // update the affected controller services |
| updateAffectedControllerServices(serviceEntities, updateRequest); |
| |
| final String desiredStateName = desiredState.name(); |
| final boolean allServicesMatch = serviceEntities.stream() |
| .map(entity -> entity.getComponent()) |
| .filter(service -> serviceIds.contains(service.getId())) |
| .map(service -> service.getState()) |
| .allMatch(state -> desiredStateName.equals(state)); |
| |
| |
| if (allServicesMatch) { |
| logger.debug("All {} controller services of interest now have the desired state of {}", serviceIds.size(), desiredState); |
| return true; |
| } |
| |
| // Not all of the processors are in the desired state. Pause for a bit and poll again. |
| continuePolling = pause.pause(); |
| } |
| |
| return false; |
| } |
| |
| private VariableRegistryUpdateRequest createVariableRegistryUpdateRequest(final String groupId, final Set<AffectedComponentEntity> affectedComponents, final NiFiUser user) { |
| final VariableRegistryUpdateRequest updateRequest = new VariableRegistryUpdateRequest(UUID.randomUUID().toString(), groupId, affectedComponents, user); |
| |
| // before adding to the request map, purge any old requests. Must do this by creating a List of ID's |
| // and then removing those ID's one-at-a-time in order to avoid ConcurrentModificationException. |
| final Date oneMinuteAgo = new Date(System.currentTimeMillis() - VARIABLE_REGISTRY_UPDATE_REQUEST_EXPIRATION); |
| final List<String> completedRequestIds = varRegistryUpdateRequests.entrySet().stream() |
| .filter(entry -> entry.getValue().isComplete()) |
| .filter(entry -> entry.getValue().getLastUpdated().before(oneMinuteAgo)) |
| .map(Map.Entry::getKey) |
| .collect(Collectors.toList()); |
| |
| completedRequestIds.forEach(varRegistryUpdateRequests::remove); |
| |
| final int requestCount = varRegistryUpdateRequests.size(); |
| if (requestCount > MAX_VARIABLE_REGISTRY_UPDATE_REQUESTS) { |
| throw new IllegalStateException("There are already " + requestCount + " update requests for variable registries. " |
| + "Cannot issue any more requests until the older ones are deleted or expire"); |
| } |
| |
| this.varRegistryUpdateRequests.put(updateRequest.getRequestId(), updateRequest); |
| return updateRequest; |
| } |
| |
| private Response updateVariableRegistryLocal(final String groupId, final Set<AffectedComponentEntity> affectedComponents, final List<AffectedComponentDTO> affectedProcessors, |
| final List<AffectedComponentDTO> affectedServices, final NiFiUser user, final Revision requestRevision, final VariableRegistryEntity requestEntity) { |
| |
| final Set<String> affectedProcessorIds = affectedProcessors == null ? Collections.emptySet() : affectedProcessors.stream() |
| .map(component -> component.getId()) |
| .collect(Collectors.toSet()); |
| Map<String, Revision> processorRevisionMap = getRevisions(groupId, affectedProcessorIds); |
| |
| final Set<String> affectedServiceIds = affectedServices == null ? Collections.emptySet() : affectedServices.stream() |
| .map(component -> component.getId()) |
| .collect(Collectors.toSet()); |
| Map<String, Revision> serviceRevisionMap = getRevisions(groupId, affectedServiceIds); |
| |
| // update the variable registry |
| final VariableRegistryUpdateRequest updateRequest = createVariableRegistryUpdateRequest(groupId, affectedComponents, user); |
| updateRequest.getIdentifyRelevantComponentsStep().setComplete(true); |
| final Pause pause = createPause(updateRequest); |
| |
| final Runnable updateTask = new Runnable() { |
| @Override |
| public void run() { |
| try { |
| // set the user authentication token |
| final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(user)); |
| SecurityContextHolder.getContext().setAuthentication(authentication); |
| |
| // Stop processors |
| performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getStopProcessorsStep(), "Stopping Processors", |
| () -> stopProcessors(updateRequest, groupId, processorRevisionMap, pause)); |
| |
| // Update revision map because this will have modified the revisions of our components. |
| final Map<String, Revision> updatedProcessorRevisionMap = getRevisions(groupId, affectedProcessorIds); |
| |
| // Disable controller services |
| performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getDisableServicesStep(), "Disabling Controller Services", |
| () -> disableControllerServices(updateRequest, groupId, serviceRevisionMap, pause)); |
| |
| // Update revision map because this will have modified the revisions of our components. |
| final Map<String, Revision> updatedServiceRevisionMap = getRevisions(groupId, affectedServiceIds); |
| |
| // Apply the updates |
| performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getApplyUpdatesStep(), "Applying updates to Variable Registry", |
| () -> { |
| final VariableRegistryEntity entity = serviceFacade.updateVariableRegistry(requestRevision, requestEntity.getVariableRegistry()); |
| updateRequest.setProcessGroupRevision(entity.getProcessGroupRevision()); |
| }); |
| |
| // Re-enable the controller services |
| performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getEnableServicesStep(), "Re-enabling Controller Services", |
| () -> enableControllerServices(updateRequest, groupId, updatedServiceRevisionMap, pause)); |
| |
| // Restart processors |
| performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getStartProcessorsStep(), "Restarting Processors", |
| () -> startProcessors(updateRequest, groupId, updatedProcessorRevisionMap, pause)); |
| |
| // Set complete |
| updateRequest.setComplete(true); |
| updateRequest.setLastUpdated(new Date()); |
| } catch (final Exception e) { |
| logger.error("Failed to update Variable Registry for Proces Group with ID " + groupId, e); |
| |
| updateRequest.setComplete(true); |
| updateRequest.setFailureReason("An unexpected error has occurred: " + e); |
| } finally { |
| // clear the authentication token |
| SecurityContextHolder.getContext().setAuthentication(null); |
| } |
| } |
| }; |
| |
| // Submit the task to be run in the background |
| variableRegistryThreadPool.submit(updateTask); |
| |
| final VariableRegistryUpdateRequestEntity responseEntity = new VariableRegistryUpdateRequestEntity(); |
| responseEntity.setRequest(dtoFactory.createVariableRegistryUpdateRequestDto(updateRequest)); |
| responseEntity.setProcessGroupRevision(updateRequest.getProcessGroupRevision()); |
| responseEntity.getRequest().setUri(generateResourceUri("process-groups", groupId, "variable-registry", "update-requests", updateRequest.getRequestId())); |
| |
| final URI location = URI.create(responseEntity.getRequest().getUri()); |
| return Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build(); |
| } |
| |
| private Map<String, Revision> getRevisions(final String groupId, final Set<String> componentIds) { |
| final Set<Revision> processorRevisions = serviceFacade.getRevisionsFromGroup(groupId, group -> componentIds); |
| return processorRevisions.stream().collect(Collectors.toMap(revision -> revision.getComponentId(), Function.identity())); |
| } |
| |
| private void performUpdateVariableRegistryStep(final String groupId, final VariableRegistryUpdateRequest request, final VariableRegistryUpdateStep step, |
| final String stepDescription, final Runnable action) { |
| |
| if (request.isComplete()) { |
| logger.info("In updating Variable Registry for Process Group with ID {}" |
| + ", skipping the following step because the request has completed already: {}", groupId, stepDescription); |
| return; |
| } |
| |
| try { |
| logger.info("In order to update Variable Registry for Process Group with ID {}, {}", groupId, stepDescription); |
| |
| action.run(); |
| step.setComplete(true); |
| } catch (final Exception e) { |
| logger.error("Failed to update variable registry for Process Group with ID {}", groupId, e); |
| |
| step.setComplete(true); |
| step.setFailureReason(e.getMessage()); |
| |
| request.setComplete(true); |
| request.setFailureReason("Failed to update Variable Registry because failed while performing step: " + stepDescription); |
| } |
| |
| request.setLastUpdated(new Date()); |
| } |
| |
| private void stopProcessors(final VariableRegistryUpdateRequest updateRequest, final String processGroupId, |
| final Map<String, Revision> processorRevisions, final Pause pause) { |
| |
| if (processorRevisions.isEmpty()) { |
| return; |
| } |
| |
| serviceFacade.verifyScheduleComponents(processGroupId, ScheduledState.STOPPED, processorRevisions.keySet()); |
| serviceFacade.scheduleComponents(processGroupId, ScheduledState.STOPPED, processorRevisions); |
| waitForLocalProcessor(processGroupId, processorRevisions.keySet(), ScheduledState.STOPPED, updateRequest, pause); |
| } |
| |
| private void startProcessors(final VariableRegistryUpdateRequest request, final String processGroupId, final Map<String, Revision> processorRevisions, final Pause pause) { |
| if (processorRevisions.isEmpty()) { |
| return; |
| } |
| |
| serviceFacade.verifyScheduleComponents(processGroupId, ScheduledState.RUNNING, processorRevisions.keySet()); |
| serviceFacade.scheduleComponents(processGroupId, ScheduledState.RUNNING, processorRevisions); |
| waitForLocalProcessor(processGroupId, processorRevisions.keySet(), ScheduledState.RUNNING, request, pause); |
| } |
| |
| private void disableControllerServices(final VariableRegistryUpdateRequest updateRequest, final String processGroupId, |
| final Map<String, Revision> serviceRevisions, final Pause pause) { |
| |
| if (serviceRevisions.isEmpty()) { |
| return; |
| } |
| |
| serviceFacade.verifyActivateControllerServices(processGroupId, ControllerServiceState.DISABLED, serviceRevisions.keySet()); |
| serviceFacade.activateControllerServices(processGroupId, ControllerServiceState.DISABLED, serviceRevisions); |
| waitForLocalControllerServiceStatus(processGroupId, serviceRevisions.keySet(), ControllerServiceState.DISABLED, updateRequest, pause); |
| } |
| |
| private void enableControllerServices(final VariableRegistryUpdateRequest updateRequest, final String processGroupId, |
| final Map<String, Revision> serviceRevisions, final Pause pause) { |
| |
| if (serviceRevisions.isEmpty()) { |
| return; |
| } |
| |
| serviceFacade.verifyActivateControllerServices(processGroupId, ControllerServiceState.ENABLED, serviceRevisions.keySet()); |
| serviceFacade.activateControllerServices(processGroupId, ControllerServiceState.ENABLED, serviceRevisions); |
| waitForLocalControllerServiceStatus(processGroupId, serviceRevisions.keySet(), ControllerServiceState.ENABLED, updateRequest, pause); |
| } |
| |
| |
| private void scheduleProcessors(final String groupId, final URI originalUri, final VariableRegistryUpdateRequest updateRequest, |
| final Pause pause, final Collection<AffectedComponentDTO> affectedProcessors, final ScheduledState desiredState, |
| final VariableRegistryUpdateStep updateStep) throws InterruptedException { |
| |
| final Set<String> affectedProcessorIds = affectedProcessors.stream() |
| .map(component -> component.getId()) |
| .collect(Collectors.toSet()); |
| |
| final Map<String, Revision> processorRevisionMap = getRevisions(groupId, affectedProcessorIds); |
| final Map<String, RevisionDTO> processorRevisionDtoMap = processorRevisionMap.entrySet().stream().collect( |
| Collectors.toMap(Map.Entry::getKey, entry -> dtoFactory.createRevisionDTO(entry.getValue()))); |
| |
| final ScheduleComponentsEntity scheduleProcessorsEntity = new ScheduleComponentsEntity(); |
| scheduleProcessorsEntity.setComponents(processorRevisionDtoMap); |
| scheduleProcessorsEntity.setId(groupId); |
| scheduleProcessorsEntity.setState(desiredState.name()); |
| |
| URI scheduleGroupUri; |
| try { |
| scheduleGroupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), |
| originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId, null, originalUri.getFragment()); |
| } catch (URISyntaxException e) { |
| throw new RuntimeException(e); |
| } |
| |
| final Map<String, String> headers = new HashMap<>(); |
| headers.put("content-type", MediaType.APPLICATION_JSON); |
| |
| // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves. |
| final NodeResponse clusterResponse; |
| if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { |
| clusterResponse = getRequestReplicator().replicate(HttpMethod.PUT, scheduleGroupUri, scheduleProcessorsEntity, headers).awaitMergedResponse(); |
| } else { |
| clusterResponse = getRequestReplicator().forwardToCoordinator( |
| getClusterCoordinatorNode(), HttpMethod.PUT, scheduleGroupUri, scheduleProcessorsEntity, headers).awaitMergedResponse(); |
| } |
| |
| final int stopProcessorStatus = clusterResponse.getStatus(); |
| if (stopProcessorStatus != Status.OK.getStatusCode()) { |
| updateRequest.getStopProcessorsStep().setFailureReason("Failed while " + updateStep.getDescription()); |
| |
| updateStep.setComplete(true); |
| updateRequest.setFailureReason("Failed while " + updateStep.getDescription()); |
| return; |
| } |
| |
| updateRequest.setLastUpdated(new Date()); |
| final boolean processorsTransitioned = waitForProcessorStatus(originalUri, groupId, affectedProcessorIds, desiredState, updateRequest, pause); |
| updateStep.setComplete(true); |
| |
| if (!processorsTransitioned) { |
| updateStep.setFailureReason("Failed while " + updateStep.getDescription()); |
| |
| updateRequest.setComplete(true); |
| updateRequest.setFailureReason("Failed while " + updateStep.getDescription()); |
| } |
| } |
| |
| private void activateControllerServices(final String groupId, final URI originalUri, final VariableRegistryUpdateRequest updateRequest, |
| final Pause pause, final Collection<AffectedComponentDTO> affectedServices, final ControllerServiceState desiredState, final VariableRegistryUpdateStep updateStep) |
| throws InterruptedException { |
| |
| final Set<String> affectedServiceIds = affectedServices.stream() |
| .map(component -> component.getId()) |
| .collect(Collectors.toSet()); |
| |
| final Map<String, Revision> serviceRevisionMap = getRevisions(groupId, affectedServiceIds); |
| final Map<String, RevisionDTO> serviceRevisionDtoMap = serviceRevisionMap.entrySet().stream().collect( |
| Collectors.toMap(Map.Entry::getKey, entry -> dtoFactory.createRevisionDTO(entry.getValue()))); |
| |
| final ActivateControllerServicesEntity activateServicesEntity = new ActivateControllerServicesEntity(); |
| activateServicesEntity.setComponents(serviceRevisionDtoMap); |
| activateServicesEntity.setId(groupId); |
| activateServicesEntity.setState(desiredState.name()); |
| |
| URI controllerServicesUri; |
| try { |
| controllerServicesUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), |
| originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services", null, originalUri.getFragment()); |
| } catch (URISyntaxException e) { |
| throw new RuntimeException(e); |
| } |
| |
| final Map<String, String> headers = new HashMap<>(); |
| headers.put("content-type", MediaType.APPLICATION_JSON); |
| |
| // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves. |
| final NodeResponse clusterResponse; |
| if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { |
| clusterResponse = getRequestReplicator().replicate(HttpMethod.PUT, controllerServicesUri, activateServicesEntity, headers).awaitMergedResponse(); |
| } else { |
| clusterResponse = getRequestReplicator().forwardToCoordinator( |
| getClusterCoordinatorNode(), HttpMethod.PUT, controllerServicesUri, activateServicesEntity, headers).awaitMergedResponse(); |
| } |
| |
| final int disableServicesStatus = clusterResponse.getStatus(); |
| if (disableServicesStatus != Status.OK.getStatusCode()) { |
| updateStep.setFailureReason("Failed while " + updateStep.getDescription()); |
| |
| updateStep.setComplete(true); |
| updateRequest.setFailureReason("Failed while " + updateStep.getDescription()); |
| return; |
| } |
| |
| updateRequest.setLastUpdated(new Date()); |
| final boolean serviceTransitioned = waitForControllerServiceStatus(originalUri, groupId, affectedServiceIds, desiredState, updateRequest, pause); |
| updateStep.setComplete(true); |
| |
| if (!serviceTransitioned) { |
| updateStep.setFailureReason("Failed while " + updateStep.getDescription()); |
| |
| updateRequest.setComplete(true); |
| updateRequest.setFailureReason("Failed while " + updateStep.getDescription()); |
| } |
| } |
| |
| private void applyVariableRegistryUpdate(final String groupId, final URI originalUri, final VariableRegistryUpdateRequest updateRequest, |
| final VariableRegistryEntity updateEntity) throws InterruptedException, IOException { |
| |
| // convert request accordingly |
| URI applyUpdatesUri; |
| try { |
| applyUpdatesUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), |
| originalUri.getPort(), "/nifi-api/process-groups/" + groupId + "/variable-registry", null, originalUri.getFragment()); |
| } catch (URISyntaxException e) { |
| throw new RuntimeException(e); |
| } |
| |
| final Map<String, String> headers = new HashMap<>(); |
| headers.put("content-type", MediaType.APPLICATION_JSON); |
| |
| // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves. |
| final NodeResponse clusterResponse; |
| if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { |
| clusterResponse = getRequestReplicator().replicate(HttpMethod.PUT, applyUpdatesUri, updateEntity, headers).awaitMergedResponse(); |
| } else { |
| clusterResponse = getRequestReplicator().forwardToCoordinator( |
| getClusterCoordinatorNode(), HttpMethod.PUT, applyUpdatesUri, updateEntity, headers).awaitMergedResponse(); |
| } |
| |
| final int applyUpdatesStatus = clusterResponse.getStatus(); |
| updateRequest.setLastUpdated(new Date()); |
| updateRequest.getApplyUpdatesStep().setComplete(true); |
| |
| if (applyUpdatesStatus == Status.OK.getStatusCode()) { |
| // grab the current process group revision |
| final VariableRegistryEntity entity = getResponseEntity(clusterResponse, VariableRegistryEntity.class); |
| updateRequest.setProcessGroupRevision(entity.getProcessGroupRevision()); |
| } else { |
| final String message = getResponseEntity(clusterResponse, String.class); |
| |
| // update the request progress |
| updateRequest.getApplyUpdatesStep().setFailureReason("Failed to apply updates to the Variable Registry: " + message); |
| updateRequest.setComplete(true); |
| updateRequest.setFailureReason("Failed to apply updates to the Variable Registry: " + message); |
| } |
| } |
| |
| /** |
| * Extracts the response entity from the specified node response. |
| * |
| * @param nodeResponse node response |
| * @param clazz class |
| * @param <T> type of class |
| * @return the response entity |
| */ |
| @SuppressWarnings("unchecked") |
| protected <T> T getResponseEntity(final NodeResponse nodeResponse, final Class<T> clazz) { |
| T entity = (T) nodeResponse.getUpdatedEntity(); |
| if (entity == null) { |
| entity = nodeResponse.getClientResponse().readEntity(clazz); |
| } |
| return entity; |
| } |
| |
| /** |
| * Creates a request to drop the flowfiles from all connection queues within a process group (recursively). |
| * |
| * @param httpServletRequest request |
| * @param processGroupId The id of the process group to be removed. |
| * @return A dropRequestEntity. |
| */ |
| @POST |
| @Consumes(MediaType.WILDCARD) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/empty-all-connections-requests") |
| @ApiOperation( |
| value = "Creates a request to drop all flowfiles of all connection queues in this process group.", |
| response = ProcessGroupEntity.class, |
| authorizations = { |
| @Authorization(value = "Read - /process-groups/{uuid} - For this and all encapsulated process groups"), |
| @Authorization(value = "Write Source Data - /data/{component-type}/{uuid} - For all encapsulated connections") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 202, message = "The request has been accepted. An HTTP response header will contain the URI where the status can be polled."), |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response createEmptyAllConnectionsRequest( |
| @Context final HttpServletRequest httpServletRequest, |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") final String processGroupId |
| ) { |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.POST); |
| } |
| |
| final ProcessGroupEntity requestProcessGroupEntity = new ProcessGroupEntity(); |
| requestProcessGroupEntity.setId(processGroupId); |
| |
| return withWriteLock( |
| serviceFacade, |
| requestProcessGroupEntity, |
| lookup -> authorizeHandleDropAllFlowFilesRequest(processGroupId, lookup), |
| null, |
| (processGroupEntity) -> { |
| // ensure the id is the same across the cluster |
| final String dropRequestId = generateUuid(); |
| |
| // submit the drop request |
| final DropRequestDTO dropRequest = serviceFacade.createDropAllFlowFilesInProcessGroup(processGroupEntity.getId(), dropRequestId); |
| dropRequest.setUri(generateResourceUri("process-groups", processGroupEntity.getId(), "empty-all-connections-requests", dropRequest.getId())); |
| |
| // create the response entity |
| final DropRequestEntity entity = new DropRequestEntity(); |
| entity.setDropRequest(dropRequest); |
| |
| // generate the URI where the response will be |
| final URI location = URI.create(dropRequest.getUri()); |
| return Response.status(Status.ACCEPTED).location(location).entity(entity).build(); |
| } |
| ); |
| } |
| |
| /** |
| * Checks the status of an outstanding request for dropping all flowfiles within a process group. |
| * |
| * @param processGroupId The id of the process group |
| * @param dropRequestId The id of the drop request |
| * @return A dropRequestEntity |
| */ |
| @GET |
| @Consumes(MediaType.WILDCARD) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/empty-all-connections-requests/{drop-request-id}") |
| @ApiOperation( |
| value = "Gets the current status of a drop all flowfiles request.", |
| response = DropRequestEntity.class, |
| authorizations = { |
| @Authorization(value = "Read - /process-groups/{uuid} - For this and all encapsulated process groups"), |
| @Authorization(value = "Write Source Data - /data/{component-type}/{uuid} - For all encapsulated connections") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response getDropAllFlowfilesRequest( |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") final String processGroupId, |
| @ApiParam( |
| value = "The drop request id.", |
| required = true |
| ) |
| @PathParam("drop-request-id") final String dropRequestId |
| ) { |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.GET); |
| } |
| |
| // authorize access |
| serviceFacade.authorizeAccess(lookup -> authorizeHandleDropAllFlowFilesRequest(processGroupId, lookup)); |
| |
| // get the drop request |
| final DropRequestDTO dropRequest = serviceFacade.getDropAllFlowFilesRequest(processGroupId, dropRequestId); |
| dropRequest.setUri(generateResourceUri("process-groups", processGroupId, "empty-all-connections-requests", dropRequest.getId())); |
| |
| // create the response entity |
| final DropRequestEntity entity = new DropRequestEntity(); |
| entity.setDropRequest(dropRequest); |
| |
| return generateOkResponse(entity).build(); |
| } |
| |
| /** |
| * Cancels the specified request for dropping all flowfiles within a process group. |
| * |
| * @param httpServletRequest request |
| * @param processGroupId The process group id |
| * @param dropRequestId The drop request id |
| * @return A dropRequestEntity |
| */ |
| @DELETE |
| @Consumes(MediaType.WILDCARD) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/empty-all-connections-requests/{drop-request-id}") |
| @ApiOperation( |
| value = "Cancels and/or removes a request to drop all flowfiles.", |
| response = DropRequestEntity.class, |
| authorizations = { |
| @Authorization(value = "Read - /process-groups/{uuid} - For this and all encapsulated process groups"), |
| @Authorization(value = "Write Source Data - /data/{component-type}/{uuid} - For all encapsulated connections") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response removeDropRequest( |
| @Context final HttpServletRequest httpServletRequest, |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") final String processGroupId, |
| @ApiParam( |
| value = "The drop request id.", |
| required = true |
| ) |
| @PathParam("drop-request-id") final String dropRequestId |
| ) { |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.DELETE); |
| } |
| |
| return withWriteLock( |
| serviceFacade, |
| new DropEntity(processGroupId, dropRequestId), |
| lookup -> authorizeHandleDropAllFlowFilesRequest(processGroupId, lookup), |
| null, |
| (dropEntity) -> { |
| // delete the drop request |
| final DropRequestDTO dropRequest = serviceFacade.deleteDropAllFlowFilesRequest(dropEntity.getEntityId(), dropEntity.getDropRequestId()); |
| dropRequest.setUri(generateResourceUri("process-groups", dropEntity.getEntityId(), "empty-all-connections-requests", dropRequest.getId())); |
| |
| // create the response entity |
| final DropRequestEntity entity = new DropRequestEntity(); |
| entity.setDropRequest(dropRequest); |
| |
| return generateOkResponse(entity).build(); |
| } |
| ); |
| } |
| |
| private void authorizeHandleDropAllFlowFilesRequest(String processGroupId, AuthorizableLookup lookup) { |
| final ProcessGroupAuthorizable processGroup = lookup.getProcessGroup(processGroupId); |
| |
| authorizeProcessGroup(processGroup, authorizer, lookup, RequestAction.READ, false, false, false, false, false); |
| |
| processGroup.getEncapsulatedProcessGroups() |
| .forEach(encapsulatedProcessGroup -> authorizeProcessGroup(encapsulatedProcessGroup, authorizer, lookup, RequestAction.READ, false, false, false, false, false)); |
| |
| processGroup.getEncapsulatedConnections().stream() |
| .map(ConnectionAuthorizable::getSourceData) |
| .forEach(connectionSourceData -> connectionSourceData.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser())); |
| } |
| |
| /** |
| * Removes the specified process group reference. |
| * |
| * @param httpServletRequest request |
| * @param version The revision is used to verify the client is working with the latest version of the flow. |
| * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. |
| * @param id The id of the process group to be removed. |
| * @return A processGroupEntity. |
| */ |
| @DELETE |
| @Consumes(MediaType.WILDCARD) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}") |
| @ApiOperation( |
| value = "Deletes a process group", |
| response = ProcessGroupEntity.class, |
| authorizations = { |
| @Authorization(value = "Write - /process-groups/{uuid}"), |
| @Authorization(value = "Write - Parent Process Group - /process-groups/{uuid}"), |
| @Authorization(value = "Read - any referenced Controller Services by any encapsulated components - /controller-services/{uuid}"), |
| @Authorization(value = "Write - /{component-type}/{uuid} - For all encapsulated components") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response removeProcessGroup( |
| @Context final HttpServletRequest httpServletRequest, |
| @ApiParam( |
| value = "The revision is used to verify the client is working with the latest version of the flow.", |
| required = false |
| ) |
| @QueryParam(VERSION) final LongParameter version, |
| @ApiParam( |
| value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", |
| required = false |
| ) |
| @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) final ClientIdParameter clientId, |
| @ApiParam( |
| value = "Acknowledges that this node is disconnected to allow for mutable requests to proceed.", |
| required = false |
| ) |
| @QueryParam(DISCONNECTED_NODE_ACKNOWLEDGED) @DefaultValue("false") final Boolean disconnectedNodeAcknowledged, |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") final String id) { |
| |
| // replicate if cluster manager |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.DELETE); |
| } else if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(disconnectedNodeAcknowledged); |
| } |
| |
| final ProcessGroupEntity requestProcessGroupEntity = new ProcessGroupEntity(); |
| requestProcessGroupEntity.setId(id); |
| |
| // handle expects request (usually from the cluster manager) |
| final Revision requestRevision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); |
| return withWriteLock( |
| serviceFacade, |
| requestProcessGroupEntity, |
| requestRevision, |
| lookup -> { |
| final ProcessGroupAuthorizable processGroupAuthorizable = lookup.getProcessGroup(id); |
| |
| // ensure write to this process group and all encapsulated components including templates and controller services. additionally, ensure |
| // read to any referenced services by encapsulated components |
| authorizeProcessGroup(processGroupAuthorizable, authorizer, lookup, RequestAction.WRITE, true, true, true, false, false); |
| |
| // ensure write permission to the parent process group, if applicable... if this is the root group the |
| // request will fail later but still need to handle authorization here |
| final Authorizable parentAuthorizable = processGroupAuthorizable.getAuthorizable().getParentAuthorizable(); |
| if (parentAuthorizable != null) { |
| parentAuthorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); |
| } |
| }, |
| () -> serviceFacade.verifyDeleteProcessGroup(id), |
| (revision, processGroupEntity) -> { |
| // delete the process group |
| final ProcessGroupEntity entity = serviceFacade.deleteProcessGroup(revision, processGroupEntity.getId()); |
| |
| // prune response as necessary |
| if (entity.getComponent() != null) { |
| entity.getComponent().setContents(null); |
| } |
| |
| // create the response |
| return generateOkResponse(entity).build(); |
| } |
| ); |
| } |
| |
| /** |
| * Adds the specified process group. |
| * |
| * @param httpServletRequest request |
| * @param groupId The group id |
| * @param requestProcessGroupEntity A processGroupEntity |
| * @return A processGroupEntity |
| * @throws IOException if the request indicates that the Process Group should be imported from a Flow Registry and NiFi is unable to communicate with the Flow Registry |
| */ |
| @POST |
| @Consumes(MediaType.APPLICATION_JSON) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/process-groups") |
| @ApiOperation( |
| value = "Creates a process group", |
| response = ProcessGroupEntity.class, |
| authorizations = { |
| @Authorization(value = "Write - /process-groups/{uuid}") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response createProcessGroup( |
| @Context final HttpServletRequest httpServletRequest, |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") final String groupId, |
| @ApiParam( |
| value = "The process group configuration details.", |
| required = true |
| ) final ProcessGroupEntity requestProcessGroupEntity) { |
| |
| if (requestProcessGroupEntity == null || requestProcessGroupEntity.getComponent() == null) { |
| throw new IllegalArgumentException("Process group details must be specified."); |
| } |
| |
| if (requestProcessGroupEntity.getRevision() == null || (requestProcessGroupEntity.getRevision().getVersion() == null || requestProcessGroupEntity.getRevision().getVersion() != 0)) { |
| throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Process group."); |
| } |
| |
| if (requestProcessGroupEntity.getComponent().getId() != null) { |
| throw new IllegalArgumentException("Process group ID cannot be specified."); |
| } |
| |
| final PositionDTO proposedPosition = requestProcessGroupEntity.getComponent().getPosition(); |
| if (proposedPosition != null) { |
| if (proposedPosition.getX() == null || proposedPosition.getY() == null) { |
| throw new IllegalArgumentException("The x and y coordinate of the proposed position must be specified."); |
| } |
| } |
| |
| // if the group name isn't specified, ensure the group is being imported from version control |
| if (StringUtils.isBlank(requestProcessGroupEntity.getComponent().getName()) && requestProcessGroupEntity.getComponent().getVersionControlInformation() == null) { |
| throw new IllegalArgumentException("The group name is required when the group is not imported from version control."); |
| } |
| |
| if (requestProcessGroupEntity.getComponent().getParentGroupId() != null && !groupId.equals(requestProcessGroupEntity.getComponent().getParentGroupId())) { |
| throw new IllegalArgumentException(String.format("If specified, the parent process group id %s must be the same as specified in the URI %s", |
| requestProcessGroupEntity.getComponent().getParentGroupId(), groupId)); |
| } |
| requestProcessGroupEntity.getComponent().setParentGroupId(groupId); |
| |
| // Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail. |
| // Step 2: Retrieve flow from Flow Registry |
| // Step 3: Resolve Bundle info |
| // Step 4: Update contents of the ProcessGroupDTO passed in to include the components that need to be added. |
| // Step 5: If any of the components is a Restricted Component, then we must authorize the user |
| // for write access to the RestrictedComponents resource |
| // Step 6: Replicate the request or call serviceFacade.updateProcessGroup |
| |
| final VersionControlInformationDTO versionControlInfo = requestProcessGroupEntity.getComponent().getVersionControlInformation(); |
| if (versionControlInfo != null && requestProcessGroupEntity.getVersionedFlowSnapshot() == null) { |
| // Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail. |
| // Step 2: Retrieve flow from Flow Registry |
| final VersionedFlowSnapshot flowSnapshot = getFlowFromRegistry(versionControlInfo); |
| |
| // Step 3: Resolve Bundle info |
| serviceFacade.discoverCompatibleBundles(flowSnapshot.getFlowContents()); |
| |
| // If there are any Controller Services referenced that are inherited from the parent group, resolve those to point to the appropriate Controller Service, if we are able to. |
| serviceFacade.resolveInheritedControllerServices(flowSnapshot, groupId, NiFiUserUtils.getNiFiUser()); |
| |
| // Step 4: Update contents of the ProcessGroupDTO passed in to include the components that need to be added. |
| requestProcessGroupEntity.setVersionedFlowSnapshot(flowSnapshot); |
| } |
| |
| if (versionControlInfo != null) { |
| final VersionedFlowSnapshot flowSnapshot = requestProcessGroupEntity.getVersionedFlowSnapshot(); |
| serviceFacade.verifyImportProcessGroup(versionControlInfo, flowSnapshot.getFlowContents(), groupId); |
| } |
| |
| // Step 6: Replicate the request or call serviceFacade.updateProcessGroup |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.POST, requestProcessGroupEntity); |
| } else if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(requestProcessGroupEntity.isDisconnectedNodeAcknowledged()); |
| } |
| |
| return withWriteLock( |
| serviceFacade, |
| requestProcessGroupEntity, |
| lookup -> authorizeAccess(groupId, requestProcessGroupEntity, lookup), |
| () -> { |
| final VersionedFlowSnapshot versionedFlowSnapshot = requestProcessGroupEntity.getVersionedFlowSnapshot(); |
| if (versionedFlowSnapshot != null) { |
| serviceFacade.verifyComponentTypes(versionedFlowSnapshot.getFlowContents()); |
| } |
| }, |
| processGroupEntity -> { |
| final ProcessGroupDTO processGroup = processGroupEntity.getComponent(); |
| |
| // set the processor id as appropriate |
| processGroup.setId(generateUuid()); |
| |
| // ensure the group name comes from the versioned flow |
| final VersionedFlowSnapshot flowSnapshot = processGroupEntity.getVersionedFlowSnapshot(); |
| if (flowSnapshot != null && StringUtils.isNotBlank(flowSnapshot.getFlowContents().getName()) && StringUtils.isBlank(processGroup.getName())) { |
| processGroup.setName(flowSnapshot.getFlowContents().getName()); |
| } |
| |
| // create the process group contents |
| final Revision revision = getRevision(processGroupEntity, processGroup.getId()); |
| ProcessGroupEntity entity = serviceFacade.createProcessGroup(revision, groupId, processGroup); |
| |
| if (flowSnapshot != null) { |
| final RevisionDTO revisionDto = entity.getRevision(); |
| final String newGroupId = entity.getComponent().getId(); |
| final Revision newGroupRevision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), newGroupId); |
| |
| // We don't want the Process Group's position to be updated because we want to keep the position where the user |
| // placed the Process Group. However, we do want to use the name of the Process Group that is in the Flow Contents. |
| // To accomplish this, we call updateProcessGroupContents() passing 'true' for the updateSettings flag but null out the position. |
| flowSnapshot.getFlowContents().setPosition(null); |
| entity = serviceFacade.updateProcessGroupContents(newGroupRevision, newGroupId, versionControlInfo, flowSnapshot, |
| getIdGenerationSeed().orElse(null), false, true, true); |
| } |
| |
| populateRemainingProcessGroupEntityContent(entity); |
| |
| // generate a 201 created response |
| String uri = entity.getUri(); |
| return generateCreatedResponse(URI.create(uri), entity).build(); |
| } |
| ); |
| } |
| |
| private VersionedFlowSnapshot getFlowFromRegistry(final VersionControlInformationDTO versionControlInfo) { |
| final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(versionControlInfo, true); |
| final Bucket bucket = flowSnapshot.getBucket(); |
| final VersionedFlow flow = flowSnapshot.getFlow(); |
| |
| versionControlInfo.setBucketName(bucket.getName()); |
| versionControlInfo.setFlowName(flow.getName()); |
| versionControlInfo.setFlowDescription(flow.getDescription()); |
| |
| versionControlInfo.setRegistryName(serviceFacade.getFlowRegistryName(versionControlInfo.getRegistryId())); |
| final VersionedFlowState flowState = flowSnapshot.isLatest() ? VersionedFlowState.UP_TO_DATE : VersionedFlowState.STALE; |
| versionControlInfo.setState(flowState.name()); |
| |
| return flowSnapshot; |
| } |
| |
| |
| /** |
| * Retrieves all the child process groups of the process group with the given id. |
| * |
| * @param groupId the parent process group id |
| * @return An entity containing all the child process group entities. |
| */ |
| @GET |
| @Consumes(MediaType.WILDCARD) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/process-groups") |
| @ApiOperation( |
| value = "Gets all process groups", |
| response = ProcessGroupsEntity.class, |
| authorizations = { |
| @Authorization(value = "Read - /process-groups/{uuid}") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response getProcessGroups( |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") final String groupId) { |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.GET); |
| } |
| |
| // authorize access |
| serviceFacade.authorizeAccess(lookup -> { |
| final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); |
| processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); |
| }); |
| |
| // get the process groups |
| final Set<ProcessGroupEntity> entities = serviceFacade.getProcessGroups(groupId); |
| |
| // always prune the contents |
| for (final ProcessGroupEntity entity : entities) { |
| if (entity.getComponent() != null) { |
| entity.getComponent().setContents(null); |
| } |
| } |
| |
| // create the response entity |
| final ProcessGroupsEntity entity = new ProcessGroupsEntity(); |
| entity.setProcessGroups(populateRemainingProcessGroupEntitiesContent(entities)); |
| |
| // generate the response |
| return generateOkResponse(entity).build(); |
| } |
| |
| // ---------- |
| // processors |
| // ---------- |
| |
| /** |
| * Creates a new processor. |
| * |
| * @param httpServletRequest request |
| * @param groupId The group id |
| * @param requestProcessorEntity A processorEntity. |
| * @return A processorEntity. |
| */ |
| @POST |
| @Consumes(MediaType.APPLICATION_JSON) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/processors") |
| @ApiOperation( |
| value = "Creates a new processor", |
| response = ProcessorEntity.class, |
| authorizations = { |
| @Authorization(value = "Write - /process-groups/{uuid}"), |
| @Authorization(value = "Read - any referenced Controller Services - /controller-services/{uuid}"), |
| @Authorization(value = "Write - if the Processor is restricted - /restricted-components") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response createProcessor( |
| @Context final HttpServletRequest httpServletRequest, |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") final String groupId, |
| @ApiParam( |
| value = "The processor configuration details.", |
| required = true |
| ) final ProcessorEntity requestProcessorEntity) { |
| |
| if (requestProcessorEntity == null || requestProcessorEntity.getComponent() == null) { |
| throw new IllegalArgumentException("Processor details must be specified."); |
| } |
| |
| if (requestProcessorEntity.getRevision() == null || (requestProcessorEntity.getRevision().getVersion() == null || requestProcessorEntity.getRevision().getVersion() != 0)) { |
| throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Processor."); |
| } |
| |
| final ProcessorDTO requestProcessor = requestProcessorEntity.getComponent(); |
| if (requestProcessor.getId() != null) { |
| throw new IllegalArgumentException("Processor ID cannot be specified."); |
| } |
| |
| if (StringUtils.isBlank(requestProcessor.getType())) { |
| throw new IllegalArgumentException("The type of processor to create must be specified."); |
| } |
| |
| final PositionDTO proposedPosition = requestProcessor.getPosition(); |
| if (proposedPosition != null) { |
| if (proposedPosition.getX() == null || proposedPosition.getY() == null) { |
| throw new IllegalArgumentException("The x and y coordinate of the proposed position must be specified."); |
| } |
| } |
| |
| if (requestProcessor.getParentGroupId() != null && !groupId.equals(requestProcessor.getParentGroupId())) { |
| throw new IllegalArgumentException(String.format("If specified, the parent process group id %s must be the same as specified in the URI %s", |
| requestProcessor.getParentGroupId(), groupId)); |
| } |
| requestProcessor.setParentGroupId(groupId); |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.POST, requestProcessorEntity); |
| } else if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(requestProcessorEntity.isDisconnectedNodeAcknowledged()); |
| } |
| |
| return withWriteLock( |
| serviceFacade, |
| requestProcessorEntity, |
| lookup -> { |
| final NiFiUser user = NiFiUserUtils.getNiFiUser(); |
| |
| final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId); |
| final Authorizable processGroup = groupAuthorizable.getAuthorizable(); |
| processGroup.authorize(authorizer, RequestAction.WRITE, user); |
| |
| final Authorizable parameterContext = groupAuthorizable.getProcessGroup().getParameterContext(); |
| final ProcessorConfigDTO configDto = requestProcessor.getConfig(); |
| if (parameterContext != null && configDto != null) { |
| AuthorizeParameterReference.authorizeParameterReferences(configDto.getProperties(), authorizer, parameterContext, user); |
| } |
| |
| ComponentAuthorizable authorizable = null; |
| try { |
| authorizable = lookup.getConfigurableComponent(requestProcessor.getType(), requestProcessor.getBundle()); |
| |
| if (authorizable.isRestricted()) { |
| authorizeRestrictions(authorizer, authorizable); |
| } |
| |
| final ProcessorConfigDTO config = requestProcessor.getConfig(); |
| if (config != null && config.getProperties() != null) { |
| AuthorizeControllerServiceReference.authorizeControllerServiceReferences(config.getProperties(), authorizable, authorizer, lookup); |
| } |
| } finally { |
| if (authorizable != null) { |
| authorizable.cleanUpResources(); |
| } |
| } |
| }, |
| () -> serviceFacade.verifyCreateProcessor(requestProcessor), |
| processorEntity -> { |
| final ProcessorDTO processor = processorEntity.getComponent(); |
| |
| // set the processor id as appropriate |
| processor.setId(generateUuid()); |
| |
| // create the new processor |
| final Revision revision = getRevision(processorEntity, processor.getId()); |
| final ProcessorEntity entity = serviceFacade.createProcessor(revision, groupId, processor); |
| processorResource.populateRemainingProcessorEntityContent(entity); |
| |
| // generate a 201 created response |
| String uri = entity.getUri(); |
| return generateCreatedResponse(URI.create(uri), entity).build(); |
| } |
| ); |
| } |
| |
| /** |
| * Retrieves all the processors in this NiFi. |
| * |
| * @param groupId group id |
| * @return A processorsEntity. |
| */ |
| @GET |
| @Consumes(MediaType.WILDCARD) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/processors") |
| @ApiOperation( |
| value = "Gets all processors", |
| response = ProcessorsEntity.class, |
| authorizations = { |
| @Authorization(value = "Read - /process-groups/{uuid}") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response getProcessors( |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") final String groupId, |
| @ApiParam("Whether or not to include processors from descendant process groups") @QueryParam("includeDescendantGroups") @DefaultValue("false") boolean includeDescendantGroups) { |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.GET); |
| } |
| |
| // authorize access |
| serviceFacade.authorizeAccess(lookup -> { |
| final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); |
| processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); |
| }); |
| |
| // get the processors |
| final Set<ProcessorEntity> processors = serviceFacade.getProcessors(groupId, includeDescendantGroups); |
| |
| // create the response entity |
| final ProcessorsEntity entity = new ProcessorsEntity(); |
| entity.setProcessors(processorResource.populateRemainingProcessorEntitiesContent(processors)); |
| |
| // generate the response |
| return generateOkResponse(entity).build(); |
| } |
| |
| // ----------- |
| // input ports |
| // ----------- |
| |
| /** |
| * Creates a new input port. |
| * |
| * @param httpServletRequest request |
| * @param groupId The group id |
| * @param requestPortEntity A inputPortEntity. |
| * @return A inputPortEntity. |
| */ |
| @POST |
| @Consumes(MediaType.APPLICATION_JSON) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/input-ports") |
| @ApiOperation( |
| value = "Creates an input port", |
| response = PortEntity.class, |
| authorizations = { |
| @Authorization(value = "Write - /process-groups/{uuid}") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response createInputPort( |
| @Context final HttpServletRequest httpServletRequest, |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") final String groupId, |
| @ApiParam( |
| value = "The input port configuration details.", |
| required = true |
| ) final PortEntity requestPortEntity) { |
| |
| if (requestPortEntity == null || requestPortEntity.getComponent() == null) { |
| throw new IllegalArgumentException("Port details must be specified."); |
| } |
| |
| if (requestPortEntity.getRevision() == null || (requestPortEntity.getRevision().getVersion() == null || requestPortEntity.getRevision().getVersion() != 0)) { |
| throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Input port."); |
| } |
| |
| if (requestPortEntity.getComponent().getId() != null) { |
| throw new IllegalArgumentException("Input port ID cannot be specified."); |
| } |
| |
| final PositionDTO proposedPosition = requestPortEntity.getComponent().getPosition(); |
| if (proposedPosition != null) { |
| if (proposedPosition.getX() == null || proposedPosition.getY() == null) { |
| throw new IllegalArgumentException("The x and y coordinate of the proposed position must be specified."); |
| } |
| } |
| |
| if (requestPortEntity.getComponent().getParentGroupId() != null && !groupId.equals(requestPortEntity.getComponent().getParentGroupId())) { |
| throw new IllegalArgumentException(String.format("If specified, the parent process group id %s must be the same as specified in the URI %s", |
| requestPortEntity.getComponent().getParentGroupId(), groupId)); |
| } |
| requestPortEntity.getComponent().setParentGroupId(groupId); |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.POST, requestPortEntity); |
| } else if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(requestPortEntity.isDisconnectedNodeAcknowledged()); |
| } |
| |
| return withWriteLock( |
| serviceFacade, |
| requestPortEntity, |
| lookup -> { |
| final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); |
| processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); |
| }, |
| null, |
| portEntity -> { |
| // set the processor id as appropriate |
| portEntity.getComponent().setId(generateUuid()); |
| |
| // create the input port and generate the json |
| final Revision revision = getRevision(portEntity, portEntity.getComponent().getId()); |
| final PortEntity entity = serviceFacade.createInputPort(revision, groupId, portEntity.getComponent()); |
| inputPortResource.populateRemainingInputPortEntityContent(entity); |
| |
| // build the response |
| return generateCreatedResponse(URI.create(entity.getUri()), entity).build(); |
| } |
| ); |
| } |
| |
| /** |
| * Retrieves all the of input ports in this NiFi. |
| * |
| * @return A inputPortsEntity. |
| */ |
| @GET |
| @Consumes(MediaType.WILDCARD) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/input-ports") |
| @ApiOperation( |
| value = "Gets all input ports", |
| response = InputPortsEntity.class, |
| authorizations = { |
| @Authorization(value = "Read - /process-groups/{uuid}") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response getInputPorts( |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") final String groupId) { |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.GET); |
| } |
| |
| // authorize access |
| serviceFacade.authorizeAccess(lookup -> { |
| final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); |
| processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); |
| }); |
| |
| // get all the input ports |
| final Set<PortEntity> inputPorts = serviceFacade.getInputPorts(groupId); |
| |
| final InputPortsEntity entity = new InputPortsEntity(); |
| entity.setInputPorts(inputPortResource.populateRemainingInputPortEntitiesContent(inputPorts)); |
| |
| // generate the response |
| return generateOkResponse(entity).build(); |
| } |
| |
| // ------------ |
| // output ports |
| // ------------ |
| |
| /** |
| * Creates a new output port. |
| * |
| * @param httpServletRequest request |
| * @param groupId The group id |
| * @param requestPortEntity A outputPortEntity. |
| * @return A outputPortEntity. |
| */ |
| @POST |
| @Consumes(MediaType.APPLICATION_JSON) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/output-ports") |
| @ApiOperation( |
| value = "Creates an output port", |
| response = PortEntity.class, |
| authorizations = { |
| @Authorization(value = "Write - /process-groups/{uuid}") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response createOutputPort( |
| @Context final HttpServletRequest httpServletRequest, |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") final String groupId, |
| @ApiParam( |
| value = "The output port configuration.", |
| required = true |
| ) final PortEntity requestPortEntity) { |
| |
| if (requestPortEntity == null || requestPortEntity.getComponent() == null) { |
| throw new IllegalArgumentException("Port details must be specified."); |
| } |
| |
| if (requestPortEntity.getRevision() == null || (requestPortEntity.getRevision().getVersion() == null || requestPortEntity.getRevision().getVersion() != 0)) { |
| throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Output port."); |
| } |
| |
| if (requestPortEntity.getComponent().getId() != null) { |
| throw new IllegalArgumentException("Output port ID cannot be specified."); |
| } |
| |
| final PositionDTO proposedPosition = requestPortEntity.getComponent().getPosition(); |
| if (proposedPosition != null) { |
| if (proposedPosition.getX() == null || proposedPosition.getY() == null) { |
| throw new IllegalArgumentException("The x and y coordinate of the proposed position must be specified."); |
| } |
| } |
| |
| if (requestPortEntity.getComponent().getParentGroupId() != null && !groupId.equals(requestPortEntity.getComponent().getParentGroupId())) { |
| throw new IllegalArgumentException(String.format("If specified, the parent process group id %s must be the same as specified in the URI %s", |
| requestPortEntity.getComponent().getParentGroupId(), groupId)); |
| } |
| requestPortEntity.getComponent().setParentGroupId(groupId); |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.POST, requestPortEntity); |
| } else if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(requestPortEntity.isDisconnectedNodeAcknowledged()); |
| } |
| |
| return withWriteLock( |
| serviceFacade, |
| requestPortEntity, |
| lookup -> { |
| final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); |
| processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); |
| }, |
| null, |
| portEntity -> { |
| // set the processor id as appropriate |
| portEntity.getComponent().setId(generateUuid()); |
| |
| // create the output port and generate the json |
| final Revision revision = getRevision(portEntity, portEntity.getComponent().getId()); |
| final PortEntity entity = serviceFacade.createOutputPort(revision, groupId, portEntity.getComponent()); |
| outputPortResource.populateRemainingOutputPortEntityContent(entity); |
| |
| // build the response |
| return generateCreatedResponse(URI.create(entity.getUri()), entity).build(); |
| } |
| ); |
| } |
| |
| /** |
| * Retrieves all the of output ports in this NiFi. |
| * |
| * @return A outputPortsEntity. |
| */ |
| @GET |
| @Consumes(MediaType.WILDCARD) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/output-ports") |
| @ApiOperation( |
| value = "Gets all output ports", |
| response = OutputPortsEntity.class, |
| authorizations = { |
| @Authorization(value = "Read - /process-groups/{uuid}") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response getOutputPorts( |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") final String groupId) { |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.GET); |
| } |
| |
| // authorize access |
| serviceFacade.authorizeAccess(lookup -> { |
| final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); |
| processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); |
| }); |
| |
| // get all the output ports |
| final Set<PortEntity> outputPorts = serviceFacade.getOutputPorts(groupId); |
| |
| // create the response entity |
| final OutputPortsEntity entity = new OutputPortsEntity(); |
| entity.setOutputPorts(outputPortResource.populateRemainingOutputPortEntitiesContent(outputPorts)); |
| |
| // generate the response |
| return generateOkResponse(entity).build(); |
| } |
| |
| // ------- |
| // funnels |
| // ------- |
| |
| /** |
| * Creates a new Funnel. |
| * |
| * @param httpServletRequest request |
| * @param groupId The group id |
| * @param requestFunnelEntity A funnelEntity. |
| * @return A funnelEntity. |
| */ |
| @POST |
| @Consumes(MediaType.APPLICATION_JSON) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/funnels") |
| @ApiOperation( |
| value = "Creates a funnel", |
| response = FunnelEntity.class, |
| authorizations = { |
| @Authorization(value = "Write - /process-groups/{uuid}") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response createFunnel( |
| @Context final HttpServletRequest httpServletRequest, |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") final String groupId, |
| @ApiParam( |
| value = "The funnel configuration details.", |
| required = true |
| ) final FunnelEntity requestFunnelEntity) { |
| |
| if (requestFunnelEntity == null || requestFunnelEntity.getComponent() == null) { |
| throw new IllegalArgumentException("Funnel details must be specified."); |
| } |
| |
| if (requestFunnelEntity.getRevision() == null || (requestFunnelEntity.getRevision().getVersion() == null || requestFunnelEntity.getRevision().getVersion() != 0)) { |
| throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Funnel."); |
| } |
| |
| if (requestFunnelEntity.getComponent().getId() != null) { |
| throw new IllegalArgumentException("Funnel ID cannot be specified."); |
| } |
| |
| final PositionDTO proposedPosition = requestFunnelEntity.getComponent().getPosition(); |
| if (proposedPosition != null) { |
| if (proposedPosition.getX() == null || proposedPosition.getY() == null) { |
| throw new IllegalArgumentException("The x and y coordinate of the proposed position must be specified."); |
| } |
| } |
| |
| if (requestFunnelEntity.getComponent().getParentGroupId() != null && !groupId.equals(requestFunnelEntity.getComponent().getParentGroupId())) { |
| throw new IllegalArgumentException(String.format("If specified, the parent process group id %s must be the same as specified in the URI %s", |
| requestFunnelEntity.getComponent().getParentGroupId(), groupId)); |
| } |
| requestFunnelEntity.getComponent().setParentGroupId(groupId); |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.POST, requestFunnelEntity); |
| } else if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(requestFunnelEntity.isDisconnectedNodeAcknowledged()); |
| } |
| |
| return withWriteLock( |
| serviceFacade, |
| requestFunnelEntity, |
| lookup -> { |
| final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); |
| processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); |
| }, |
| null, |
| funnelEntity -> { |
| // set the processor id as appropriate |
| funnelEntity.getComponent().setId(generateUuid()); |
| |
| // create the funnel and generate the json |
| final Revision revision = getRevision(funnelEntity, funnelEntity.getComponent().getId()); |
| final FunnelEntity entity = serviceFacade.createFunnel(revision, groupId, funnelEntity.getComponent()); |
| funnelResource.populateRemainingFunnelEntityContent(entity); |
| |
| // build the response |
| return generateCreatedResponse(URI.create(entity.getUri()), entity).build(); |
| } |
| ); |
| } |
| |
| /** |
| * Retrieves all the of funnels in this NiFi. |
| * |
| * @return A funnelsEntity. |
| */ |
| @GET |
| @Consumes(MediaType.WILDCARD) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/funnels") |
| @ApiOperation( |
| value = "Gets all funnels", |
| response = FunnelsEntity.class, |
| authorizations = { |
| @Authorization(value = "Read - /process-groups/{uuid}") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response getFunnels( |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") final String groupId) { |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.GET); |
| } |
| |
| // authorize access |
| serviceFacade.authorizeAccess(lookup -> { |
| final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); |
| processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); |
| }); |
| |
| // get all the funnels |
| final Set<FunnelEntity> funnels = serviceFacade.getFunnels(groupId); |
| |
| // create the response entity |
| final FunnelsEntity entity = new FunnelsEntity(); |
| entity.setFunnels(funnelResource.populateRemainingFunnelEntitiesContent(funnels)); |
| |
| // generate the response |
| return generateOkResponse(entity).build(); |
| } |
| |
| // ------ |
| // labels |
| // ------ |
| |
| /** |
| * Creates a new Label. |
| * |
| * @param httpServletRequest request |
| * @param groupId The group id |
| * @param requestLabelEntity A labelEntity. |
| * @return A labelEntity. |
| */ |
| @POST |
| @Consumes(MediaType.APPLICATION_JSON) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/labels") |
| @ApiOperation( |
| value = "Creates a label", |
| response = LabelEntity.class, |
| authorizations = { |
| @Authorization(value = "Write - /process-groups/{uuid}") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response createLabel( |
| @Context final HttpServletRequest httpServletRequest, |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") final String groupId, |
| @ApiParam( |
| value = "The label configuration details.", |
| required = true |
| ) final LabelEntity requestLabelEntity) { |
| |
| if (requestLabelEntity == null || requestLabelEntity.getComponent() == null) { |
| throw new IllegalArgumentException("Label details must be specified."); |
| } |
| |
| if (requestLabelEntity.getRevision() == null || (requestLabelEntity.getRevision().getVersion() == null || requestLabelEntity.getRevision().getVersion() != 0)) { |
| throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Label."); |
| } |
| |
| if (requestLabelEntity.getComponent().getId() != null) { |
| throw new IllegalArgumentException("Label ID cannot be specified."); |
| } |
| |
| final PositionDTO proposedPosition = requestLabelEntity.getComponent().getPosition(); |
| if (proposedPosition != null) { |
| if (proposedPosition.getX() == null || proposedPosition.getY() == null) { |
| throw new IllegalArgumentException("The x and y coordinate of the proposed position must be specified."); |
| } |
| } |
| |
| if (requestLabelEntity.getComponent().getParentGroupId() != null && !groupId.equals(requestLabelEntity.getComponent().getParentGroupId())) { |
| throw new IllegalArgumentException(String.format("If specified, the parent process group id %s must be the same as specified in the URI %s", |
| requestLabelEntity.getComponent().getParentGroupId(), groupId)); |
| } |
| requestLabelEntity.getComponent().setParentGroupId(groupId); |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.POST, requestLabelEntity); |
| } else if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(requestLabelEntity.isDisconnectedNodeAcknowledged()); |
| } |
| |
| return withWriteLock( |
| serviceFacade, |
| requestLabelEntity, |
| lookup -> { |
| final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); |
| processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); |
| }, |
| null, |
| labelEntity -> { |
| // set the processor id as appropriate |
| labelEntity.getComponent().setId(generateUuid()); |
| |
| // create the label and generate the json |
| final Revision revision = getRevision(labelEntity, labelEntity.getComponent().getId()); |
| final LabelEntity entity = serviceFacade.createLabel(revision, groupId, labelEntity.getComponent()); |
| labelResource.populateRemainingLabelEntityContent(entity); |
| |
| // build the response |
| return generateCreatedResponse(URI.create(entity.getUri()), entity).build(); |
| } |
| ); |
| } |
| |
| /** |
| * Retrieves all the of labels in this NiFi. |
| * |
| * @return A labelsEntity. |
| */ |
| @GET |
| @Consumes(MediaType.WILDCARD) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/labels") |
| @ApiOperation( |
| value = "Gets all labels", |
| response = LabelsEntity.class, |
| authorizations = { |
| @Authorization(value = "Read - /process-groups/{uuid}") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response getLabels( |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") final String groupId) { |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.GET); |
| } |
| |
| // authorize access |
| serviceFacade.authorizeAccess(lookup -> { |
| final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); |
| processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); |
| }); |
| |
| // get all the labels |
| final Set<LabelEntity> labels = serviceFacade.getLabels(groupId); |
| |
| // create the response entity |
| final LabelsEntity entity = new LabelsEntity(); |
| entity.setLabels(labelResource.populateRemainingLabelEntitiesContent(labels)); |
| |
| // generate the response |
| return generateOkResponse(entity).build(); |
| } |
| |
| // --------------------- |
| // remote process groups |
| // --------------------- |
| |
| /** |
| * Creates a new remote process group. |
| * |
| * @param httpServletRequest request |
| * @param groupId The group id |
| * @param requestRemoteProcessGroupEntity A remoteProcessGroupEntity. |
| * @return A remoteProcessGroupEntity. |
| */ |
| @POST |
| @Consumes(MediaType.APPLICATION_JSON) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/remote-process-groups") |
| @ApiOperation( |
| value = "Creates a new process group", |
| response = RemoteProcessGroupEntity.class, |
| authorizations = { |
| @Authorization(value = "Write - /process-groups/{uuid}") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response createRemoteProcessGroup( |
| @Context final HttpServletRequest httpServletRequest, |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") final String groupId, |
| @ApiParam( |
| value = "The remote process group configuration details.", |
| required = true |
| ) final RemoteProcessGroupEntity requestRemoteProcessGroupEntity) { |
| |
| if (requestRemoteProcessGroupEntity == null || requestRemoteProcessGroupEntity.getComponent() == null) { |
| throw new IllegalArgumentException("Remote process group details must be specified."); |
| } |
| |
| if (requestRemoteProcessGroupEntity.getRevision() == null |
| || (requestRemoteProcessGroupEntity.getRevision().getVersion() == null || requestRemoteProcessGroupEntity.getRevision().getVersion() != 0)) { |
| throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Remote process group."); |
| } |
| |
| final RemoteProcessGroupDTO requestRemoteProcessGroupDTO = requestRemoteProcessGroupEntity.getComponent(); |
| |
| if (requestRemoteProcessGroupDTO.getId() != null) { |
| throw new IllegalArgumentException("Remote process group ID cannot be specified."); |
| } |
| |
| if (requestRemoteProcessGroupDTO.getTargetUri() == null) { |
| throw new IllegalArgumentException("The URI of the process group must be specified."); |
| } |
| |
| final PositionDTO proposedPosition = requestRemoteProcessGroupDTO.getPosition(); |
| if (proposedPosition != null) { |
| if (proposedPosition.getX() == null || proposedPosition.getY() == null) { |
| throw new IllegalArgumentException("The x and y coordinate of the proposed position must be specified."); |
| } |
| } |
| |
| if (requestRemoteProcessGroupDTO.getParentGroupId() != null && !groupId.equals(requestRemoteProcessGroupDTO.getParentGroupId())) { |
| throw new IllegalArgumentException(String.format("If specified, the parent process group id %s must be the same as specified in the URI %s", |
| requestRemoteProcessGroupDTO.getParentGroupId(), groupId)); |
| } |
| requestRemoteProcessGroupDTO.setParentGroupId(groupId); |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.POST, requestRemoteProcessGroupEntity); |
| } else if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(requestRemoteProcessGroupEntity.isDisconnectedNodeAcknowledged()); |
| } |
| |
| return withWriteLock( |
| serviceFacade, |
| requestRemoteProcessGroupEntity, |
| lookup -> { |
| final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); |
| processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); |
| }, |
| null, |
| remoteProcessGroupEntity -> { |
| final RemoteProcessGroupDTO remoteProcessGroupDTO = remoteProcessGroupEntity.getComponent(); |
| |
| // set the processor id as appropriate |
| remoteProcessGroupDTO.setId(generateUuid()); |
| |
| // parse the uri to check if the uri is valid |
| final String targetUris = remoteProcessGroupDTO.getTargetUris(); |
| SiteToSiteRestApiClient.parseClusterUrls(targetUris); |
| |
| // since the uri is valid, use it |
| remoteProcessGroupDTO.setTargetUris(targetUris); |
| |
| // create the remote process group |
| final Revision revision = getRevision(remoteProcessGroupEntity, remoteProcessGroupDTO.getId()); |
| final RemoteProcessGroupEntity entity = serviceFacade.createRemoteProcessGroup(revision, groupId, remoteProcessGroupDTO); |
| remoteProcessGroupResource.populateRemainingRemoteProcessGroupEntityContent(entity); |
| |
| return generateCreatedResponse(URI.create(entity.getUri()), entity).build(); |
| } |
| ); |
| } |
| |
| /** |
| * Retrieves all the of remote process groups in this NiFi. |
| * |
| * @return A remoteProcessGroupEntity. |
| */ |
| @GET |
| @Consumes(MediaType.WILDCARD) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/remote-process-groups") |
| @ApiOperation( |
| value = "Gets all remote process groups", |
| response = RemoteProcessGroupsEntity.class, |
| authorizations = { |
| @Authorization(value = "Read - /process-groups/{uuid}") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response getRemoteProcessGroups( |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") final String groupId) { |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.GET); |
| } |
| |
| // authorize access |
| serviceFacade.authorizeAccess(lookup -> { |
| final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); |
| processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); |
| }); |
| |
| // get all the remote process groups |
| final Set<RemoteProcessGroupEntity> remoteProcessGroups = serviceFacade.getRemoteProcessGroups(groupId); |
| |
| // prune response as necessary |
| for (RemoteProcessGroupEntity remoteProcessGroupEntity : remoteProcessGroups) { |
| if (remoteProcessGroupEntity.getComponent() != null) { |
| remoteProcessGroupEntity.getComponent().setContents(null); |
| } |
| } |
| |
| // create the response entity |
| final RemoteProcessGroupsEntity entity = new RemoteProcessGroupsEntity(); |
| entity.setRemoteProcessGroups(remoteProcessGroupResource.populateRemainingRemoteProcessGroupEntitiesContent(remoteProcessGroups)); |
| |
| // generate the response |
| return generateOkResponse(entity).build(); |
| } |
| |
| // ----------- |
| // connections |
| // ----------- |
| |
| /** |
| * Creates a new connection. |
| * |
| * @param httpServletRequest request |
| * @param groupId The group id |
| * @param requestConnectionEntity A connectionEntity. |
| * @return A connectionEntity. |
| */ |
| @POST |
| @Consumes(MediaType.APPLICATION_JSON) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/connections") |
| @ApiOperation( |
| value = "Creates a connection", |
| response = ConnectionEntity.class, |
| authorizations = { |
| @Authorization(value = "Write - /process-groups/{uuid}"), |
| @Authorization(value = "Write Source - /{component-type}/{uuid}"), |
| @Authorization(value = "Write Destination - /{component-type}/{uuid}") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response createConnection( |
| @Context final HttpServletRequest httpServletRequest, |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") final String groupId, |
| @ApiParam( |
| value = "The connection configuration details.", |
| required = true |
| ) final ConnectionEntity requestConnectionEntity) { |
| |
| if (requestConnectionEntity == null || requestConnectionEntity.getComponent() == null) { |
| throw new IllegalArgumentException("Connection details must be specified."); |
| } |
| |
| if (requestConnectionEntity.getRevision() == null || (requestConnectionEntity.getRevision().getVersion() == null || requestConnectionEntity.getRevision().getVersion() != 0)) { |
| throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Connection."); |
| } |
| |
| if (requestConnectionEntity.getComponent().getId() != null) { |
| throw new IllegalArgumentException("Connection ID cannot be specified."); |
| } |
| |
| final List<PositionDTO> proposedBends = requestConnectionEntity.getComponent().getBends(); |
| if (proposedBends != null) { |
| for (final PositionDTO proposedBend : proposedBends) { |
| if (proposedBend.getX() == null || proposedBend.getY() == null) { |
| throw new IllegalArgumentException("The x and y coordinate of the each bend must be specified."); |
| } |
| } |
| } |
| |
| if (requestConnectionEntity.getComponent().getParentGroupId() != null && !groupId.equals(requestConnectionEntity.getComponent().getParentGroupId())) { |
| throw new IllegalArgumentException(String.format("If specified, the parent process group id %s must be the same as specified in the URI %s", |
| requestConnectionEntity.getComponent().getParentGroupId(), groupId)); |
| } |
| requestConnectionEntity.getComponent().setParentGroupId(groupId); |
| |
| // get the connection |
| final ConnectionDTO requestConnection = requestConnectionEntity.getComponent(); |
| |
| if (requestConnection.getSource() == null || requestConnection.getSource().getId() == null) { |
| throw new IllegalArgumentException("The source of the connection must be specified."); |
| } |
| |
| if (requestConnection.getSource().getType() == null) { |
| throw new IllegalArgumentException("The type of the source of the connection must be specified."); |
| } |
| |
| final ConnectableType sourceConnectableType; |
| try { |
| sourceConnectableType = ConnectableType.valueOf(requestConnection.getSource().getType()); |
| } catch (final IllegalArgumentException e) { |
| throw new IllegalArgumentException(String.format("Unrecognized source type %s. Expected values are [%s]", |
| requestConnection.getSource().getType(), StringUtils.join(ConnectableType.values(), ", "))); |
| } |
| |
| if (requestConnection.getDestination() == null || requestConnection.getDestination().getId() == null) { |
| throw new IllegalArgumentException("The destination of the connection must be specified."); |
| } |
| |
| if (requestConnection.getDestination().getType() == null) { |
| throw new IllegalArgumentException("The type of the destination of the connection must be specified."); |
| } |
| |
| final ConnectableType destinationConnectableType; |
| try { |
| destinationConnectableType = ConnectableType.valueOf(requestConnection.getDestination().getType()); |
| } catch (final IllegalArgumentException e) { |
| throw new IllegalArgumentException(String.format("Unrecognized destination type %s. Expected values are [%s]", |
| requestConnection.getDestination().getType(), StringUtils.join(ConnectableType.values(), ", "))); |
| } |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.POST, requestConnectionEntity); |
| } else if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(requestConnectionEntity.isDisconnectedNodeAcknowledged()); |
| } |
| |
| return withWriteLock( |
| serviceFacade, |
| requestConnectionEntity, |
| lookup -> { |
| // ensure write access to the group |
| final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); |
| processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); |
| |
| // explicitly handle RPGs differently as the connectable id can be ambiguous if self referencing |
| final Authorizable source; |
| if (ConnectableType.REMOTE_OUTPUT_PORT.equals(sourceConnectableType)) { |
| source = lookup.getRemoteProcessGroup(requestConnection.getSource().getGroupId()); |
| } else { |
| source = lookup.getLocalConnectable(requestConnection.getSource().getId()); |
| } |
| |
| // ensure write access to the source |
| if (source == null) { |
| throw new ResourceNotFoundException("Cannot find source component with ID [" + requestConnection.getSource().getId() + "]"); |
| } |
| source.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); |
| |
| // explicitly handle RPGs differently as the connectable id can be ambiguous if self referencing |
| final Authorizable destination; |
| if (ConnectableType.REMOTE_INPUT_PORT.equals(destinationConnectableType)) { |
| destination = lookup.getRemoteProcessGroup(requestConnection.getDestination().getGroupId()); |
| } else { |
| destination = lookup.getLocalConnectable(requestConnection.getDestination().getId()); |
| } |
| |
| // ensure write access to the destination |
| if (destination == null) { |
| throw new ResourceNotFoundException("Cannot find destination component with ID [" + requestConnection.getDestination().getId() + "]"); |
| } |
| |
| destination.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); |
| }, |
| () -> serviceFacade.verifyCreateConnection(groupId, requestConnection), |
| connectionEntity -> { |
| final ConnectionDTO connection = connectionEntity.getComponent(); |
| |
| // set the connection id as appropriate |
| connection.setId(generateUuid()); |
| |
| // create the new relationship target |
| final Revision revision = getRevision(connectionEntity, connection.getId()); |
| final ConnectionEntity entity = serviceFacade.createConnection(revision, groupId, connection); |
| connectionResource.populateRemainingConnectionEntityContent(entity); |
| |
| // extract the href and build the response |
| String uri = entity.getUri(); |
| return generateCreatedResponse(URI.create(uri), entity).build(); |
| } |
| ); |
| } |
| |
| /** |
| * Gets all the connections. |
| * |
| * @return A connectionsEntity. |
| */ |
| @GET |
| @Consumes(MediaType.WILDCARD) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/connections") |
| @ApiOperation( |
| value = "Gets all connections", |
| response = ConnectionsEntity.class, |
| authorizations = { |
| @Authorization(value = "Read - /process-groups/{uuid}") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response getConnections( |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") String groupId) { |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.GET); |
| } |
| |
| // authorize access |
| serviceFacade.authorizeAccess(lookup -> { |
| final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); |
| processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); |
| }); |
| |
| // all of the relationships for the specified source processor |
| Set<ConnectionEntity> connections = serviceFacade.getConnections(groupId); |
| |
| // create the client response entity |
| ConnectionsEntity entity = new ConnectionsEntity(); |
| entity.setConnections(connectionResource.populateRemainingConnectionEntitiesContent(connections)); |
| |
| // generate the response |
| return generateOkResponse(entity).build(); |
| } |
| |
| // ---------------- |
| // snippet instance |
| // ---------------- |
| |
| /** |
| * Copies the specified snippet within this ProcessGroup. The snippet instance that is instantiated cannot be referenced at a later time, therefore there is no |
| * corresponding URI. Instead the request URI is returned. |
| * <p> |
| * Alternatively, we could have performed a PUT request. However, PUT requests are supposed to be idempotent and this endpoint is certainly not. |
| * |
| * @param httpServletRequest request |
| * @param groupId The group id |
| * @param requestCopySnippetEntity The copy snippet request |
| * @return A flowSnippetEntity. |
| */ |
| @POST |
| @Consumes(MediaType.APPLICATION_JSON) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/snippet-instance") |
| @ApiOperation( |
| value = "Copies a snippet and discards it.", |
| response = FlowEntity.class, |
| authorizations = { |
| @Authorization(value = "Write - /process-groups/{uuid}"), |
| @Authorization(value = "Read - /{component-type}/{uuid} - For each component in the snippet and their descendant components"), |
| @Authorization(value = "Write - if the snippet contains any restricted Processors - /restricted-components") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response copySnippet( |
| @Context HttpServletRequest httpServletRequest, |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") String groupId, |
| @ApiParam( |
| value = "The copy snippet request.", |
| required = true |
| ) CopySnippetRequestEntity requestCopySnippetEntity) { |
| |
| // ensure the position has been specified |
| if (requestCopySnippetEntity == null || requestCopySnippetEntity.getOriginX() == null || requestCopySnippetEntity.getOriginY() == null) { |
| throw new IllegalArgumentException("The origin position (x, y) must be specified"); |
| } |
| |
| if (requestCopySnippetEntity.getSnippetId() == null) { |
| throw new IllegalArgumentException("The snippet id must be specified."); |
| } |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.POST, requestCopySnippetEntity); |
| } else if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(requestCopySnippetEntity.isDisconnectedNodeAcknowledged()); |
| } |
| |
| return withWriteLock( |
| serviceFacade, |
| requestCopySnippetEntity, |
| lookup -> { |
| final NiFiUser user = NiFiUserUtils.getNiFiUser(); |
| final SnippetAuthorizable snippet = authorizeSnippetUsage(lookup, groupId, requestCopySnippetEntity.getSnippetId(), false, true); |
| |
| final Consumer<ComponentAuthorizable> authorizeRestricted = authorizable -> { |
| if (authorizable.isRestricted()) { |
| authorizeRestrictions(authorizer, authorizable); |
| } |
| }; |
| |
| // consider each processor. note - this request will not create new controller services so we do not need to check |
| // for if there are not restricted controller services. it will however, need to authorize the user has access |
| // to any referenced services and this is done within authorizeSnippetUsage above. |
| // Also ensure that user has READ permissions to the Parameter Contexts in order to copy them. |
| snippet.getSelectedProcessors().forEach(authorizeRestricted); |
| for (final ProcessGroupAuthorizable groupAuthorizable : snippet.getSelectedProcessGroups()) { |
| groupAuthorizable.getEncapsulatedProcessors().forEach(authorizeRestricted); |
| |
| final ParameterContext parameterContext = groupAuthorizable.getProcessGroup().getParameterContext(); |
| if (parameterContext != null) { |
| parameterContext.authorize(authorizer, RequestAction.READ, user); |
| } |
| |
| for (final ProcessGroupAuthorizable encapsulatedGroupAuth : groupAuthorizable.getEncapsulatedProcessGroups()) { |
| final ParameterContext encapsulatedGroupParameterContext = encapsulatedGroupAuth.getProcessGroup().getParameterContext(); |
| if (encapsulatedGroupParameterContext != null) { |
| encapsulatedGroupParameterContext.authorize(authorizer, RequestAction.READ, user); |
| } |
| |
| } |
| } |
| }, |
| null, |
| copySnippetRequestEntity -> { |
| // copy the specified snippet |
| final FlowEntity flowEntity = serviceFacade.copySnippet( |
| groupId, copySnippetRequestEntity.getSnippetId(), copySnippetRequestEntity.getOriginX(), copySnippetRequestEntity.getOriginY(), getIdGenerationSeed().orElse(null)); |
| |
| // get the snippet |
| final FlowDTO flow = flowEntity.getFlow(); |
| |
| // prune response as necessary |
| for (ProcessGroupEntity childGroupEntity : flow.getProcessGroups()) { |
| childGroupEntity.getComponent().setContents(null); |
| } |
| |
| // create the response entity |
| populateRemainingSnippetContent(flow); |
| |
| // generate the response |
| return generateCreatedResponse(getAbsolutePath(), flowEntity).build(); |
| } |
| ); |
| } |
| |
| // ----------------- |
| // template instance |
| // ----------------- |
| |
| /** |
| * Discovers the compatible bundle details for the components in the specified snippet. |
| * |
| * @param snippet the snippet |
| */ |
| private void discoverCompatibleBundles(final FlowSnippetDTO snippet) { |
| if (snippet.getProcessors() != null) { |
| snippet.getProcessors().forEach(processor -> { |
| final BundleCoordinate coordinate = serviceFacade.getCompatibleBundle(processor.getType(), processor.getBundle()); |
| processor.setBundle(new BundleDTO(coordinate.getGroup(), coordinate.getId(), coordinate.getVersion())); |
| }); |
| } |
| |
| if (snippet.getControllerServices() != null) { |
| snippet.getControllerServices().forEach(controllerService -> { |
| final BundleCoordinate coordinate = serviceFacade.getCompatibleBundle(controllerService.getType(), controllerService.getBundle()); |
| controllerService.setBundle(new BundleDTO(coordinate.getGroup(), coordinate.getId(), coordinate.getVersion())); |
| }); |
| } |
| |
| if (snippet.getProcessGroups() != null) { |
| snippet.getProcessGroups().forEach(processGroup -> { |
| discoverCompatibleBundles(processGroup.getContents()); |
| }); |
| } |
| } |
| |
| /** |
| * Instantiates the specified template within this ProcessGroup. The template instance that is instantiated cannot be referenced at a later time, therefore there is no |
| * corresponding URI. Instead the request URI is returned. |
| * <p> |
| * Alternatively, we could have performed a PUT request. However, PUT requests are supposed to be idempotent and this endpoint is certainly not. |
| * |
| * @param httpServletRequest request |
| * @param groupId The group id |
| * @param requestInstantiateTemplateRequestEntity The instantiate template request |
| * @return A flowEntity. |
| */ |
| @POST |
| @Consumes(MediaType.APPLICATION_JSON) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/template-instance") |
| @ApiOperation( |
| value = "Instantiates a template", |
| response = FlowEntity.class, |
| authorizations = { |
| @Authorization(value = "Write - /process-groups/{uuid}"), |
| @Authorization(value = "Read - /templates/{uuid}"), |
| @Authorization(value = "Write - if the template contains any restricted components - /restricted-components") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response instantiateTemplate( |
| @Context HttpServletRequest httpServletRequest, |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") String groupId, |
| @ApiParam( |
| value = "The instantiate template request.", |
| required = true |
| ) InstantiateTemplateRequestEntity requestInstantiateTemplateRequestEntity) { |
| |
| // ensure the position has been specified |
| if (requestInstantiateTemplateRequestEntity == null || requestInstantiateTemplateRequestEntity.getOriginX() == null || requestInstantiateTemplateRequestEntity.getOriginY() == null) { |
| throw new IllegalArgumentException("The origin position (x, y) must be specified."); |
| } |
| |
| // ensure the template id was provided |
| if (requestInstantiateTemplateRequestEntity.getTemplateId() == null) { |
| throw new IllegalArgumentException("The template id must be specified."); |
| } |
| |
| // ensure the template encoding version is valid |
| if (requestInstantiateTemplateRequestEntity.getEncodingVersion() != null) { |
| try { |
| FlowEncodingVersion.parse(requestInstantiateTemplateRequestEntity.getEncodingVersion()); |
| } catch (final IllegalArgumentException e) { |
| throw new IllegalArgumentException("The template encoding version is not valid. The expected format is <number>.<number>"); |
| } |
| } |
| |
| // populate the encoding version if necessary |
| if (requestInstantiateTemplateRequestEntity.getEncodingVersion() == null) { |
| // if the encoding version is not specified, use the latest encoding version as these options were |
| // not available pre 1.x, will be overridden if populating from the underlying template below |
| requestInstantiateTemplateRequestEntity.setEncodingVersion(TemplateDTO.MAX_ENCODING_VERSION); |
| } |
| |
| // populate the component bundles if necessary |
| if (requestInstantiateTemplateRequestEntity.getSnippet() == null) { |
| // get the desired template in order to determine the supported bundles |
| final TemplateDTO requestedTemplate = serviceFacade.exportTemplate(requestInstantiateTemplateRequestEntity.getTemplateId()); |
| final FlowSnippetDTO requestTemplateContents = requestedTemplate.getSnippet(); |
| |
| // determine the compatible bundles to use for each component in this template, this ensures the nodes in the cluster |
| // instantiate the components from the same bundles |
| discoverCompatibleBundles(requestTemplateContents); |
| |
| // update the requested template as necessary - use the encoding version from the underlying template |
| requestInstantiateTemplateRequestEntity.setEncodingVersion(requestedTemplate.getEncodingVersion()); |
| requestInstantiateTemplateRequestEntity.setSnippet(requestTemplateContents); |
| } |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.POST, requestInstantiateTemplateRequestEntity); |
| } else if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(requestInstantiateTemplateRequestEntity.isDisconnectedNodeAcknowledged()); |
| } |
| |
| return withWriteLock( |
| serviceFacade, |
| requestInstantiateTemplateRequestEntity, |
| lookup -> { |
| final NiFiUser user = NiFiUserUtils.getNiFiUser(); |
| |
| // ensure write on the group |
| final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId); |
| final Authorizable processGroup = groupAuthorizable.getAuthorizable(); |
| processGroup.authorize(authorizer, RequestAction.WRITE, user); |
| |
| final Authorizable template = lookup.getTemplate(requestInstantiateTemplateRequestEntity.getTemplateId()); |
| template.authorize(authorizer, RequestAction.READ, user); |
| |
| // ensure read on the template |
| final TemplateContentsAuthorizable templateContents = lookup.getTemplateContents(requestInstantiateTemplateRequestEntity.getSnippet()); |
| |
| final Consumer<ComponentAuthorizable> authorizeRestricted = authorizable -> { |
| if (authorizable.isRestricted()) { |
| authorizeRestrictions(authorizer, authorizable); |
| } |
| }; |
| |
| // ensure restricted access if necessary |
| templateContents.getEncapsulatedProcessors().forEach(authorizeRestricted); |
| templateContents.getEncapsulatedControllerServices().forEach(authorizeRestricted); |
| |
| final Authorizable parameterContext = groupAuthorizable.getProcessGroup().getParameterContext(); |
| if (parameterContext != null) { |
| AuthorizeParameterReference.authorizeParameterReferences(requestInstantiateTemplateRequestEntity.getSnippet(), authorizer, parameterContext, user); |
| } |
| }, |
| () -> serviceFacade.verifyCanInstantiate(groupId, requestInstantiateTemplateRequestEntity.getSnippet()), |
| instantiateTemplateRequestEntity -> { |
| final FlowSnippetDTO snippet = instantiateTemplateRequestEntity.getSnippet(); |
| |
| // Check if the snippet contains any public port violating public port unique constraint with the current flow |
| verifyPublicPortUniqueness(snippet); |
| |
| // create the template and generate the json |
| final FlowEntity entity = serviceFacade.createTemplateInstance(groupId, instantiateTemplateRequestEntity.getOriginX(), instantiateTemplateRequestEntity.getOriginY(), |
| instantiateTemplateRequestEntity.getEncodingVersion(), snippet, getIdGenerationSeed().orElse(null)); |
| |
| final FlowDTO flowSnippet = entity.getFlow(); |
| |
| // prune response as necessary |
| for (ProcessGroupEntity childGroupEntity : flowSnippet.getProcessGroups()) { |
| childGroupEntity.getComponent().setContents(null); |
| } |
| |
| // create the response entity |
| populateRemainingSnippetContent(flowSnippet); |
| |
| // generate the response |
| return generateCreatedResponse(getAbsolutePath(), entity).build(); |
| } |
| ); |
| } |
| |
| private void verifyPublicPortUniqueness(FlowSnippetDTO snippet) { |
| snippet.getInputPorts().stream().filter(portDTO -> Boolean.TRUE.equals(portDTO.getAllowRemoteAccess())) |
| .forEach(portDTO -> { |
| try { |
| serviceFacade.verifyPublicInputPortUniqueness(portDTO.getId(), portDTO.getName()); |
| } catch (IllegalStateException e) { |
| throw toPublicPortUniqueConstraintViolationException("input", portDTO); |
| } |
| }); |
| |
| snippet.getOutputPorts().stream().filter(portDTO -> Boolean.TRUE.equals(portDTO.getAllowRemoteAccess())) |
| .forEach(portDTO -> { |
| try { |
| serviceFacade.verifyPublicOutputPortUniqueness(portDTO.getId(), portDTO.getName()); |
| } catch (IllegalStateException e) { |
| throw toPublicPortUniqueConstraintViolationException("output", portDTO); |
| } |
| }); |
| |
| snippet.getProcessGroups().forEach(processGroupDTO -> verifyPublicPortUniqueness(processGroupDTO.getContents())); |
| } |
| |
| private IllegalStateException toPublicPortUniqueConstraintViolationException(final String portType, final PortDTO portDTO) { |
| return new IllegalStateException(String.format("The %s port [%s] named '%s' will violate the public port unique constraint." + |
| " Rename the existing port name, or the one in the template to instantiate the template in this flow.", portType, portDTO.getId(), portDTO.getName())); |
| } |
| |
| // --------- |
| // templates |
| // --------- |
| |
| private SnippetAuthorizable authorizeSnippetUsage(final AuthorizableLookup lookup, final String groupId, final String snippetId, |
| final boolean authorizeTransitiveServices, final boolean authorizeParameterReferences) { |
| |
| final NiFiUser user = NiFiUserUtils.getNiFiUser(); |
| |
| // ensure write access to the target process group |
| lookup.getProcessGroup(groupId).getAuthorizable().authorize(authorizer, RequestAction.WRITE, user); |
| |
| // ensure read permission to every component in the snippet including referenced services |
| final SnippetAuthorizable snippet = lookup.getSnippet(snippetId); |
| authorizeSnippet(snippet, authorizer, lookup, RequestAction.READ, true, authorizeTransitiveServices, authorizeParameterReferences); |
| return snippet; |
| } |
| |
| /** |
| * Creates a new template based off of the specified template. |
| * |
| * @param httpServletRequest request |
| * @param requestCreateTemplateRequestEntity request to create the template |
| * @return A templateEntity |
| */ |
| @POST |
| @Consumes(MediaType.APPLICATION_JSON) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/templates") |
| @ApiOperation( |
| value = "Creates a template and discards the specified snippet.", |
| response = TemplateEntity.class, |
| authorizations = { |
| @Authorization(value = "Write - /process-groups/{uuid}"), |
| @Authorization(value = "Read - /{component-type}/{uuid} - For each component in the snippet and their descendant components") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response createTemplate( |
| @Context final HttpServletRequest httpServletRequest, |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") final String groupId, |
| @ApiParam( |
| value = "The create template request.", |
| required = true |
| ) final CreateTemplateRequestEntity requestCreateTemplateRequestEntity) { |
| |
| if (requestCreateTemplateRequestEntity == null || requestCreateTemplateRequestEntity.getSnippetId() == null) { |
| throw new IllegalArgumentException("The snippet identifier must be specified."); |
| } |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.POST, requestCreateTemplateRequestEntity); |
| } else if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(requestCreateTemplateRequestEntity.isDisconnectedNodeAcknowledged()); |
| } |
| |
| return withWriteLock( |
| serviceFacade, |
| requestCreateTemplateRequestEntity, |
| lookup -> { |
| authorizeSnippetUsage(lookup, groupId, requestCreateTemplateRequestEntity.getSnippetId(), true, false); |
| }, |
| () -> serviceFacade.verifyCanAddTemplate(groupId, requestCreateTemplateRequestEntity.getName()), |
| createTemplateRequestEntity -> { |
| // create the template and generate the json |
| final TemplateDTO template = serviceFacade.createTemplate(createTemplateRequestEntity.getName(), createTemplateRequestEntity.getDescription(), |
| createTemplateRequestEntity.getSnippetId(), groupId, getIdGenerationSeed()); |
| templateResource.populateRemainingTemplateContent(template); |
| |
| // build the response entity |
| final TemplateEntity entity = new TemplateEntity(); |
| entity.setTemplate(template); |
| |
| // build the response |
| return generateCreatedResponse(URI.create(template.getUri()), entity).build(); |
| } |
| ); |
| } |
| |
| /** |
| * Imports the specified template. |
| * |
| * @param httpServletRequest request |
| * @param in The template stream |
| * @return A templateEntity or an errorResponse XML snippet. |
| * @throws InterruptedException if interrupted |
| */ |
| @POST |
| @Consumes(MediaType.MULTIPART_FORM_DATA) |
| @Produces(MediaType.APPLICATION_XML) |
| @Path("{id}/templates/upload") |
| @ApiOperation( |
| value = "Uploads a template", |
| response = TemplateEntity.class, |
| authorizations = { |
| @Authorization(value = "Write - /process-groups/{uuid}") |
| } |
| ) |
| @ApiImplicitParams( |
| value = { |
| @ApiImplicitParam( |
| name = "template", |
| value = "The binary content of the template file being uploaded.", |
| required = true, |
| type = "file", |
| paramType = "formData") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response uploadTemplate( |
| @Context final HttpServletRequest httpServletRequest, |
| @Context final UriInfo uriInfo, |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") final String groupId, |
| @ApiParam( |
| value = "Acknowledges that this node is disconnected to allow for mutable requests to proceed.", |
| required = false |
| ) |
| @FormDataParam(DISCONNECTED_NODE_ACKNOWLEDGED) @DefaultValue("false") final Boolean disconnectedNodeAcknowledged, |
| @FormDataParam("template") final InputStream in) throws InterruptedException { |
| |
| // unmarshal the template |
| final TemplateDTO template; |
| try { |
| // TODO: Potentially refactor the template parsing to a service layer outside of the resource for web request handling |
| JAXBContext context = JAXBContext.newInstance(TemplateDTO.class); |
| Unmarshaller unmarshaller = context.createUnmarshaller(); |
| XMLStreamReader xsr = XmlUtils.createSafeReader(in); |
| JAXBElement<TemplateDTO> templateElement = unmarshaller.unmarshal(xsr, TemplateDTO.class); |
| template = templateElement.getValue(); |
| } catch (JAXBException jaxbe) { |
| logger.warn("An error occurred while parsing a template.", jaxbe); |
| String responseXml = String.format("<errorResponse status=\"%s\" statusText=\"The specified template is not in a valid format.\"/>", Response.Status.BAD_REQUEST.getStatusCode()); |
| return Response.status(Response.Status.OK).entity(responseXml).type("application/xml").build(); |
| } catch (IllegalArgumentException iae) { |
| logger.warn("Unable to import template.", iae); |
| String responseXml = String.format("<errorResponse status=\"%s\" statusText=\"%s\"/>", Response.Status.BAD_REQUEST.getStatusCode(), sanitizeErrorResponse(iae.getMessage())); |
| return Response.status(Response.Status.OK).entity(responseXml).type("application/xml").build(); |
| } catch (Exception e) { |
| logger.warn("An error occurred while importing a template.", e); |
| String responseXml = String.format("<errorResponse status=\"%s\" statusText=\"Unable to import the specified template: %s\"/>", |
| Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), sanitizeErrorResponse(e.getMessage())); |
| return Response.status(Response.Status.OK).entity(responseXml).type("application/xml").build(); |
| } |
| |
| if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(disconnectedNodeAcknowledged); |
| } |
| |
| // build the response entity |
| TemplateEntity entity = new TemplateEntity(); |
| entity.setTemplate(template); |
| entity.setDisconnectedNodeAcknowledged(disconnectedNodeAcknowledged); |
| |
| if (isReplicateRequest()) { |
| // convert request accordingly |
| final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder(); |
| uriBuilder.segment("process-groups", groupId, "templates", "import"); |
| final URI importUri = uriBuilder.build(); |
| |
| final Map<String, String> headersToOverride = new HashMap<>(); |
| headersToOverride.put("content-type", MediaType.APPLICATION_XML); |
| |
| // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly |
| // to the cluster nodes themselves. |
| if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { |
| return getRequestReplicator().replicate(HttpMethod.POST, importUri, entity, getHeaders(headersToOverride)).awaitMergedResponse().getResponse(); |
| } else { |
| return getRequestReplicator().forwardToCoordinator( |
| getClusterCoordinatorNode(), HttpMethod.POST, importUri, entity, getHeaders(headersToOverride)).awaitMergedResponse().getResponse(); |
| } |
| } |
| |
| // otherwise import the template locally |
| return importTemplate(httpServletRequest, groupId, entity); |
| } |
| |
| /** |
| * Returns the sanitized error response which can safely be displayed on the error page. |
| * |
| * @param errorResponse the initial error response |
| * @return the HTML-escaped error response |
| */ |
| private String sanitizeErrorResponse(String errorResponse) { |
| if (errorResponse == null || StringUtils.isEmpty(errorResponse)) { |
| return ""; |
| } |
| |
| return StringEscapeUtils.escapeHtml4(errorResponse); |
| } |
| |
| /** |
| * Imports the specified template. |
| * |
| * @param httpServletRequest request |
| * @param requestTemplateEntity A templateEntity. |
| * @return A templateEntity. |
| */ |
| @POST |
| @Consumes(MediaType.APPLICATION_XML) |
| @Produces(MediaType.APPLICATION_XML) |
| @Path("{id}/templates/import") |
| @ApiOperation( |
| value = "Imports a template", |
| response = TemplateEntity.class, |
| authorizations = { |
| @Authorization(value = "Write - /process-groups/{uuid}") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response importTemplate( |
| @Context final HttpServletRequest httpServletRequest, |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") final String groupId, |
| final TemplateEntity requestTemplateEntity) { |
| |
| // verify the template was specified |
| if (requestTemplateEntity == null || requestTemplateEntity.getTemplate() == null || requestTemplateEntity.getTemplate().getSnippet() == null) { |
| throw new IllegalArgumentException("Template details must be specified."); |
| } |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.POST, requestTemplateEntity); |
| } else if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(requestTemplateEntity.isDisconnectedNodeAcknowledged()); |
| } |
| |
| return withWriteLock( |
| serviceFacade, |
| requestTemplateEntity, |
| lookup -> { |
| final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); |
| processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); |
| }, |
| () -> serviceFacade.verifyCanAddTemplate(groupId, requestTemplateEntity.getTemplate().getName()), |
| templateEntity -> { |
| try { |
| // import the template |
| final TemplateDTO template = serviceFacade.importTemplate(templateEntity.getTemplate(), groupId, getIdGenerationSeed()); |
| templateResource.populateRemainingTemplateContent(template); |
| |
| // build the response entity |
| TemplateEntity entity = new TemplateEntity(); |
| entity.setTemplate(template); |
| |
| // build the response |
| return generateCreatedResponse(URI.create(template.getUri()), entity).build(); |
| } catch (IllegalArgumentException | IllegalStateException e) { |
| logger.info("Unable to import template: " + e); |
| String responseXml = String.format("<errorResponse status=\"%s\" statusText=\"%s\"/>", Response.Status.BAD_REQUEST.getStatusCode(), sanitizeErrorResponse(e.getMessage())); |
| return Response.status(Response.Status.OK).entity(responseXml).type("application/xml").build(); |
| } catch (Exception e) { |
| logger.warn("An error occurred while importing a template.", e); |
| String responseXml = String.format("<errorResponse status=\"%s\" statusText=\"Unable to import the specified template: %s\"/>", |
| Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), sanitizeErrorResponse(e.getMessage())); |
| return Response.status(Response.Status.OK).entity(responseXml).type("application/xml").build(); |
| } |
| } |
| ); |
| } |
| |
| // ------------------- |
| // controller services |
| // ------------------- |
| |
| /** |
| * Creates a new Controller Service. |
| * |
| * @param httpServletRequest request |
| * @param requestControllerServiceEntity A controllerServiceEntity. |
| * @return A controllerServiceEntity. |
| */ |
| @POST |
| @Consumes(MediaType.APPLICATION_JSON) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/controller-services") |
| @ApiOperation( |
| value = "Creates a new controller service", |
| response = ControllerServiceEntity.class, |
| authorizations = { |
| @Authorization(value = "Write - /process-groups/{uuid}"), |
| @Authorization(value = "Read - any referenced Controller Services - /controller-services/{uuid}"), |
| @Authorization(value = "Write - if the Controller Service is restricted - /restricted-components") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response createControllerService( |
| @Context final HttpServletRequest httpServletRequest, |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") final String groupId, |
| @ApiParam( |
| value = "The controller service configuration details.", |
| required = true |
| ) final ControllerServiceEntity requestControllerServiceEntity) { |
| |
| if (requestControllerServiceEntity == null || requestControllerServiceEntity.getComponent() == null) { |
| throw new IllegalArgumentException("Controller service details must be specified."); |
| } |
| |
| if (requestControllerServiceEntity.getRevision() == null |
| || (requestControllerServiceEntity.getRevision().getVersion() == null || requestControllerServiceEntity.getRevision().getVersion() != 0)) { |
| throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Controller service."); |
| } |
| |
| final ControllerServiceDTO requestControllerService = requestControllerServiceEntity.getComponent(); |
| if (requestControllerService.getId() != null) { |
| throw new IllegalArgumentException("Controller service ID cannot be specified."); |
| } |
| |
| if (StringUtils.isBlank(requestControllerService.getType())) { |
| throw new IllegalArgumentException("The type of controller service to create must be specified."); |
| } |
| |
| if (requestControllerService.getParentGroupId() != null && !groupId.equals(requestControllerService.getParentGroupId())) { |
| throw new IllegalArgumentException(String.format("If specified, the parent process group id %s must be the same as specified in the URI %s", |
| requestControllerService.getParentGroupId(), groupId)); |
| } |
| requestControllerService.setParentGroupId(groupId); |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.POST, requestControllerServiceEntity); |
| } else if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(requestControllerServiceEntity.isDisconnectedNodeAcknowledged()); |
| } |
| |
| return withWriteLock( |
| serviceFacade, |
| requestControllerServiceEntity, |
| lookup -> { |
| final NiFiUser user = NiFiUserUtils.getNiFiUser(); |
| |
| final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId); |
| final Authorizable processGroup = groupAuthorizable.getAuthorizable(); |
| processGroup.authorize(authorizer, RequestAction.WRITE, user); |
| |
| final Authorizable parameterContext = groupAuthorizable.getProcessGroup().getParameterContext(); |
| if (parameterContext != null) { |
| AuthorizeParameterReference.authorizeParameterReferences(requestControllerService.getProperties(), authorizer, parameterContext, user); |
| } |
| |
| ComponentAuthorizable authorizable = null; |
| try { |
| authorizable = lookup.getConfigurableComponent(requestControllerService.getType(), requestControllerService.getBundle()); |
| |
| if (authorizable.isRestricted()) { |
| authorizeRestrictions(authorizer, authorizable); |
| } |
| |
| if (requestControllerService.getProperties() != null) { |
| AuthorizeControllerServiceReference.authorizeControllerServiceReferences(requestControllerService.getProperties(), authorizable, authorizer, lookup); |
| } |
| } finally { |
| if (authorizable != null) { |
| authorizable.cleanUpResources(); |
| } |
| } |
| }, |
| () -> serviceFacade.verifyCreateControllerService(requestControllerService), |
| controllerServiceEntity -> { |
| final ControllerServiceDTO controllerService = controllerServiceEntity.getComponent(); |
| |
| // set the processor id as appropriate |
| controllerService.setId(generateUuid()); |
| |
| // create the controller service and generate the json |
| final Revision revision = getRevision(controllerServiceEntity, controllerService.getId()); |
| final ControllerServiceEntity entity = serviceFacade.createControllerService(revision, groupId, controllerService); |
| controllerServiceResource.populateRemainingControllerServiceEntityContent(entity); |
| |
| // build the response |
| return generateCreatedResponse(URI.create(entity.getUri()), entity).build(); |
| } |
| ); |
| } |
| |
| /** |
| * Initiates the request to replace the Process Group with the given ID with the Process Group in the given import entity |
| * |
| * @param groupId The id of the process group to replace |
| * @param importEntity A request entity containing revision info and the process group to replace with |
| * @return A ProcessGroupReplaceRequestEntity. |
| */ |
| @POST |
| @Consumes(MediaType.APPLICATION_JSON) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/replace-requests") |
| @ApiOperation( |
| value = "Initiate the Replace Request of a Process Group with the given ID", |
| response = ProcessGroupReplaceRequestEntity.class, |
| notes = "This will initiate the action of replacing a process group with the given process group. This can be a lengthy " |
| + "process, as it will stop any Processors and disable any Controller Services necessary to perform the action and then restart them. As a result, " |
| + "the endpoint will immediately return a ProcessGroupReplaceRequestEntity, and the process of replacing the flow will occur " |
| + "asynchronously in the background. The client may then periodically poll the status of the request by issuing a GET request to " |
| + "/process-groups/replace-requests/{requestId}. Once the request is completed, the client is expected to issue a DELETE request to " |
| + "/process-groups/replace-requests/{requestId}. " + NON_GUARANTEED_ENDPOINT, |
| authorizations = { |
| @Authorization(value = "Read - /process-groups/{uuid}"), |
| @Authorization(value = "Write - /process-groups/{uuid}"), |
| @Authorization(value = "Read - /{component-type}/{uuid} - For all encapsulated components"), |
| @Authorization(value = "Write - /{component-type}/{uuid} - For all encapsulated components"), |
| @Authorization(value = "Write - if the template contains any restricted components - /restricted-components"), |
| @Authorization(value = "Read - /parameter-contexts/{uuid} - For any Parameter Context that is referenced by a Property that is changed, added, or removed") |
| } |
| ) |
| @ApiResponses(value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| }) |
| public Response initiateReplaceProcessGroup(@ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId, |
| @ApiParam(value = "The process group replace request entity", required = true) final ProcessGroupImportEntity importEntity) { |
| if (importEntity == null) { |
| throw new IllegalArgumentException("Process Group Import Entity is required"); |
| } |
| |
| // replacing a flow under version control is not permitted via import. Versioned flows have additional requirements to allow |
| // them only to be replaced by a different version of the same flow. |
| if (serviceFacade.isAnyProcessGroupUnderVersionControl(groupId)) { |
| throw new IllegalStateException("Cannot replace a Process Group via import while it or its descendants are under Version Control."); |
| } |
| |
| final VersionedFlowSnapshot versionedFlowSnapshot = importEntity.getVersionedFlowSnapshot(); |
| if (versionedFlowSnapshot == null) { |
| throw new IllegalArgumentException("Versioned Flow Snapshot must be supplied"); |
| } |
| |
| // remove any registry-specific versioning content which could be present if the flow was exported from registry |
| versionedFlowSnapshot.setFlow(null); |
| versionedFlowSnapshot.setBucket(null); |
| versionedFlowSnapshot.setSnapshotMetadata(null); |
| sanitizeRegistryInfo(versionedFlowSnapshot.getFlowContents()); |
| |
| return initiateFlowUpdate(groupId, importEntity, true, "replace-requests", |
| "/nifi-api/process-groups/" + groupId + "/flow-contents", importEntity::getVersionedFlowSnapshot); |
| } |
| |
| /** |
| * Recursively clear the registry info in the given versioned process group and all nested versioned process groups |
| * |
| * @param versionedProcessGroup the process group to sanitize |
| */ |
| private void sanitizeRegistryInfo(final VersionedProcessGroup versionedProcessGroup) { |
| versionedProcessGroup.setVersionedFlowCoordinates(null); |
| |
| for (final VersionedProcessGroup innerVersionedProcessGroup : versionedProcessGroup.getProcessGroups()) { |
| sanitizeRegistryInfo(innerVersionedProcessGroup); |
| } |
| } |
| |
| /** |
| * Uploads the specified versioned flow definition and adds it to a new process group. |
| * |
| * @param in The flow definition stream |
| * @return A processGroupEntity |
| * @throws IOException if there is an error during deserialization of the InputStream |
| */ |
| @POST |
| @Consumes(MediaType.MULTIPART_FORM_DATA) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/process-groups/upload") |
| @ApiOperation( |
| value = "Uploads a versioned flow definition and creates a process group", |
| response = ProcessGroupEntity.class, |
| authorizations = { |
| @Authorization(value = "Write - /process-groups/{uuid}") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response uploadProcessGroup( |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") final String groupId, |
| @ApiParam( |
| value = "The process group name.", |
| required = true |
| ) |
| @FormDataParam("groupName") final String groupName, |
| @ApiParam( |
| value = "The process group X position.", |
| required = true |
| ) |
| @FormDataParam("positionX") final Double positionX, |
| @ApiParam( |
| value = "The process group Y position.", |
| required = true |
| ) |
| @FormDataParam("positionY") final Double positionY, |
| @ApiParam( |
| value = "The client id.", |
| required = true |
| ) |
| @FormDataParam("clientId") final String clientId, |
| @ApiParam( |
| value = "Acknowledges that this node is disconnected to allow for mutable requests to proceed.", |
| required = false |
| ) |
| @FormDataParam(DISCONNECTED_NODE_ACKNOWLEDGED) @DefaultValue("false") final Boolean disconnectedNodeAcknowledged, |
| @FormDataParam("file") final InputStream in) throws InterruptedException { |
| |
| // ensure the group name is specified |
| if (StringUtils.isBlank(groupName)) { |
| throw new IllegalArgumentException("The process group name is required."); |
| } |
| |
| if (StringUtils.isBlank(groupId)) { |
| throw new IllegalArgumentException("The parent process group id is required"); |
| } |
| |
| if (positionX == null) { |
| throw new IllegalArgumentException("The x coordinate of the proposed position must be specified."); |
| } |
| |
| if (positionY == null) { |
| throw new IllegalArgumentException("The y coordinate of the proposed position must be specified."); |
| } |
| |
| if (StringUtils.isBlank(clientId)) { |
| throw new IllegalArgumentException("The client id must be specified"); |
| } |
| |
| // deserialize InputStream to a VersionedFlowSnapshot |
| VersionedFlowSnapshot deserializedSnapshot; |
| |
| try { |
| deserializedSnapshot = MAPPER.readValue(in, VersionedFlowSnapshot.class); |
| } catch (IOException e) { |
| logger.warn("Deserialization of uploaded JSON failed", e); |
| throw new IllegalArgumentException("Deserialization of uploaded JSON failed", e); |
| } |
| |
| // clear Registry info |
| sanitizeRegistryInfo(deserializedSnapshot.getFlowContents()); |
| |
| // resolve Bundle info |
| serviceFacade.discoverCompatibleBundles(deserializedSnapshot.getFlowContents()); |
| |
| // if there are any Controller Services referenced that are inherited from the parent group, |
| // resolve those to point to the appropriate Controller Service, if we are able to. |
| serviceFacade.resolveInheritedControllerServices(deserializedSnapshot, groupId, NiFiUserUtils.getNiFiUser()); |
| |
| if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(disconnectedNodeAcknowledged); |
| } |
| |
| // create a PositionDTO |
| final PositionDTO positionDTO = new PositionDTO(); |
| positionDTO.setX(positionX); |
| positionDTO.setY(positionY); |
| |
| // create a RevisionDTO |
| RevisionDTO revisionDTO = new RevisionDTO(); |
| revisionDTO.setClientId(clientId); |
| revisionDTO.setVersion((long) 0); |
| |
| // build the response entity for a replicate request |
| ProcessGroupUploadEntity pgUploadEntity = new ProcessGroupUploadEntity(); |
| pgUploadEntity.setGroupId(groupId); |
| pgUploadEntity.setGroupName(groupName); |
| pgUploadEntity.setDisconnectedNodeAcknowledged(disconnectedNodeAcknowledged); |
| pgUploadEntity.setFlowSnapshot(deserializedSnapshot); |
| pgUploadEntity.setPositionDTO(positionDTO); |
| pgUploadEntity.setRevisionDTO(revisionDTO); |
| |
| // replicate the request |
| if (isReplicateRequest()) { |
| // convert request accordingly |
| final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder(); |
| uriBuilder.segment("process-groups", groupId, "process-groups", "import"); |
| final URI importUri = uriBuilder.build(); |
| |
| final Map<String, String> headersToOverride = new HashMap<>(); |
| headersToOverride.put("content-type", MediaType.APPLICATION_JSON); |
| |
| // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly |
| // to the cluster nodes themselves. |
| if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { |
| return getRequestReplicator().replicate(HttpMethod.POST, importUri, pgUploadEntity, getHeaders(headersToOverride)).awaitMergedResponse().getResponse(); |
| } else { |
| return getRequestReplicator().forwardToCoordinator( |
| getClusterCoordinatorNode(), HttpMethod.POST, importUri, pgUploadEntity, getHeaders(headersToOverride)).awaitMergedResponse().getResponse(); |
| } |
| } |
| |
| // otherwise import the process group locally |
| return importProcessGroup(groupId, pgUploadEntity); |
| } |
| |
| /** |
| * Imports the specified process group. |
| * |
| * @param processGroupUploadEntity A ProcessGroupUploadEntity. |
| * @return A processGroupEntity. |
| */ |
| @POST |
| @Consumes(MediaType.APPLICATION_JSON) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/process-groups/import") |
| @ApiOperation( |
| value = "Imports a specified process group", |
| response = ProcessGroupEntity.class, |
| authorizations = { |
| @Authorization(value = "Write - /process-groups/{uuid}") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response importProcessGroup( |
| @ApiParam( |
| value = "The process group id.", |
| required = true |
| ) |
| @PathParam("id") final String groupId, |
| final ProcessGroupUploadEntity processGroupUploadEntity) { |
| |
| // verify the process group was specified |
| if (processGroupUploadEntity == null || processGroupUploadEntity.getFlowSnapshot() == null) { |
| throw new IllegalArgumentException("Process group details must be specified."); |
| } |
| |
| final VersionedFlowSnapshot versionedFlowSnapshot = processGroupUploadEntity.getFlowSnapshot(); |
| |
| // clear Registry info |
| sanitizeRegistryInfo(versionedFlowSnapshot.getFlowContents()); |
| |
| // resolve Bundle info |
| serviceFacade.discoverCompatibleBundles(versionedFlowSnapshot.getFlowContents()); |
| |
| // if there are any Controller Services referenced that are inherited from the parent group, |
| // resolve those to point to the appropriate Controller Service, if we are able to. |
| serviceFacade.resolveInheritedControllerServices(versionedFlowSnapshot, groupId, NiFiUserUtils.getNiFiUser()); |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.POST, processGroupUploadEntity); |
| } else if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(processGroupUploadEntity.getDisconnectedNodeAcknowledged()); |
| } |
| |
| // create a new ProcessGroupEntity |
| final ProcessGroupEntity newProcessGroupEntity = createProcessGroupEntity(groupId, processGroupUploadEntity.getGroupName(), processGroupUploadEntity.getPositionDTO(), versionedFlowSnapshot); |
| |
| return withWriteLock( |
| serviceFacade, |
| newProcessGroupEntity, |
| lookup -> authorizeAccess(groupId, newProcessGroupEntity, lookup), |
| () -> { |
| final VersionedFlowSnapshot newVersionedFlowSnapshot = newProcessGroupEntity.getVersionedFlowSnapshot(); |
| if (newVersionedFlowSnapshot != null) { |
| serviceFacade.verifyComponentTypes(newVersionedFlowSnapshot.getFlowContents()); |
| } |
| }, |
| processGroupEntity -> { |
| final ProcessGroupDTO processGroup = processGroupEntity.getComponent(); |
| |
| // set the processor id as appropriate |
| processGroup.setId(generateUuid()); |
| |
| // get the versioned flow |
| final VersionedFlowSnapshot flowSnapshot = processGroupEntity.getVersionedFlowSnapshot(); |
| |
| // create the process group contents |
| final Revision revision = new Revision((long) 0, processGroupUploadEntity.getRevisionDTO().getClientId(), processGroup.getId()); |
| |
| ProcessGroupEntity entity = serviceFacade.createProcessGroup(revision, groupId, processGroup); |
| |
| if (flowSnapshot != null) { |
| final RevisionDTO revisionDto = entity.getRevision(); |
| final String newGroupId = entity.getComponent().getId(); |
| final Revision newGroupRevision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), newGroupId); |
| |
| // We don't want the Process Group's position to be updated because we want to keep the position where the user |
| // placed the Process Group. We do not want to use the name of the Process Group that is in the Flow Contents. |
| // To accomplish this, we call updateProcessGroupContents() passing 'false' for the updateSettings flag, set |
| // the Process Group name, and null out the position. |
| flowSnapshot.getFlowContents().setPosition(null); |
| flowSnapshot.getFlowContents().setName(processGroupUploadEntity.getGroupName()); |
| |
| entity = serviceFacade.updateProcessGroupContents(newGroupRevision, newGroupId, null, flowSnapshot, |
| getIdGenerationSeed().orElse(null), false, false, true); |
| } |
| |
| populateRemainingProcessGroupEntityContent(entity); |
| |
| // generate a 201 created response |
| String uri = entity.getUri(); |
| return generateCreatedResponse(URI.create(uri), entity).build(); |
| } |
| ); |
| |
| } |
| |
| |
| /** |
| * Replace the Process Group contents with the given ID with the specified Process Group contents. |
| * |
| * This is the endpoint used in a cluster update replication scenario. |
| * |
| * @param groupId The id of the process group to replace |
| * @param importEntity A request entity containing revision info and the process group to replace with |
| * @return A ProcessGroupImportEntity. |
| */ |
| @PUT |
| @Consumes(MediaType.APPLICATION_JSON) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}/flow-contents") |
| @ApiOperation( |
| value = "Replace Process Group contents with the given ID with the specified Process Group contents", |
| response = ProcessGroupImportEntity.class, |
| notes = "This endpoint is used for replication within a cluster, when replacing a flow with a new flow. It expects that the flow being" |
| + "replaced is not under version control and that the given snapshot will not modify any Processor that is currently running " |
| + "or any Controller Service that is enabled. " |
| + NON_GUARANTEED_ENDPOINT, |
| authorizations = { |
| @Authorization(value = "Read - /process-groups/{uuid}"), |
| @Authorization(value = "Write - /process-groups/{uuid}") |
| }) |
| @ApiResponses(value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| }) |
| public Response replaceProcessGroup(@ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId, |
| @ApiParam(value = "The process group replace request entity.", required = true) final ProcessGroupImportEntity importEntity) { |
| // Verify the request |
| if (importEntity == null) { |
| throw new IllegalArgumentException("Process Group Import Entity is required"); |
| } |
| |
| final RevisionDTO revisionDto = importEntity.getProcessGroupRevision(); |
| if (revisionDto == null) { |
| throw new IllegalArgumentException("Process Group Revision must be specified."); |
| } |
| |
| final VersionedFlowSnapshot requestFlowSnapshot = importEntity.getVersionedFlowSnapshot(); |
| if (requestFlowSnapshot == null) { |
| throw new IllegalArgumentException("Versioned Flow Snapshot must be supplied."); |
| } |
| |
| // Perform the request |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.PUT, importEntity); |
| } else if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(importEntity.isDisconnectedNodeAcknowledged()); |
| } |
| |
| final Revision requestRevision = getRevision(importEntity.getProcessGroupRevision(), groupId); |
| return withWriteLock( |
| serviceFacade, |
| importEntity, |
| requestRevision, |
| lookup -> { |
| final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId); |
| final Authorizable processGroup = groupAuthorizable.getAuthorizable(); |
| processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); |
| processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); |
| }, |
| () -> { |
| // We do not enforce that the Process Group is 'not dirty' because at this point, |
| // the client has explicitly indicated the dataflow that the Process Group should |
| // provide and provided the Revision to ensure that they have the most up-to-date |
| // view of the Process Group. |
| serviceFacade.verifyCanUpdate(groupId, requestFlowSnapshot, true, false); |
| }, |
| (revision, entity) -> { |
| final ProcessGroupEntity updatedGroup = |
| performUpdateFlow(groupId, revision, importEntity, entity.getVersionedFlowSnapshot(), |
| getIdGenerationSeed().orElse(null), false, true); |
| |
| // response to replication request is an entity with revision info but no versioned flow snapshot |
| final ProcessGroupImportEntity responseEntity = new ProcessGroupImportEntity(); |
| responseEntity.setProcessGroupRevision(updatedGroup.getRevision()); |
| |
| return generateOkResponse(responseEntity).build(); |
| }); |
| } |
| |
| /** |
| * Retrieve a request to replace a Process Group by request ID. |
| * |
| * @param replaceRequestId The ID of the replace request |
| * @return A ProcessGroupReplaceRequestEntity. |
| */ |
| @GET |
| @Consumes(MediaType.WILDCARD) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("replace-requests/{id}") |
| @ApiOperation( |
| value = "Returns the Replace Request with the given ID", |
| response = ProcessGroupReplaceRequestEntity.class, |
| notes = "Returns the Replace Request with the given ID. Once a Replace Request has been created by performing a POST to /process-groups/{id}/replace-requests, " |
| + "that request can subsequently be retrieved via this endpoint, and the request that is fetched will contain the updated state, such as percent complete, the " |
| + "current state of the request, and any failures. " |
| + NON_GUARANTEED_ENDPOINT, |
| authorizations = { |
| @Authorization(value = "Only the user that submitted the request can get it") |
| }) |
| @ApiResponses(value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| }) |
| public Response getReplaceProcessGroupRequest( |
| @ApiParam("The ID of the Replace Request") @PathParam("id") final String replaceRequestId) { |
| return retrieveFlowUpdateRequest("replace-requests", replaceRequestId); |
| } |
| |
| @DELETE |
| @Consumes(MediaType.WILDCARD) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("replace-requests/{id}") |
| @ApiOperation( |
| value = "Deletes the Replace Request with the given ID", |
| response = ProcessGroupReplaceRequestEntity.class, |
| notes = "Deletes the Replace Request with the given ID. After a request is created via a POST to /process-groups/{id}/replace-requests, it is expected " |
| + "that the client will properly clean up the request by DELETE'ing it, once the Replace process has completed. If the request is deleted before the request " |
| + "completes, then the Replace request will finish the step that it is currently performing and then will cancel any subsequent steps. " |
| + NON_GUARANTEED_ENDPOINT, |
| authorizations = { |
| @Authorization(value = "Only the user that submitted the request can remove it") |
| }) |
| @ApiResponses(value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| }) |
| public Response deleteReplaceProcessGroupRequest( |
| @ApiParam(value = "Acknowledges that this node is disconnected to allow for mutable requests to proceed.", required = false) |
| @QueryParam(DISCONNECTED_NODE_ACKNOWLEDGED) @DefaultValue("false") final Boolean disconnectedNodeAcknowledged, |
| @ApiParam("The ID of the Update Request") @PathParam("id") final String replaceRequestId) { |
| return deleteFlowUpdateRequest("replace-requests", replaceRequestId, disconnectedNodeAcknowledged.booleanValue()); |
| } |
| |
| /** |
| * Perform actual flow update of the specified flow. This is used for the initial flow update and replication updates. |
| */ |
| @Override |
| protected ProcessGroupEntity performUpdateFlow(final String groupId, final Revision revision, final ProcessGroupImportEntity requestEntity, |
| final VersionedFlowSnapshot flowSnapshot, final String idGenerationSeed, |
| final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) { |
| logger.info("Replacing Process Group with ID {} with imported Process Group with ID {}", groupId, flowSnapshot.getFlowContents().getIdentifier()); |
| |
| // Update Process Group to the new flow (including name) and update variable registry with any Variables that were added or removed |
| return serviceFacade.updateProcessGroupContents(revision, groupId, null, flowSnapshot, idGenerationSeed, verifyNotModified, |
| true, updateDescendantVersionedFlows); |
| } |
| |
| /** |
| * Create the entity that is used for update flow replication. The initial replace request entity can be re-used for the replication request. |
| */ |
| @Override |
| protected Entity createReplicateUpdateFlowEntity(final Revision revision, final ProcessGroupImportEntity requestEntity, |
| final VersionedFlowSnapshot flowSnapshot) { |
| return requestEntity; |
| } |
| |
| /** |
| * Create the entity that captures the status and result of a replace request |
| * |
| * @return a new instance of a ProcessGroupReplaceRequestEntity |
| */ |
| @Override |
| protected ProcessGroupReplaceRequestEntity createUpdateRequestEntity() { |
| return new ProcessGroupReplaceRequestEntity(); |
| } |
| |
| /** |
| * Finalize a completed update request for an existing replace request. This is used when retrieving and deleting a replace request. |
| * |
| * A completed request will contain the updated VersionedFlowSnapshot |
| * |
| * @param requestEntity the request entity to finalize |
| */ |
| @Override |
| protected void finalizeCompletedUpdateRequest(final ProcessGroupReplaceRequestEntity requestEntity) { |
| final ProcessGroupReplaceRequestDTO updateRequestDto = requestEntity.getRequest(); |
| if (updateRequestDto.isComplete()) { |
| final VersionedFlowSnapshot versionedFlowSnapshot = |
| serviceFacade.getCurrentFlowSnapshotByGroupId(updateRequestDto.getProcessGroupId()); |
| requestEntity.setVersionedFlowSnapshot(versionedFlowSnapshot); |
| } |
| } |
| |
| private static class UpdateVariableRegistryRequestWrapper extends Entity { |
| private final Set<AffectedComponentEntity> allAffectedComponents; |
| private final List<AffectedComponentDTO> activeAffectedProcessors; |
| private final List<AffectedComponentDTO> activeAffectedServices; |
| private final VariableRegistryEntity variableRegistryEntity; |
| |
| public UpdateVariableRegistryRequestWrapper(final Set<AffectedComponentEntity> allAffectedComponents, final List<AffectedComponentDTO> activeAffectedProcessors, |
| final List<AffectedComponentDTO> activeAffectedServices, VariableRegistryEntity variableRegistryEntity) { |
| |
| this.allAffectedComponents = allAffectedComponents; |
| this.activeAffectedProcessors = activeAffectedProcessors; |
| this.activeAffectedServices = activeAffectedServices; |
| this.variableRegistryEntity = variableRegistryEntity; |
| } |
| |
| public Set<AffectedComponentEntity> getAllAffectedComponents() { |
| return allAffectedComponents; |
| } |
| |
| public List<AffectedComponentDTO> getActiveAffectedProcessors() { |
| return activeAffectedProcessors; |
| } |
| |
| public List<AffectedComponentDTO> getActiveAffectedServices() { |
| return activeAffectedServices; |
| } |
| |
| public VariableRegistryEntity getVariableRegistryEntity() { |
| return variableRegistryEntity; |
| } |
| } |
| |
| /** |
| * Creates a new ProcessGroupEntity with the specified VersionedFlowSnapshot. |
| * |
| * @param groupId the group id string |
| * @param groupName the process group name string |
| * @param positionDTO the process group PositionDTO |
| * @param deserializedSnapshot the deserialized snapshot |
| * |
| * @return a new ProcessGroupEntity |
| */ |
| private ProcessGroupEntity createProcessGroupEntity( |
| String groupId, String groupName, PositionDTO positionDTO, VersionedFlowSnapshot deserializedSnapshot) { |
| |
| final ProcessGroupEntity processGroupEntity = new ProcessGroupEntity(); |
| |
| // create a ProcessGroupDTO |
| final ProcessGroupDTO processGroupDTO = new ProcessGroupDTO(); |
| processGroupDTO.setParentGroupId(groupId); |
| processGroupDTO.setName(groupName); |
| |
| processGroupEntity.setComponent(processGroupDTO); |
| processGroupEntity.setVersionedFlowSnapshot(deserializedSnapshot); |
| |
| // set the ProcessGroupEntity position |
| processGroupEntity.getComponent().setPosition(positionDTO); |
| |
| return processGroupEntity; |
| } |
| |
| /** |
| * Authorizes access to a Parameter Context and RestrictedComponents resource. |
| * |
| * @param groupId the group id string |
| * @param processGroupEntity the ProcessGroupEntity |
| * @param lookup the lookup |
| */ |
| private void authorizeAccess(String groupId, ProcessGroupEntity processGroupEntity, AuthorizableLookup lookup) { |
| final NiFiUser user = NiFiUserUtils.getNiFiUser(); |
| final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); |
| processGroup.authorize(authorizer, RequestAction.WRITE, user); |
| |
| // if request specifies a Parameter Context, need to authorize that user has READ policy for the Parameter Context. |
| final ParameterContextReferenceEntity referencedParamContext = processGroupEntity.getComponent().getParameterContext(); |
| if (referencedParamContext != null && referencedParamContext.getId() != null) { |
| lookup.getParameterContext(referencedParamContext.getId()).authorize(authorizer, RequestAction.READ, user); |
| } |
| |
| // if any of the components is a Restricted Component, then we must authorize the user |
| // for write access to the RestrictedComponents resource |
| final VersionedFlowSnapshot versionedFlowSnapshot = processGroupEntity.getVersionedFlowSnapshot(); |
| if (versionedFlowSnapshot != null) { |
| final Set<ConfigurableComponent> restrictedComponents = FlowRegistryUtils.getRestrictedComponents(versionedFlowSnapshot.getFlowContents(), serviceFacade); |
| restrictedComponents.forEach(restrictedComponent -> { |
| final ComponentAuthorizable restrictedComponentAuthorizable = lookup.getConfigurableComponent(restrictedComponent); |
| authorizeRestrictions(authorizer, restrictedComponentAuthorizable); |
| }); |
| |
| final Map<String, VersionedParameterContext> parameterContexts = versionedFlowSnapshot.getParameterContexts(); |
| if (parameterContexts != null) { |
| parameterContexts.values().forEach(context -> AuthorizeParameterReference.authorizeParameterContextAddition(context, serviceFacade, authorizer, lookup, user)); |
| } |
| } |
| } |
| |
| // setters |
| |
| public void setProcessorResource(ProcessorResource processorResource) { |
| this.processorResource = processorResource; |
| } |
| |
| public void setInputPortResource(InputPortResource inputPortResource) { |
| this.inputPortResource = inputPortResource; |
| } |
| |
| public void setOutputPortResource(OutputPortResource outputPortResource) { |
| this.outputPortResource = outputPortResource; |
| } |
| |
| public void setFunnelResource(FunnelResource funnelResource) { |
| this.funnelResource = funnelResource; |
| } |
| |
| public void setLabelResource(LabelResource labelResource) { |
| this.labelResource = labelResource; |
| } |
| |
| public void setRemoteProcessGroupResource(RemoteProcessGroupResource remoteProcessGroupResource) { |
| this.remoteProcessGroupResource = remoteProcessGroupResource; |
| } |
| |
| public void setConnectionResource(ConnectionResource connectionResource) { |
| this.connectionResource = connectionResource; |
| } |
| |
| public void setTemplateResource(TemplateResource templateResource) { |
| this.templateResource = templateResource; |
| } |
| |
| public void setControllerServiceResource(ControllerServiceResource controllerServiceResource) { |
| this.controllerServiceResource = controllerServiceResource; |
| } |
| |
| private static class DropEntity extends Entity { |
| final String entityId; |
| final String dropRequestId; |
| |
| public DropEntity(String entityId, String dropRequestId) { |
| this.entityId = entityId; |
| this.dropRequestId = dropRequestId; |
| } |
| |
| public String getEntityId() { |
| return entityId; |
| } |
| |
| public String getDropRequestId() { |
| return dropRequestId; |
| } |
| } |
| } |