| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.web.api; |
| |
| import io.swagger.annotations.Api; |
| import io.swagger.annotations.ApiOperation; |
| import io.swagger.annotations.ApiParam; |
| import io.swagger.annotations.ApiResponse; |
| import io.swagger.annotations.ApiResponses; |
| import io.swagger.annotations.Authorization; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.nifi.authorization.AuthorizableLookup; |
| import org.apache.nifi.authorization.Authorizer; |
| import org.apache.nifi.authorization.RequestAction; |
| import org.apache.nifi.authorization.resource.Authorizable; |
| import org.apache.nifi.authorization.user.NiFiUser; |
| import org.apache.nifi.authorization.user.NiFiUserUtils; |
| import org.apache.nifi.cluster.manager.NodeResponse; |
| import org.apache.nifi.controller.ScheduledState; |
| import org.apache.nifi.controller.service.ControllerServiceState; |
| import org.apache.nifi.web.NiFiServiceFacade; |
| import org.apache.nifi.web.ResourceNotFoundException; |
| import org.apache.nifi.web.ResumeFlowException; |
| import org.apache.nifi.web.Revision; |
| import org.apache.nifi.web.api.concurrent.AsyncRequestManager; |
| import org.apache.nifi.web.api.concurrent.AsynchronousWebRequest; |
| import org.apache.nifi.web.api.concurrent.RequestManager; |
| import org.apache.nifi.web.api.concurrent.StandardAsynchronousWebRequest; |
| import org.apache.nifi.web.api.concurrent.StandardUpdateStep; |
| import org.apache.nifi.web.api.concurrent.UpdateStep; |
| import org.apache.nifi.web.api.dto.AffectedComponentDTO; |
| import org.apache.nifi.web.api.dto.DtoFactory; |
| import org.apache.nifi.web.api.dto.ParameterContextDTO; |
| import org.apache.nifi.web.api.dto.ParameterContextUpdateRequestDTO; |
| import org.apache.nifi.web.api.dto.ParameterContextUpdateStepDTO; |
| import org.apache.nifi.web.api.dto.ParameterContextValidationRequestDTO; |
| import org.apache.nifi.web.api.dto.ParameterDTO; |
| import org.apache.nifi.web.api.dto.RevisionDTO; |
| import org.apache.nifi.web.api.entity.AffectedComponentEntity; |
| import org.apache.nifi.web.api.entity.ComponentValidationResultEntity; |
| import org.apache.nifi.web.api.entity.ComponentValidationResultsEntity; |
| import org.apache.nifi.web.api.entity.Entity; |
| import org.apache.nifi.web.api.entity.ParameterContextEntity; |
| import org.apache.nifi.web.api.entity.ParameterContextUpdateRequestEntity; |
| import org.apache.nifi.web.api.entity.ParameterContextValidationRequestEntity; |
| import org.apache.nifi.web.api.entity.ParameterEntity; |
| import org.apache.nifi.web.api.entity.ProcessGroupEntity; |
| import org.apache.nifi.web.api.request.ClientIdParameter; |
| import org.apache.nifi.web.api.request.LongParameter; |
| import org.apache.nifi.web.util.AffectedComponentUtils; |
| import org.apache.nifi.web.util.CancellableTimedPause; |
| import org.apache.nifi.web.util.ComponentLifecycle; |
| import org.apache.nifi.web.util.InvalidComponentAction; |
| import org.apache.nifi.web.util.LifecycleManagementException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.ws.rs.Consumes; |
| import javax.ws.rs.DELETE; |
| import javax.ws.rs.DefaultValue; |
| import javax.ws.rs.GET; |
| import javax.ws.rs.HttpMethod; |
| import javax.ws.rs.POST; |
| import javax.ws.rs.PUT; |
| import javax.ws.rs.Path; |
| import javax.ws.rs.PathParam; |
| import javax.ws.rs.Produces; |
| import javax.ws.rs.QueryParam; |
| import javax.ws.rs.core.MediaType; |
| import javax.ws.rs.core.Response; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Consumer; |
| import java.util.regex.Pattern; |
| import java.util.stream.Collectors; |
| |
| |
| @Path("/parameter-contexts") |
| @Api(value = "/parameter-contexts", description = "Endpoint for managing version control for a flow") |
| public class ParameterContextResource extends ApplicationResource { |
| private static final Logger logger = LoggerFactory.getLogger(ParameterContextResource.class); |
| private static final Pattern VALID_PARAMETER_NAME_PATTERN = Pattern.compile("[A-Za-z0-9 ._\\-]+"); |
| |
| private NiFiServiceFacade serviceFacade; |
| private Authorizer authorizer; |
| private DtoFactory dtoFactory; |
| private ComponentLifecycle clusterComponentLifecycle; |
| private ComponentLifecycle localComponentLifecycle; |
| |
| private RequestManager<ParameterContextEntity, ParameterContextEntity> updateRequestManager = new AsyncRequestManager<>(100, TimeUnit.MINUTES.toMillis(1L), "Parameter Context Update Thread"); |
| private RequestManager<ParameterContextValidationRequestEntity, ComponentValidationResultsEntity> validationRequestManager = new AsyncRequestManager<>(100, TimeUnit.MINUTES.toMillis(1L), |
| "Parameter Context Validation Thread"); |
| |
| |
| |
| private void authorizeReadParameterContext(final String parameterContextId) { |
| if (parameterContextId == null) { |
| throw new IllegalArgumentException("Parameter Context ID must be specified"); |
| } |
| |
| serviceFacade.authorizeAccess(lookup -> { |
| final Authorizable parameterContext = lookup.getParameterContext(parameterContextId); |
| parameterContext.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); |
| }); |
| } |
| |
| @GET |
| @Consumes(MediaType.WILDCARD) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}") |
| @ApiOperation( |
| value = "Returns the Parameter Context with the given ID", |
| response = ParameterContextEntity.class, |
| notes = "Returns the Parameter Context with the given ID.", |
| authorizations = { |
| @Authorization(value = "Read - /parameter-contexts/{id}") |
| }) |
| @ApiResponses(value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| }) |
| public Response getParameterContext(@ApiParam("The ID of the Parameter Context") @PathParam("id") final String parameterContextId) { |
| // authorize access |
| authorizeReadParameterContext(parameterContextId); |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.GET); |
| } |
| |
| // get the specified parameter context |
| final ParameterContextEntity entity = serviceFacade.getParameterContext(parameterContextId, NiFiUserUtils.getNiFiUser()); |
| entity.setUri(generateResourceUri("parameter-contexts", entity.getId())); |
| |
| // generate the response |
| return generateOkResponse(entity).build(); |
| } |
| |
| |
| @POST |
| @Consumes(MediaType.APPLICATION_JSON) |
| @Produces(MediaType.APPLICATION_JSON) |
| @ApiOperation( |
| value = "Create a Parameter Context", |
| response = ParameterContextEntity.class, |
| authorizations = { |
| @Authorization(value = "Write - /parameter-contexts") |
| }) |
| @ApiResponses(value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| }) |
| public Response createParameterContext( |
| @ApiParam(value = "The Parameter Context.", required = true) final ParameterContextEntity requestEntity) { |
| |
| if (requestEntity == null || requestEntity.getComponent() == null) { |
| throw new IllegalArgumentException("Parameter Context must be specified"); |
| } |
| |
| if (requestEntity.getRevision() == null || (requestEntity.getRevision().getVersion() == null || requestEntity.getRevision().getVersion() != 0)) { |
| throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Parameter Context."); |
| } |
| |
| final ParameterContextDTO context = requestEntity.getComponent(); |
| if (context.getName() == null) { |
| throw new IllegalArgumentException("Parameter Context's Name must be specified"); |
| } |
| |
| validateParameterNames(requestEntity.getComponent()); |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.POST, requestEntity); |
| } else if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(requestEntity.isDisconnectedNodeAcknowledged()); |
| } |
| |
| return withWriteLock( |
| serviceFacade, |
| requestEntity, |
| lookup -> { |
| final Authorizable parameterContexts = lookup.getParameterContexts(); |
| parameterContexts.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); |
| }, |
| () -> serviceFacade.verifyCreateParameterContext(requestEntity.getComponent()), |
| entity -> { |
| final String contextId = generateUuid(); |
| entity.getComponent().setId(contextId); |
| |
| final Revision revision = getRevision(entity.getRevision(), contextId); |
| final ParameterContextEntity contextEntity = serviceFacade.createParameterContext(revision, entity.getComponent()); |
| |
| // generate a 201 created response |
| final String uri = generateResourceUri("parameter-contexts", contextEntity.getId()); |
| contextEntity.setUri(uri); |
| return generateCreatedResponse(URI.create(uri), contextEntity).build(); |
| }); |
| } |
| |
| |
| @PUT |
| @Consumes(MediaType.APPLICATION_JSON) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}") |
| @ApiOperation( |
| value = "Modifies a Parameter Context", |
| response = ParameterContextEntity.class, |
| notes = "This endpoint will update a Parameter Context to match the provided entity. However, this request will fail if any component is running and is referencing a Parameter in the " + |
| "Parameter Context. Generally, this endpoint is not called directly. Instead, an update request should be submitted by making a POST to the " + |
| "/parameter-contexts/update-requests endpoint. That endpoint will, in turn, call this endpoint.", |
| authorizations = { |
| @Authorization(value = "Read - /parameter-contexts/{id}"), |
| @Authorization(value = "Write - /parameter-contexts/{id}") |
| } |
| ) |
| @ApiResponses(value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| }) |
| public Response updateParameterContext( |
| @PathParam("id") String contextId, |
| @ApiParam(value = "The updated Parameter Context", required = true) ParameterContextEntity requestEntity) { |
| |
| // Validate request |
| if (requestEntity.getId() == null) { |
| throw new IllegalArgumentException("The ID of the Parameter Context must be specified"); |
| } |
| if (!requestEntity.getId().equals(contextId)) { |
| throw new IllegalArgumentException("The ID of the Parameter Context must match the ID specified in the URL's path"); |
| } |
| |
| final ParameterContextDTO updateDto = requestEntity.getComponent(); |
| if (updateDto == null) { |
| throw new IllegalArgumentException("The Parameter Context must be supplied"); |
| } |
| |
| final RevisionDTO revisionDto = requestEntity.getRevision(); |
| if (revisionDto == null) { |
| throw new IllegalArgumentException("The Revision of the Parameter Context must be specified."); |
| } |
| |
| // Perform the request |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.PUT, requestEntity); |
| } else if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(requestEntity.isDisconnectedNodeAcknowledged()); |
| } |
| |
| final Revision requestRevision = getRevision(requestEntity.getRevision(), updateDto.getId()); |
| return withWriteLock( |
| serviceFacade, |
| requestEntity, |
| requestRevision, |
| lookup -> { |
| final Authorizable parameterContext = lookup.getParameterContext(contextId); |
| parameterContext.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); |
| parameterContext.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); |
| }, |
| () -> serviceFacade.verifyUpdateParameterContext(updateDto, true), |
| (rev, entity) -> { |
| final ParameterContextEntity updatedEntity = serviceFacade.updateParameterContext(rev, entity.getComponent()); |
| |
| updatedEntity.setUri(generateResourceUri("parameter-contexts", entity.getId())); |
| return generateOkResponse(updatedEntity).build(); |
| } |
| ); |
| } |
| |
| |
| @POST |
| @Consumes(MediaType.APPLICATION_JSON) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{contextId}/update-requests") |
| @ApiOperation( |
| value = "Initiate the Update Request of a Parameter Context", |
| response = ParameterContextUpdateRequestEntity.class, |
| notes = "This will initiate the process of updating a Parameter Context. Changing the value of a Parameter may require that one or more components be stopped and " + |
| "restarted, so this acttion may take significantly more time than many other REST API actions. As a result, this endpoint will immediately return a ParameterContextUpdateRequestEntity, " + |
| "and the process of updating the necessary components will occur asynchronously in the background. The client may then periodically poll the status of the request by " + |
| "issuing a GET request to /parameter-contexts/update-requests/{requestId}. Once the request is completed, the client is expected to issue a DELETE request to " + |
| "/parameter-contexts/update-requests/{requestId}.", |
| authorizations = { |
| @Authorization(value = "Read - /parameter-contexts/{parameterContextId}"), |
| @Authorization(value = "Write - /parameter-contexts/{parameterContextId}"), |
| @Authorization(value = "Read - for every component that is affected by the update"), |
| @Authorization(value = "Write - for every component that is affected by the update") |
| }) |
| @ApiResponses(value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| }) |
| public Response submitParameterContextUpdate( |
| @PathParam("contextId") final String contextId, |
| @ApiParam(value = "The updated version of the parameter context.", required = true) final ParameterContextEntity requestEntity) { |
| |
| if (requestEntity == null) { |
| throw new IllegalArgumentException("Parameter Context must be specified."); |
| } |
| |
| // Verify the request |
| final RevisionDTO revisionDto = requestEntity.getRevision(); |
| if (revisionDto == null) { |
| throw new IllegalArgumentException("Parameter Context Revision must be specified"); |
| } |
| |
| final ParameterContextDTO contextDto = requestEntity.getComponent(); |
| if (contextDto == null) { |
| throw new IllegalArgumentException("Parameter Context must be specified"); |
| } |
| |
| if (contextDto.getId() == null) { |
| throw new IllegalArgumentException("Parameter Context's ID must be specified"); |
| } |
| if (!contextDto.getId().equals(contextId)) { |
| throw new IllegalArgumentException("ID of Parameter Context in message body does not match Parameter Context ID supplied in URI"); |
| } |
| |
| validateParameterNames(contextDto); |
| |
| // We will perform the updating of the Parameter Context in a background thread because it can be a long-running process. |
| // In order to do this, we will need some objects that are only available as Thread-Local variables to the current |
| // thread, so we will gather the values for these objects up front. |
| final boolean replicateRequest = isReplicateRequest(); |
| final ComponentLifecycle componentLifecycle = replicateRequest ? clusterComponentLifecycle : localComponentLifecycle; |
| final NiFiUser user = NiFiUserUtils.getNiFiUser(); |
| |
| // Workflow for this process: |
| // 1. Determine which components will be affected and are enabled/running |
| // 2. Verify READ and WRITE permissions for user, for every component that is affected |
| // 3. Verify READ and WRITE permissions for user, for Parameter Context |
| // 4. Stop all Processors that are affected. |
| // 5. Wait for all of the Processors to finish stopping. |
| // 6. Disable all Controller Services that are affected. |
| // 7. Wait for all Controller Services to finish disabling. |
| // 8. Update Parameter Context |
| // 9. Re-Enable all affected Controller Services |
| // 10. Re-Start all Processors |
| |
| final Set<AffectedComponentEntity> affectedComponents = serviceFacade.getComponentsAffectedByParameterContextUpdate(contextDto); |
| logger.debug("Received Update Request for Parameter Context: {}; the following {} components will be affected: {}", requestEntity, affectedComponents.size(), affectedComponents); |
| |
| final InitiateChangeParameterContextRequestWrapper requestWrapper = new InitiateChangeParameterContextRequestWrapper(requestEntity, componentLifecycle, getAbsolutePath(), |
| affectedComponents, replicateRequest, user); |
| |
| final Revision requestRevision = getRevision(requestEntity.getRevision(), contextDto.getId()); |
| return withWriteLock( |
| serviceFacade, |
| requestWrapper, |
| requestRevision, |
| lookup -> { |
| // Verify READ and WRITE permissions for user, for the Parameter Context itself |
| final Authorizable parameterContext = lookup.getParameterContext(contextId); |
| parameterContext.authorize(authorizer, RequestAction.READ, user); |
| parameterContext.authorize(authorizer, RequestAction.WRITE, user); |
| |
| // Verify READ and WRITE permissions for user, for every component that is affected |
| affectedComponents.forEach(component -> authorizeAffectedComponent(component, lookup, user, true, true)); |
| }, |
| () -> { |
| // Verify Request |
| serviceFacade.verifyUpdateParameterContext(contextDto, false); |
| }, |
| this::submitUpdateRequest |
| ); |
| } |
| |
| private void validateParameterNames(final ParameterContextDTO parameterContextDto) { |
| for (final ParameterEntity entity : parameterContextDto.getParameters()) { |
| final String parameterName = entity.getParameter().getName(); |
| if (!isLegalParameterName(parameterName)) { |
| throw new IllegalArgumentException("Request contains an illegal Parameter Name (" + parameterName + "). Parameter names may only include letters, numbers, spaces, and the special " + |
| "characters .-_"); |
| } |
| } |
| } |
| |
| private boolean isLegalParameterName(final String parameterName) { |
| return VALID_PARAMETER_NAME_PATTERN.matcher(parameterName).matches(); |
| } |
| |
| private void authorizeAffectedComponent(final AffectedComponentEntity entity, final AuthorizableLookup lookup, final NiFiUser user, final boolean requireRead, final boolean requireWrite) { |
| final AffectedComponentDTO dto = entity.getComponent(); |
| if (dto == null) { |
| // If the DTO is null, it is an indication that the user does not have permissions. |
| // However, we don't want to just throw an AccessDeniedException because we would rather |
| // ensure that all of the appropriate actions are taken by the pluggable Authorizer. As a result, |
| // we attempt to find the component as a Processor and fall back to finding it as a Controller Service. |
| // We then go ahead and attempt the authorization, expecting it to fail. |
| Authorizable authorizable; |
| try { |
| authorizable = lookup.getProcessor(entity.getId()).getAuthorizable(); |
| } catch (final ResourceNotFoundException rnfe) { |
| authorizable = lookup.getControllerService(entity.getId()).getAuthorizable(); |
| } |
| |
| if (requireRead) { |
| authorizable.authorize(authorizer, RequestAction.READ, user); |
| } |
| if (requireWrite) { |
| authorizable.authorize(authorizer, RequestAction.WRITE, user); |
| } |
| } else if (AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR.equals(dto.getReferenceType())) { |
| final Authorizable processor = lookup.getProcessor(dto.getId()).getAuthorizable(); |
| |
| if (requireRead) { |
| processor.authorize(authorizer, RequestAction.READ, user); |
| } |
| if (requireWrite) { |
| processor.authorize(authorizer, RequestAction.WRITE, user); |
| } |
| } else if (AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE.equals(dto.getReferenceType())) { |
| final Authorizable service = lookup.getControllerService(dto.getId()).getAuthorizable(); |
| |
| if (requireRead) { |
| service.authorize(authorizer, RequestAction.READ, user); |
| } |
| if (requireWrite) { |
| service.authorize(authorizer, RequestAction.WRITE, user); |
| } |
| } |
| } |
| |
| |
| @GET |
| @Consumes(MediaType.WILDCARD) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{contextId}/update-requests/{requestId}") |
| @ApiOperation( |
| value = "Returns the Update Request with the given ID", |
| response = ParameterContextUpdateRequestEntity.class, |
| notes = "Returns the Update Request with the given ID. Once an Update Request has been created by performing a POST to /nifi-api/parameter-contexts, " |
| + "that request can subsequently be retrieved via this endpoint, and the request that is fetched will contain the updated state, such as percent complete, the " |
| + "current state of the request, and any failures. ", |
| authorizations = { |
| @Authorization(value = "Only the user that submitted the request can get it") |
| }) |
| @ApiResponses(value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| }) |
| public Response getParameterContextUpdate( |
| @ApiParam("The ID of the Parameter Context") @PathParam("contextId") final String contextId, |
| @ApiParam("The ID of the Update Request") @PathParam("requestId") final String updateRequestId) { |
| |
| authorizeReadParameterContext(contextId); |
| |
| return retrieveUpdateRequest("update-requests", contextId, updateRequestId); |
| } |
| |
| |
| @DELETE |
| @Consumes(MediaType.WILDCARD) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{contextId}/update-requests/{requestId}") |
| @ApiOperation( |
| value = "Deletes the Update Request with the given ID", |
| response = ParameterContextUpdateRequestEntity.class, |
| notes = "Deletes the Update Request with the given ID. After a request is created via a POST to /nifi-api/parameter-contexts/update-requests, it is expected " |
| + "that the client will properly clean up the request by DELETE'ing it, once the Update process has completed. If the request is deleted before the request " |
| + "completes, then the Update request will finish the step that it is currently performing and then will cancel any subsequent steps.", |
| authorizations = { |
| @Authorization(value = "Only the user that submitted the request can remove it") |
| }) |
| @ApiResponses(value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| }) |
| public Response deleteUpdateRequest( |
| @ApiParam( |
| value = "Acknowledges that this node is disconnected to allow for mutable requests to proceed.", |
| required = false |
| ) |
| @QueryParam(DISCONNECTED_NODE_ACKNOWLEDGED) @DefaultValue("false") final Boolean disconnectedNodeAcknowledged, |
| @ApiParam("The ID of the ParameterContext") @PathParam("contextId") final String contextId, |
| @ApiParam("The ID of the Update Request") @PathParam("requestId") final String updateRequestId) { |
| |
| authorizeReadParameterContext(contextId); |
| return deleteUpdateRequest("update-requests", contextId, updateRequestId, disconnectedNodeAcknowledged.booleanValue()); |
| } |
| |
| |
| @DELETE |
| @Consumes(MediaType.WILDCARD) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}") |
| @ApiOperation( |
| value = "Deletes the Parameter Context with the given ID", |
| response = ParameterContextEntity.class, |
| notes = "Deletes the Parameter Context with the given ID.", |
| authorizations = { |
| @Authorization(value = "Read - /parameter-contexts/{uuid}"), |
| @Authorization(value = "Write - /parameter-contexts/{uuid}"), |
| @Authorization(value = "Read - /process-groups/{uuid}, for any Process Group that is currently bound to the Parameter Context"), |
| @Authorization(value = "Write - /process-groups/{uuid}, for any Process Group that is currently bound to the Parameter Context") |
| }) |
| @ApiResponses(value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| }) |
| public Response deleteParameterContext( |
| @ApiParam( |
| value = "The version is used to verify the client is working with the latest version of the flow.", |
| required = false) |
| @QueryParam(VERSION) final LongParameter version, |
| @ApiParam( |
| value = "If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.", |
| required = false) |
| @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) final ClientIdParameter clientId, |
| @ApiParam( |
| value = "Acknowledges that this node is disconnected to allow for mutable requests to proceed.", |
| required = false |
| ) |
| @QueryParam(DISCONNECTED_NODE_ACKNOWLEDGED) @DefaultValue("false") final Boolean disconnectedNodeAcknowledged, |
| @ApiParam("The Parameter Context ID.") @PathParam("id") final String parameterContextId) { |
| |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.DELETE); |
| } else if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(disconnectedNodeAcknowledged); |
| } |
| |
| final Revision requestRevision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), parameterContextId); |
| return withWriteLock( |
| serviceFacade, |
| null, |
| requestRevision, |
| lookup -> { |
| final NiFiUser user = NiFiUserUtils.getNiFiUser(); |
| |
| final Authorizable parameterContext = lookup.getParameterContext(parameterContextId); |
| parameterContext.authorize(authorizer, RequestAction.READ, user); |
| parameterContext.authorize(authorizer, RequestAction.WRITE, user); |
| |
| final ParameterContextEntity contextEntity = serviceFacade.getParameterContext(parameterContextId, user); |
| for (final ProcessGroupEntity boundGroupEntity : contextEntity.getComponent().getBoundProcessGroups()) { |
| final String groupId = boundGroupEntity.getId(); |
| final Authorizable groupAuthorizable = lookup.getProcessGroup(groupId).getAuthorizable(); |
| groupAuthorizable.authorize(authorizer, RequestAction.READ, user); |
| groupAuthorizable.authorize(authorizer, RequestAction.WRITE, user); |
| } |
| }, |
| () -> serviceFacade.verifyDeleteParameterContext(parameterContextId), |
| (revision, groupEntity) -> { |
| // disconnect from version control |
| final ParameterContextEntity entity = serviceFacade.deleteParameterContext(revision, parameterContextId); |
| |
| // generate the response |
| return generateOkResponse(entity).build(); |
| }); |
| |
| } |
| |
| |
| @POST |
| @Consumes(MediaType.APPLICATION_JSON) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{contextId}/validation-requests") |
| @ApiOperation( |
| value = "Initiate a Validation Request to determine how the validity of components will change if a Parameter Context were to be updated", |
| response = ParameterContextValidationRequestEntity.class, |
| notes = "This will initiate the process of validating all components whose Process Group is bound to the specified Parameter Context. Performing validation against " + |
| "an arbitrary number of components may be expect and take significantly more time than many other REST API actions. As a result, this endpoint will immediately return " + |
| "a ParameterContextValidationRequestEntity, " + |
| "and the process of validating the necessary components will occur asynchronously in the background. The client may then periodically poll the status of the request by " + |
| "issuing a GET request to /parameter-contexts/validation-requests/{requestId}. Once the request is completed, the client is expected to issue a DELETE request to " + |
| "/parameter-contexts/validation-requests/{requestId}.", |
| authorizations = { |
| @Authorization(value = "Read - /parameter-contexts/{parameterContextId}") |
| }) |
| @ApiResponses(value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| }) |
| public Response submitValidationRequest( |
| @PathParam("contextId") final String contextId, |
| @ApiParam(value = "The validation request", required=true) final ParameterContextValidationRequestEntity requestEntity) { |
| |
| if (requestEntity == null) { |
| throw new IllegalArgumentException("Parameter Context must be specified."); |
| } |
| |
| final ParameterContextValidationRequestDTO requestDto = requestEntity.getRequest(); |
| if (requestDto == null) { |
| throw new IllegalArgumentException("Parameter Context must be specified"); |
| } |
| |
| if (requestDto.getParameterContext() == null) { |
| throw new IllegalArgumentException("Parameter Context must be specified"); |
| } |
| if (requestDto.getParameterContext().getId() == null) { |
| throw new IllegalArgumentException("Parameter Context's ID must be specified"); |
| } |
| |
| if (isReplicateRequest()) { |
| return replicate("POST", requestEntity); |
| } else if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(requestEntity.isDisconnectedNodeAcknowledged()); |
| } |
| |
| return withWriteLock( |
| serviceFacade, |
| requestEntity, |
| lookup -> { |
| final Authorizable parameterContext = lookup.getParameterContext(contextId); |
| parameterContext.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); |
| |
| authorizeReferencingComponents(requestEntity.getRequest().getParameterContext().getId(), lookup, NiFiUserUtils.getNiFiUser()); |
| }, |
| () -> {}, |
| entity -> performAsyncValidation(entity, NiFiUserUtils.getNiFiUser()) |
| ); |
| } |
| |
| private void authorizeReferencingComponents(final String parameterContextId, final AuthorizableLookup lookup, final NiFiUser user) { |
| final ParameterContextEntity context = serviceFacade.getParameterContext(parameterContextId, NiFiUserUtils.getNiFiUser()); |
| |
| for (final ParameterEntity parameterEntity : context.getComponent().getParameters()) { |
| final ParameterDTO dto = parameterEntity.getParameter(); |
| if (dto == null) { |
| continue; |
| } |
| |
| for (final AffectedComponentEntity affectedComponent : dto.getReferencingComponents()) { |
| authorizeAffectedComponent(affectedComponent, lookup, user, true, false); |
| } |
| } |
| } |
| |
| @GET |
| @Consumes(MediaType.WILDCARD) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{contextId}/validation-requests/{id}") |
| @ApiOperation( |
| value = "Returns the Validation Request with the given ID", |
| response = ParameterContextValidationRequestEntity.class, |
| notes = "Returns the Validation Request with the given ID. Once a Validation Request has been created by performing a POST to /nifi-api/validation-contexts, " |
| + "that request can subsequently be retrieved via this endpoint, and the request that is fetched will contain the updated state, such as percent complete, the " |
| + "current state of the request, and any failures. ", |
| authorizations = { |
| @Authorization(value = "Only the user that submitted the request can get it") |
| }) |
| @ApiResponses(value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| }) |
| public Response getValidationRequest( |
| @ApiParam("The ID of the Parameter Context") @PathParam("contextId") final String contextId, |
| @ApiParam("The ID of the Validation Request") @PathParam("id") final String validationRequestId) { |
| |
| authorizeReadParameterContext(contextId); |
| |
| if (isReplicateRequest()) { |
| return replicate("GET"); |
| } |
| |
| return retrieveValidationRequest("validation-requests", contextId, validationRequestId); |
| } |
| |
| @DELETE |
| @Consumes(MediaType.WILDCARD) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{contextId}/validation-requests/{id}") |
| @ApiOperation( |
| value = "Deletes the Validation Request with the given ID", |
| response = ParameterContextValidationRequestEntity.class, |
| notes = "Deletes the Validation Request with the given ID. After a request is created via a POST to /nifi-api/validation-contexts, it is expected " |
| + "that the client will properly clean up the request by DELETE'ing it, once the validation process has completed. If the request is deleted before the request " |
| + "completes, then the Validation request will finish the step that it is currently performing and then will cancel any subsequent steps.", |
| authorizations = { |
| @Authorization(value = "Only the user that submitted the request can remove it") |
| }) |
| @ApiResponses(value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| }) |
| public Response deleteValidationRequest( |
| @ApiParam( |
| value = "Acknowledges that this node is disconnected to allow for mutable requests to proceed.", |
| required = false |
| ) |
| @QueryParam(DISCONNECTED_NODE_ACKNOWLEDGED) @DefaultValue("false") final Boolean disconnectedNodeAcknowledged, |
| @ApiParam("The ID of the Parameter Context") @PathParam("contextId") final String contextId, |
| @ApiParam("The ID of the Update Request") @PathParam("id") final String validationRequestId) { |
| |
| authorizeReadParameterContext(contextId); |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.DELETE); |
| } else if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(disconnectedNodeAcknowledged); |
| } |
| |
| return deleteValidationRequest("validation-requests", contextId, validationRequestId, disconnectedNodeAcknowledged.booleanValue()); |
| } |
| |
| |
| |
| private Response performAsyncValidation(final ParameterContextValidationRequestEntity requestEntity, final NiFiUser user) { |
| // Create an asynchronous request that will occur in the background, because this request may |
| // result in stopping components, which can take an indeterminate amount of time. |
| final String requestId = generateUuid(); |
| final AsynchronousWebRequest<ParameterContextValidationRequestEntity, ComponentValidationResultsEntity> request = new StandardAsynchronousWebRequest<>(requestId, requestEntity, null, user, |
| getValidationSteps()); |
| |
| // Submit the request to be performed in the background |
| final Consumer<AsynchronousWebRequest<ParameterContextValidationRequestEntity, ComponentValidationResultsEntity>> validationTask = asyncRequest -> { |
| try { |
| final ComponentValidationResultsEntity resultEntity = validateComponents(requestEntity, user); |
| asyncRequest.markStepComplete(resultEntity); |
| } catch (final Exception e) { |
| logger.error("Failed to validate components", e); |
| asyncRequest.fail("Failed to validation components due to " + e); |
| } |
| }; |
| |
| validationRequestManager.submitRequest("validation-requests", requestId, request, validationTask); |
| |
| // Generate the response. |
| final ParameterContextValidationRequestDTO validationRequestDto = new ParameterContextValidationRequestDTO(); |
| validationRequestDto.setComplete(request.isComplete()); |
| validationRequestDto.setFailureReason(request.getFailureReason()); |
| validationRequestDto.setLastUpdated(request.getLastUpdated()); |
| validationRequestDto.setRequestId(requestId); |
| validationRequestDto.setUri(generateResourceUri("parameter-contexts", "validation-requests", requestId)); |
| validationRequestDto.setPercentCompleted(request.getPercentComplete()); |
| validationRequestDto.setState(request.getState()); |
| validationRequestDto.setComponentValidationResults(request.getResults()); |
| |
| final ParameterContextValidationRequestEntity validationRequestEntity = new ParameterContextValidationRequestEntity(); |
| validationRequestEntity.setRequest(validationRequestDto); |
| |
| return generateOkResponse(validationRequestEntity).build(); |
| } |
| |
| private List<UpdateStep> getValidationSteps() { |
| return Collections.singletonList(new StandardUpdateStep("Validating Components")); |
| } |
| |
| private ComponentValidationResultsEntity validateComponents(final ParameterContextValidationRequestEntity requestEntity, final NiFiUser user) { |
| final List<ComponentValidationResultEntity> resultEntities = serviceFacade.validateComponents(requestEntity.getRequest().getParameterContext(), user); |
| final ComponentValidationResultsEntity resultsEntity = new ComponentValidationResultsEntity(); |
| resultsEntity.setValidationResults(resultEntities); |
| return resultsEntity; |
| } |
| |
| |
| private Response submitUpdateRequest(final Revision requestRevision, final InitiateChangeParameterContextRequestWrapper requestWrapper) { |
| // Create an asynchronous request that will occur in the background, because this request may |
| // result in stopping components, which can take an indeterminate amount of time. |
| final String requestId = UUID.randomUUID().toString(); |
| final String contextId = requestWrapper.getParameterContextEntity().getComponent().getId(); |
| final AsynchronousWebRequest<ParameterContextEntity, ParameterContextEntity> request = new StandardAsynchronousWebRequest<>(requestId, requestWrapper.getParameterContextEntity(), contextId, |
| requestWrapper.getUser(), getUpdateSteps()); |
| |
| // Submit the request to be performed in the background |
| final Consumer<AsynchronousWebRequest<ParameterContextEntity, ParameterContextEntity>> updateTask = asyncRequest -> { |
| try { |
| final ParameterContextEntity updatedParameterContextEntity = updateParameterContext(asyncRequest, requestWrapper.getComponentLifecycle(), requestWrapper.getExampleUri(), |
| requestWrapper.getReferencingComponents(), requestWrapper.isReplicateRequest(), requestRevision, requestWrapper.getParameterContextEntity()); |
| |
| asyncRequest.markStepComplete(updatedParameterContextEntity); |
| } catch (final ResumeFlowException rfe) { |
| // Treat ResumeFlowException differently because we don't want to include a message that we couldn't update the flow |
| // since in this case the flow was successfully updated - we just couldn't re-enable the components. |
| logger.error(rfe.getMessage(), rfe); |
| asyncRequest.fail(rfe.getMessage()); |
| } catch (final Exception e) { |
| logger.error("Failed to update Parameter Context", e); |
| asyncRequest.fail("Failed to update Parameter Context due to " + e); |
| } |
| }; |
| |
| updateRequestManager.submitRequest("update-requests", requestId, request, updateTask); |
| |
| // Generate the response. |
| final ParameterContextUpdateRequestEntity updateRequestEntity = createUpdateRequestEntity(request, "update-requests", contextId, requestId); |
| return generateOkResponse(updateRequestEntity).build(); |
| } |
| |
| private List<UpdateStep> getUpdateSteps() { |
| return Arrays.asList(new StandardUpdateStep("Stopping Affected Processors"), |
| new StandardUpdateStep("Disabling Affected Controller Services"), |
| new StandardUpdateStep("Updating Parameter Context"), |
| new StandardUpdateStep("Re-Enabling Affected Controller Services"), |
| new StandardUpdateStep("Restarting Affected Processors")); |
| } |
| |
| |
| private ParameterContextEntity updateParameterContext(final AsynchronousWebRequest<ParameterContextEntity, ParameterContextEntity> asyncRequest, final ComponentLifecycle componentLifecycle, |
| final URI uri, final Set<AffectedComponentEntity> affectedComponents, |
| final boolean replicateRequest, final Revision revision, final ParameterContextEntity updatedContextEntity) |
| throws LifecycleManagementException, ResumeFlowException { |
| |
| final Set<AffectedComponentEntity> runningProcessors = affectedComponents.stream() |
| .filter(entity -> entity.getComponent() != null) |
| .filter(entity -> AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR.equals(entity.getComponent().getReferenceType())) |
| .filter(component -> "Running".equalsIgnoreCase(component.getComponent().getState())) |
| .collect(Collectors.toSet()); |
| |
| final Set<AffectedComponentEntity> enabledControllerServices = affectedComponents.stream() |
| .filter(entity -> entity.getComponent() != null) |
| .filter(dto -> AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE.equals(dto.getComponent().getReferenceType())) |
| .filter(dto -> "Enabling".equalsIgnoreCase(dto.getComponent().getState()) || "Enabled".equalsIgnoreCase(dto.getComponent().getState())) |
| .collect(Collectors.toSet()); |
| |
| stopProcessors(runningProcessors, asyncRequest, componentLifecycle, uri); |
| if (asyncRequest.isCancelled()) { |
| return null; |
| } |
| |
| disableControllerServices(enabledControllerServices, asyncRequest, componentLifecycle, uri); |
| if (asyncRequest.isCancelled()) { |
| return null; |
| } |
| |
| asyncRequest.markStepComplete(); |
| logger.info("Updating Parameter Context with ID {}", updatedContextEntity.getId()); |
| |
| final ParameterContextEntity updatedEntity; |
| try { |
| updatedEntity = performParameterContextUpdate(asyncRequest, uri, replicateRequest, revision, updatedContextEntity); |
| asyncRequest.markStepComplete(); |
| logger.info("Successfully updated Parameter Context with ID {}", updatedContextEntity.getId()); |
| } finally { |
| // TODO: can almost certainly be refactored so that the same code is shared between VersionsResource and ParameterContextResource. |
| if (!asyncRequest.isCancelled()) { |
| enableControllerServices(enabledControllerServices, asyncRequest, componentLifecycle, uri); |
| } |
| |
| if (!asyncRequest.isCancelled()) { |
| restartProcessors(runningProcessors, asyncRequest, componentLifecycle, uri); |
| } |
| } |
| |
| asyncRequest.setCancelCallback(null); |
| if (asyncRequest.isCancelled()) { |
| return null; |
| } |
| |
| return updatedEntity; |
| } |
| |
| private ParameterContextEntity performParameterContextUpdate(final AsynchronousWebRequest<?, ?> asyncRequest, final URI exampleUri, final boolean replicateRequest, final Revision revision, |
| final ParameterContextEntity updatedContext) throws LifecycleManagementException { |
| |
| if (replicateRequest) { |
| final URI updateUri; |
| try { |
| updateUri = new URI(exampleUri.getScheme(), exampleUri.getUserInfo(), exampleUri.getHost(), |
| exampleUri.getPort(), "/nifi-api/parameter-contexts/" + updatedContext.getId(), null, exampleUri.getFragment()); |
| } catch (URISyntaxException e) { |
| throw new RuntimeException(e); |
| } |
| |
| final Map<String, String> headers = new HashMap<>(); |
| headers.put("content-type", MediaType.APPLICATION_JSON); |
| |
| final NiFiUser user = asyncRequest.getUser(); |
| final NodeResponse clusterResponse; |
| try { |
| logger.debug("Replicating PUT request to {} for user {}", updateUri, user); |
| |
| if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { |
| clusterResponse = getRequestReplicator().replicate(user, HttpMethod.PUT, updateUri, updatedContext, headers).awaitMergedResponse(); |
| } else { |
| clusterResponse = getRequestReplicator().forwardToCoordinator( |
| getClusterCoordinatorNode(), user, HttpMethod.PUT, updateUri, updatedContext, headers).awaitMergedResponse(); |
| } |
| } catch (final InterruptedException ie) { |
| logger.warn("Interrupted while replicating PUT request to {} for user {}", updateUri, user); |
| Thread.currentThread().interrupt(); |
| throw new LifecycleManagementException("Interrupted while updating flows across cluster", ie); |
| } |
| |
| final int updateFlowStatus = clusterResponse.getStatus(); |
| if (updateFlowStatus != Response.Status.OK.getStatusCode()) { |
| final String explanation = getResponseEntity(clusterResponse, String.class); |
| logger.error("Failed to update flow across cluster when replicating PUT request to {} for user {}. Received {} response with explanation: {}", |
| updateUri, user, updateFlowStatus, explanation); |
| throw new LifecycleManagementException("Failed to update Flow on all nodes in cluster due to " + explanation); |
| } |
| |
| return serviceFacade.getParameterContext(updatedContext.getId(), user); |
| } else { |
| serviceFacade.verifyUpdateParameterContext(updatedContext.getComponent(), true); |
| return serviceFacade.updateParameterContext(revision, updatedContext.getComponent()); |
| } |
| } |
| |
| /** |
| * Extracts the response entity from the specified node response. |
| * |
| * @param nodeResponse node response |
| * @param clazz class |
| * @param <T> type of class |
| * @return the response entity |
| */ |
| @SuppressWarnings("unchecked") |
| private <T> T getResponseEntity(final NodeResponse nodeResponse, final Class<T> clazz) { |
| T entity = (T) nodeResponse.getUpdatedEntity(); |
| if (entity == null) { |
| entity = nodeResponse.getClientResponse().readEntity(clazz); |
| } |
| return entity; |
| } |
| |
| |
| private void stopProcessors(final Set<AffectedComponentEntity> processors, final AsynchronousWebRequest<?, ?> asyncRequest, final ComponentLifecycle componentLifecycle, final URI uri) |
| throws LifecycleManagementException { |
| |
| logger.info("Stopping {} Processors in order to update Parameter Context", processors.size()); |
| final CancellableTimedPause stopComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); |
| asyncRequest.setCancelCallback(stopComponentsPause::cancel); |
| componentLifecycle.scheduleComponents(uri, "root", processors, ScheduledState.STOPPED, stopComponentsPause, InvalidComponentAction.WAIT); |
| } |
| |
| private void restartProcessors(final Set<AffectedComponentEntity> processors, final AsynchronousWebRequest<?, ?> asyncRequest, final ComponentLifecycle componentLifecycle, final URI uri) |
| throws ResumeFlowException, LifecycleManagementException { |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("Restarting {} Processors after having updated Parameter Context: {}", processors.size(), processors); |
| } else { |
| logger.info("Restarting {} Processors after having updated Parameter Context", processors.size()); |
| } |
| |
| // Step 14. Restart all components |
| final Set<AffectedComponentEntity> componentsToStart = getUpdatedEntities(processors); |
| |
| final CancellableTimedPause startComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); |
| asyncRequest.setCancelCallback(startComponentsPause::cancel); |
| |
| try { |
| componentLifecycle.scheduleComponents(uri, "root", componentsToStart, ScheduledState.RUNNING, startComponentsPause, InvalidComponentAction.SKIP); |
| asyncRequest.markStepComplete(); |
| } catch (final IllegalStateException ise) { |
| // Component Lifecycle will restart the Processors only if they are valid. If IllegalStateException gets thrown, we need to provide |
| // a more intelligent error message as to exactly what happened, rather than indicate that the flow could not be updated. |
| throw new ResumeFlowException("Failed to restart components because " + ise.getMessage(), ise); |
| } |
| } |
| |
| private void disableControllerServices(final Set<AffectedComponentEntity> controllerServices, final AsynchronousWebRequest<?, ?> asyncRequest, final ComponentLifecycle componentLifecycle, |
| final URI uri) throws LifecycleManagementException { |
| |
| asyncRequest.markStepComplete(); |
| logger.info("Disabling {} Controller Services in order to update Parameter Context", controllerServices.size()); |
| final CancellableTimedPause disableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); |
| asyncRequest.setCancelCallback(disableServicesPause::cancel); |
| componentLifecycle.activateControllerServices(uri, "root", controllerServices, ControllerServiceState.DISABLED, disableServicesPause, InvalidComponentAction.WAIT); |
| } |
| |
| private void enableControllerServices(final Set<AffectedComponentEntity> controllerServices, final AsynchronousWebRequest<?, ?> asyncRequest, final ComponentLifecycle componentLifecycle, |
| final URI uri) throws LifecycleManagementException, ResumeFlowException { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Re-Enabling {} Controller Services: {}", controllerServices.size(), controllerServices); |
| } else { |
| logger.info("Re-Enabling {} Controller Services after having updated Parameter Context", controllerServices.size()); |
| } |
| |
| // Step 13. Re-enable all disabled controller services |
| final CancellableTimedPause enableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); |
| asyncRequest.setCancelCallback(enableServicesPause::cancel); |
| final Set<AffectedComponentEntity> servicesToEnable = getUpdatedEntities(controllerServices); |
| |
| try { |
| componentLifecycle.activateControllerServices(uri, "root", servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause, InvalidComponentAction.SKIP); |
| asyncRequest.markStepComplete(); |
| } catch (final IllegalStateException ise) { |
| // Component Lifecycle will re-enable the Controller Services only if they are valid. If IllegalStateException gets thrown, we need to provide |
| // a more intelligent error message as to exactly what happened, rather than indicate that the Parameter Context could not be updated. |
| throw new ResumeFlowException("Failed to re-enable Controller Services because " + ise.getMessage(), ise); |
| } |
| } |
| |
| private Set<AffectedComponentEntity> getUpdatedEntities(final Set<AffectedComponentEntity> originalEntities) { |
| final Set<AffectedComponentEntity> entities = new LinkedHashSet<>(); |
| |
| for (final AffectedComponentEntity original : originalEntities) { |
| try { |
| final AffectedComponentEntity updatedEntity = AffectedComponentUtils.updateEntity(original, serviceFacade, dtoFactory); |
| if (updatedEntity != null) { |
| entities.add(updatedEntity); |
| } |
| } catch (final ResourceNotFoundException rnfe) { |
| // Component was removed. Just continue on without adding anything to the entities. |
| // We do this because the intent is to get updated versions of the entities with current |
| // Revisions so that we can change the states of the components. If the component was removed, |
| // then we can just drop the entity, since there is no need to change its state. |
| } |
| } |
| |
| return entities; |
| } |
| |
| |
| private Response retrieveValidationRequest(final String requestType, final String contextId, final String requestId) { |
| if (requestId == null) { |
| throw new IllegalArgumentException("Request ID must be specified."); |
| } |
| |
| final NiFiUser user = NiFiUserUtils.getNiFiUser(); |
| |
| final AsynchronousWebRequest<?, ComponentValidationResultsEntity> asyncRequest = validationRequestManager.getRequest(requestType, requestId, user); |
| final ParameterContextValidationRequestEntity requestEntity = createValidationRequestEntity(asyncRequest, contextId, requestType, requestId); |
| return generateOkResponse(requestEntity).build(); |
| } |
| |
| private Response deleteValidationRequest(final String requestType, final String contextId, final String requestId, final boolean disconnectedNodeAcknowledged) { |
| if (requestId == null) { |
| throw new IllegalArgumentException("Request ID must be specified."); |
| } |
| |
| if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(disconnectedNodeAcknowledged); |
| } |
| |
| final NiFiUser user = NiFiUserUtils.getNiFiUser(); |
| |
| // request manager will ensure that the current is the user that submitted this request |
| final AsynchronousWebRequest<?, ComponentValidationResultsEntity> asyncRequest = validationRequestManager.removeRequest(requestType, requestId, user); |
| if (asyncRequest == null) { |
| throw new ResourceNotFoundException("Could not find request of type " + requestType + " with ID " + requestId); |
| } |
| |
| if (!asyncRequest.isComplete()) { |
| asyncRequest.cancel(); |
| } |
| |
| final ParameterContextValidationRequestEntity requestEntity = createValidationRequestEntity(asyncRequest, contextId, requestType, requestId); |
| return generateOkResponse(requestEntity).build(); |
| } |
| |
| private ParameterContextValidationRequestEntity createValidationRequestEntity(final AsynchronousWebRequest<?, ComponentValidationResultsEntity> asyncRequest, final String requestType, |
| final String contextId, final String requestId) { |
| final ParameterContextValidationRequestDTO requestDto = new ParameterContextValidationRequestDTO(); |
| |
| requestDto.setComplete(asyncRequest.isComplete()); |
| requestDto.setFailureReason(asyncRequest.getFailureReason()); |
| requestDto.setLastUpdated(asyncRequest.getLastUpdated()); |
| requestDto.setRequestId(requestId); |
| requestDto.setUri(generateResourceUri("parameter-contexts", contextId, requestType, requestId)); |
| requestDto.setState(asyncRequest.getState()); |
| requestDto.setPercentCompleted(asyncRequest.getPercentComplete()); |
| requestDto.setComponentValidationResults(asyncRequest.getResults()); |
| |
| final ParameterContextValidationRequestEntity entity = new ParameterContextValidationRequestEntity(); |
| entity.setRequest(requestDto); |
| return entity; |
| } |
| |
| private Response retrieveUpdateRequest(final String requestType, final String contextId, final String requestId) { |
| if (requestId == null) { |
| throw new IllegalArgumentException("Request ID must be specified."); |
| } |
| |
| final NiFiUser user = NiFiUserUtils.getNiFiUser(); |
| |
| // request manager will ensure that the current is the user that submitted this request |
| final AsynchronousWebRequest<ParameterContextEntity, ParameterContextEntity> asyncRequest = updateRequestManager.getRequest(requestType, requestId, user); |
| final ParameterContextUpdateRequestEntity updateRequestEntity = createUpdateRequestEntity(asyncRequest, requestType, contextId, requestId); |
| return generateOkResponse(updateRequestEntity).build(); |
| } |
| |
| private Response deleteUpdateRequest(final String requestType, final String contextId, final String requestId, final boolean disconnectedNodeAcknowledged) { |
| if (requestId == null) { |
| throw new IllegalArgumentException("Request ID must be specified."); |
| } |
| |
| if (isDisconnectedFromCluster()) { |
| verifyDisconnectedNodeModification(disconnectedNodeAcknowledged); |
| } |
| |
| final NiFiUser user = NiFiUserUtils.getNiFiUser(); |
| |
| // request manager will ensure that the current is the user that submitted this request |
| final AsynchronousWebRequest<ParameterContextEntity, ParameterContextEntity> asyncRequest = updateRequestManager.removeRequest(requestType, requestId, user); |
| if (asyncRequest == null) { |
| throw new ResourceNotFoundException("Could not find request of type " + requestType + " with ID " + requestId); |
| } |
| |
| if (!asyncRequest.isComplete()) { |
| asyncRequest.cancel(); |
| } |
| |
| final ParameterContextUpdateRequestEntity updateRequestEntity = createUpdateRequestEntity(asyncRequest, requestType, contextId, requestId); |
| return generateOkResponse(updateRequestEntity).build(); |
| } |
| |
| private ParameterContextUpdateRequestEntity createUpdateRequestEntity(final AsynchronousWebRequest<ParameterContextEntity, ParameterContextEntity> asyncRequest, final String requestType, |
| final String contextId, final String requestId) { |
| final ParameterContextUpdateRequestDTO updateRequestDto = new ParameterContextUpdateRequestDTO(); |
| updateRequestDto.setComplete(asyncRequest.isComplete()); |
| updateRequestDto.setFailureReason(asyncRequest.getFailureReason()); |
| updateRequestDto.setLastUpdated(asyncRequest.getLastUpdated()); |
| updateRequestDto.setRequestId(requestId); |
| updateRequestDto.setUri(generateResourceUri("parameter-contexts", contextId, requestType, requestId)); |
| updateRequestDto.setState(asyncRequest.getState()); |
| updateRequestDto.setPercentCompleted(asyncRequest.getPercentComplete()); |
| |
| final List<ParameterContextUpdateStepDTO> updateSteps = new ArrayList<>(); |
| for (final UpdateStep updateStep : asyncRequest.getUpdateSteps()) { |
| final ParameterContextUpdateStepDTO stepDto = new ParameterContextUpdateStepDTO(); |
| stepDto.setDescription(updateStep.getDescription()); |
| stepDto.setComplete(updateStep.isComplete()); |
| stepDto.setFailureReason(updateStep.getFailureReason()); |
| updateSteps.add(stepDto); |
| } |
| updateRequestDto.setUpdateSteps(updateSteps); |
| |
| final ParameterContextEntity initialRequest = asyncRequest.getRequest(); |
| |
| // The AffectedComponentEntity itself does not evaluate equality based on component information. As a result, we want to de-dupe the entities based on their identifiers. |
| final Map<String, AffectedComponentEntity> affectedComponents = new HashMap<>(); |
| for (final ParameterEntity entity : initialRequest.getComponent().getParameters()) { |
| for (final AffectedComponentEntity affectedComponentEntity : entity.getParameter().getReferencingComponents()) { |
| affectedComponents.put(affectedComponentEntity.getId(), serviceFacade.getUpdatedAffectedComponentEntity(affectedComponentEntity)); |
| } |
| } |
| updateRequestDto.setReferencingComponents(new HashSet<>(affectedComponents.values())); |
| |
| // Populate the Affected Components |
| final ParameterContextEntity contextEntity = serviceFacade.getParameterContext(asyncRequest.getComponentId(), NiFiUserUtils.getNiFiUser()); |
| final ParameterContextUpdateRequestEntity updateRequestEntity = new ParameterContextUpdateRequestEntity(); |
| |
| // If the request is complete, include the new representation of the Parameter Context along with its new Revision. Otherwise, do not include the information, since it is 'stale' |
| if (updateRequestDto.isComplete()) { |
| updateRequestDto.setParameterContext(contextEntity == null ? null : contextEntity.getComponent()); |
| updateRequestEntity.setParameterContextRevision(contextEntity == null ? null : contextEntity.getRevision()); |
| } |
| |
| updateRequestEntity.setRequest(updateRequestDto); |
| return updateRequestEntity; |
| } |
| |
| |
| private static class InitiateChangeParameterContextRequestWrapper extends Entity { |
| private final ParameterContextEntity parameterContextEntity; |
| private final ComponentLifecycle componentLifecycle; |
| private final URI exampleUri; |
| private final Set<AffectedComponentEntity> affectedComponents; |
| private final boolean replicateRequest; |
| private final NiFiUser nifiUser; |
| |
| public InitiateChangeParameterContextRequestWrapper(final ParameterContextEntity parameterContextEntity, final ComponentLifecycle componentLifecycle, |
| final URI exampleUri, final Set<AffectedComponentEntity> affectedComponents, final boolean replicateRequest, |
| final NiFiUser nifiUser) { |
| |
| this.parameterContextEntity = parameterContextEntity; |
| this.componentLifecycle = componentLifecycle; |
| this.exampleUri = exampleUri; |
| this.affectedComponents = affectedComponents; |
| this.replicateRequest = replicateRequest; |
| this.nifiUser = nifiUser; |
| } |
| |
| public ParameterContextEntity getParameterContextEntity() { |
| return parameterContextEntity; |
| } |
| |
| public ComponentLifecycle getComponentLifecycle() { |
| return componentLifecycle; |
| } |
| |
| public URI getExampleUri() { |
| return exampleUri; |
| } |
| |
| public Set<AffectedComponentEntity> getReferencingComponents() { |
| return affectedComponents; |
| } |
| |
| public boolean isReplicateRequest() { |
| return replicateRequest; |
| } |
| |
| public NiFiUser getUser() { |
| return nifiUser; |
| } |
| } |
| |
| |
| |
| public void setServiceFacade(NiFiServiceFacade serviceFacade) { |
| this.serviceFacade = serviceFacade; |
| } |
| |
| public void setAuthorizer(Authorizer authorizer) { |
| this.authorizer = authorizer; |
| } |
| |
| public void setClusterComponentLifecycle(ComponentLifecycle componentLifecycle) { |
| this.clusterComponentLifecycle = componentLifecycle; |
| } |
| |
| public void setLocalComponentLifecycle(ComponentLifecycle componentLifecycle) { |
| this.localComponentLifecycle = componentLifecycle; |
| } |
| |
| public void setDtoFactory(final DtoFactory dtoFactory) { |
| this.dtoFactory = dtoFactory; |
| } |
| |
| } |