blob: aee12455772706c92423d6cdf643a6c804a1bc40 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.web.api;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.servlet.http.HttpServletRequest;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.nifi.authorization.AuthorizableLookup;
import org.apache.nifi.authorization.AuthorizeAccess;
import org.apache.nifi.authorization.AuthorizeControllerServiceReference;
import org.apache.nifi.authorization.AuthorizeParameterReference;
import org.apache.nifi.authorization.ComponentAuthorizable;
import org.apache.nifi.authorization.ConnectionAuthorizable;
import org.apache.nifi.authorization.ProcessGroupAuthorizable;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.SnippetAuthorizable;
import org.apache.nifi.authorization.TemplateContentsAuthorizable;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserDetails;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.serialization.FlowEncodingVersion;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.registry.bucket.Bucket;
import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.FlowRegistryUtils;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest;
import org.apache.nifi.registry.variable.VariableRegistryUpdateStep;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.AffectedComponentDTO;
import org.apache.nifi.web.api.dto.BundleDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.DropRequestDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.api.dto.PositionDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessGroupReplaceRequestDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.apache.nifi.web.api.dto.VariableRegistryDTO;
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
import org.apache.nifi.web.api.entity.AffectedComponentEntity;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.ConnectionsEntity;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
import org.apache.nifi.web.api.entity.CopySnippetRequestEntity;
import org.apache.nifi.web.api.entity.CreateTemplateRequestEntity;
import org.apache.nifi.web.api.entity.DropRequestEntity;
import org.apache.nifi.web.api.entity.Entity;
import org.apache.nifi.web.api.entity.FlowComparisonEntity;
import org.apache.nifi.web.api.entity.FlowEntity;
import org.apache.nifi.web.api.entity.FunnelEntity;
import org.apache.nifi.web.api.entity.FunnelsEntity;
import org.apache.nifi.web.api.entity.InputPortsEntity;
import org.apache.nifi.web.api.entity.InstantiateTemplateRequestEntity;
import org.apache.nifi.web.api.entity.LabelEntity;
import org.apache.nifi.web.api.entity.LabelsEntity;
import org.apache.nifi.web.api.entity.OutputPortsEntity;
import org.apache.nifi.web.api.entity.ParameterContextReferenceEntity;
import org.apache.nifi.web.api.entity.PortEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupImportEntity;
import org.apache.nifi.web.api.entity.ProcessGroupReplaceRequestEntity;
import org.apache.nifi.web.api.entity.ProcessGroupUploadEntity;
import org.apache.nifi.web.api.entity.ProcessGroupsEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProcessorsEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity;
import org.apache.nifi.web.api.entity.ScheduleComponentsEntity;
import org.apache.nifi.web.api.entity.TemplateEntity;
import org.apache.nifi.web.api.entity.VariableRegistryEntity;
import org.apache.nifi.web.api.entity.VariableRegistryUpdateRequestEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
import org.apache.nifi.web.api.request.LongParameter;
import org.apache.nifi.web.util.Pause;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* RESTful endpoint for managing a Group.
value = "/process-groups",
description = "Endpoint for managing a Process Group."
public class ProcessGroupResource extends FlowUpdateResource<ProcessGroupImportEntity, ProcessGroupReplaceRequestEntity> {
private static final Logger logger = LoggerFactory.getLogger(ProcessGroupResource.class);
private ProcessorResource processorResource;
private InputPortResource inputPortResource;
private OutputPortResource outputPortResource;
private FunnelResource funnelResource;
private LabelResource labelResource;
private RemoteProcessGroupResource remoteProcessGroupResource;
private ConnectionResource connectionResource;
private TemplateResource templateResource;
private ControllerServiceResource controllerServiceResource;
private final ConcurrentMap<String, VariableRegistryUpdateRequest> varRegistryUpdateRequests = new ConcurrentHashMap<>();
private static final int MAX_VARIABLE_REGISTRY_UPDATE_REQUESTS = 100;
private static final long VARIABLE_REGISTRY_UPDATE_REQUEST_EXPIRATION = TimeUnit.MINUTES.toMillis(1L);
private final ExecutorService variableRegistryThreadPool = new ThreadPoolExecutor(1, 50, 5L, TimeUnit.SECONDS,
new ThreadFactory() {
public Thread newThread(final Runnable r) {
final Thread thread = Executors.defaultThreadFactory().newThread(r);
thread.setName("Variable Registry Update Thread");
return thread;
private static final ObjectMapper MAPPER = new ObjectMapper();
static {
MAPPER.setDefaultPropertyInclusion(JsonInclude.Value.construct(JsonInclude.Include.NON_NULL, JsonInclude.Include.NON_NULL));
MAPPER.setAnnotationIntrospector(new JaxbAnnotationIntrospector(MAPPER.getTypeFactory()));
MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
* Populates the remaining fields in the specified process groups.
* @param processGroupEntities groups
* @return group dto
public Set<ProcessGroupEntity> populateRemainingProcessGroupEntitiesContent(Set<ProcessGroupEntity> processGroupEntities) {
for (ProcessGroupEntity processGroupEntity : processGroupEntities) {
return processGroupEntities;
* Populates the remaining fields in the specified process group.
* @param processGroupEntity group
* @return group dto
public ProcessGroupEntity populateRemainingProcessGroupEntityContent(ProcessGroupEntity processGroupEntity) {
processGroupEntity.setUri(generateResourceUri("process-groups", processGroupEntity.getId()));
return processGroupEntity;
* Populates the remaining content of the specified snippet.
private FlowDTO populateRemainingSnippetContent(FlowDTO flow) {
// go through each process group child and populate its uri
if (flow.getProcessGroups() != null) {
return flow;
* Retrieves the contents of the specified group.
* @param groupId The id of the process group.
* @return A processGroupEntity.
value = "Gets a process group",
response = ProcessGroupEntity.class,
authorizations = {
@Authorization(value = "Read - /process-groups/{uuid}")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response getProcessGroup(
value = "The process group id.",
required = false
@PathParam("id") final String groupId) {
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
// get this process group contents
final ProcessGroupEntity entity = serviceFacade.getProcessGroup(groupId);
if (entity.getComponent() != null) {
return generateOkResponse(entity).build();
* Retrieves the specified group as a versioned flow snapshot for download.
* @param groupId The id of the process group
* @return A processGroupEntity.
value = "Gets a process group for download",
response = String.class,
authorizations = {
@Authorization(value = "Read - /process-groups/{uuid}")
@ApiResponses(value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response exportProcessGroup(@ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId) {
// authorize access
serviceFacade.authorizeAccess(lookup -> {
// ensure access to process groups (nested), encapsulated controller services and referenced parameter contexts
final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId);
authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true,
false, true, false, true);
// get the versioned flow
final VersionedFlowSnapshot currentVersionedFlowSnapshot = serviceFacade.getCurrentFlowSnapshotByGroupId(groupId);
// determine the name of the attachment - possible issues with spaces in file names
final VersionedProcessGroup currentVersionedProcessGroup = currentVersionedFlowSnapshot.getFlowContents();
final String flowName = currentVersionedProcessGroup.getName();
final String filename = flowName.replaceAll("\\s", "_") + ".json";
return generateOkResponse(currentVersionedFlowSnapshot).header(HttpHeaders.CONTENT_DISPOSITION, String.format("attachment; filename=\"%s\"", filename)).build();
* Retrieves a list of local modifications to the Process Group since it was last synchronized with the Flow Registry
* @param groupId The id of the process group.
* @return A processGroupEntity.
value = "Gets a list of local modifications to the Process Group since it was last synchronized with the Flow Registry",
response = FlowComparisonEntity.class,
authorizations = {
@Authorization(value = "Read - /process-groups/{uuid}"),
@Authorization(value = "Read - /{component-type}/{uuid} - For all encapsulated components")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response getLocalModifications(
value = "The process group id.",
required = false
@PathParam("id") final String groupId) throws IOException, NiFiRegistryException {
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId);
authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, false, false, true, false, false);
final FlowComparisonEntity entity = serviceFacade.getLocalModifications(groupId);
return generateOkResponse(entity).build();
* Retrieves the Variable Registry for the group with the given ID
* @param groupId the ID of the Process Group
* @return the Variable Registry for the group
@ApiOperation(value = "Gets a process group's variable registry",
response = VariableRegistryEntity.class,
authorizations = {
@Authorization(value = "Read - /process-groups/{uuid}")
@ApiResponses(value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response getVariableRegistry(
@ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId,
@ApiParam(value = "Whether or not to include ancestor groups", required = false) @QueryParam("includeAncestorGroups") @DefaultValue("true") final boolean includeAncestorGroups) {
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
// get this process group's variable registry
final VariableRegistryEntity entity = serviceFacade.getVariableRegistry(groupId, includeAncestorGroups);
return generateOkResponse(entity).build();
* Updates the specified process group.
* @param httpServletRequest request
* @param id The id of the process group.
* @param requestProcessGroupEntity A processGroupEntity.
* @return A processGroupEntity.
value = "Updates a process group",
response = ProcessGroupEntity.class,
authorizations = {
@Authorization(value = "Write - /process-groups/{uuid}")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response updateProcessGroup(
@Context final HttpServletRequest httpServletRequest,
value = "The process group id.",
required = true
@PathParam("id") final String id,
value = "The process group configuration details.",
required = true
) final ProcessGroupEntity requestProcessGroupEntity) {
if (requestProcessGroupEntity == null || requestProcessGroupEntity.getComponent() == null) {
throw new IllegalArgumentException("Process group details must be specified.");
if (requestProcessGroupEntity.getRevision() == null) {
throw new IllegalArgumentException("Revision must be specified.");
// ensure the same id is being used
final ProcessGroupDTO requestProcessGroupDTO = requestProcessGroupEntity.getComponent();
if (!id.equals(requestProcessGroupDTO.getId())) {
throw new IllegalArgumentException(String.format("The process group id (%s) in the request body does "
+ "not equal the process group id of the requested resource (%s).", requestProcessGroupDTO.getId(), id));
final PositionDTO proposedPosition = requestProcessGroupDTO.getPosition();
if (proposedPosition != null) {
if (proposedPosition.getX() == null || proposedPosition.getY() == null) {
throw new IllegalArgumentException("The x and y coordinate of the proposed position must be specified.");
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, requestProcessGroupEntity);
} else if (isDisconnectedFromCluster()) {
// handle expects request (usually from the cluster manager)
final Revision requestRevision = getRevision(requestProcessGroupEntity, id);
return withWriteLock(
lookup -> {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
Authorizable authorizable = lookup.getProcessGroup(id).getAuthorizable();
authorizable.authorize(authorizer, RequestAction.WRITE, user);
// Ensure that user has READ permission on current Parameter Context (if any) because user is un-binding.
final ParameterContextReferenceEntity referencedParamContext = requestProcessGroupDTO.getParameterContext();
if (referencedParamContext != null) {
// Lookup the current Parameter Context and determine whether or not the Parameter Context is changing
final String groupId = requestProcessGroupDTO.getId();
final ProcessGroupEntity currentGroupEntity = serviceFacade.getProcessGroup(groupId);
final ProcessGroupDTO groupDto = currentGroupEntity.getComponent();
final ParameterContextReferenceEntity currentParamContext = groupDto.getParameterContext();
final String currentParamContextId = currentParamContext == null ? null : currentParamContext.getId();
final boolean parameterContextChanging = !Objects.equals(referencedParamContext.getId(), currentParamContextId);
// If Parameter Context is changing...
if (parameterContextChanging) {
// In order to bind to a Parameter Context, the user must have the READ policy to that Parameter Context.
if (referencedParamContext.getId() != null) {
lookup.getParameterContext(referencedParamContext.getId()).authorize(authorizer, RequestAction.READ, user);
// If currently referencing a Parameter Context, we must authorize that the user has READ permissions on the Parameter Context in order to un-bind to it.
if (currentParamContextId != null) {
lookup.getParameterContext(currentParamContextId).authorize(authorizer, RequestAction.READ, user);
// Because the user will be changing the behavior of any component in this group that is currently referencing any Parameter, we must ensure that the user has
// both READ and WRITE policies for each of those components.
for (final AffectedComponentEntity affectedComponentEntity : serviceFacade.getProcessorsReferencingParameter(groupId)) {
final Authorizable processorAuthorizable = lookup.getProcessor(affectedComponentEntity.getId()).getAuthorizable();
processorAuthorizable.authorize(authorizer, RequestAction.READ, user);
processorAuthorizable.authorize(authorizer, RequestAction.WRITE, user);
for (final AffectedComponentEntity affectedComponentEntity : serviceFacade.getControllerServicesReferencingParameter(groupId)) {
final Authorizable serviceAuthorizable = lookup.getControllerService(affectedComponentEntity.getId()).getAuthorizable();
serviceAuthorizable.authorize(authorizer, RequestAction.READ, user);
serviceAuthorizable.authorize(authorizer, RequestAction.WRITE, user);
() -> serviceFacade.verifyUpdateProcessGroup(requestProcessGroupDTO),
(revision, processGroupEntity) -> {
// update the process group
final ProcessGroupEntity entity = serviceFacade.updateProcessGroup(revision, processGroupEntity.getComponent());
// prune response as necessary
if (entity.getComponent() != null) {
return generateOkResponse(entity).build();
@ApiOperation(value = "Gets a process group's variable registry",
response = VariableRegistryUpdateRequestEntity.class,
authorizations = {
@Authorization(value = "Read - /process-groups/{uuid}")
@ApiResponses(value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response getVariableRegistryUpdateRequest(
@ApiParam(value = "The process group id.", required = true) @PathParam("groupId") final String groupId,
@ApiParam(value = "The ID of the Variable Registry Update Request", required = true) @PathParam("updateId") final String updateId) {
if (groupId == null || updateId == null) {
throw new IllegalArgumentException("Group ID and Update ID must both be specified.");
final NiFiUser user = NiFiUserUtils.getNiFiUser();
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
processGroup.authorize(authorizer, RequestAction.READ, user);
final VariableRegistryUpdateRequest request = varRegistryUpdateRequests.get(updateId);
if (request == null) {
throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId);
if (!groupId.equals(request.getProcessGroupId())) {
throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId + " for Process Group with identifier " + groupId);
if (!user.equals(request.getUser())) {
throw new IllegalArgumentException("Only the user that submitted the update request can retrieve it.");
final VariableRegistryUpdateRequestEntity entity = new VariableRegistryUpdateRequestEntity();
entity.getRequest().setUri(generateResourceUri("process-groups", groupId, "variable-registry", updateId));
return generateOkResponse(entity).build();
@ApiOperation(value = "Deletes an update request for a process group's variable registry. If the request is not yet complete, it will automatically be cancelled.",
response = VariableRegistryUpdateRequestEntity.class,
authorizations = {
@Authorization(value = "Read - /process-groups/{uuid}")
@ApiResponses(value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response deleteVariableRegistryUpdateRequest(
value = "The process group id.",
required = true
@PathParam("groupId") final String groupId,
value = "The ID of the Variable Registry Update Request",
required = true)
@PathParam("updateId") final String updateId,
value = "Acknowledges that this node is disconnected to allow for mutable requests to proceed.",
required = false
@QueryParam(DISCONNECTED_NODE_ACKNOWLEDGED) @DefaultValue("false") final Boolean disconnectedNodeAcknowledged) {
if (groupId == null || updateId == null) {
throw new IllegalArgumentException("Group ID and Update ID must both be specified.");
if (isDisconnectedFromCluster()) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
processGroup.authorize(authorizer, RequestAction.READ, user);
processGroup.authorize(authorizer, RequestAction.WRITE, user);
final VariableRegistryUpdateRequest request = varRegistryUpdateRequests.remove(updateId);
if (request == null) {
throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId);
if (!groupId.equals(request.getProcessGroupId())) {
throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId + " for Process Group with identifier " + groupId);
if (!user.equals(request.getUser())) {
throw new IllegalArgumentException("Only the user that submitted the update request can remove it.");
final VariableRegistryUpdateRequestEntity entity = new VariableRegistryUpdateRequestEntity();
entity.getRequest().setUri(generateResourceUri("process-groups", groupId, "variable-registry", updateId));
return generateOkResponse(entity).build();
@ApiOperation(value = "Updates the contents of a Process Group's variable Registry", response = VariableRegistryEntity.class, notes = NON_GUARANTEED_ENDPOINT, authorizations = {
@Authorization(value = "Write - /process-groups/{uuid}")
@ApiResponses(value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response updateVariableRegistry(
@Context final HttpServletRequest httpServletRequest,
@ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId,
@ApiParam(value = "The variable registry configuration details.", required = true) final VariableRegistryEntity requestVariableRegistryEntity) {
if (requestVariableRegistryEntity == null || requestVariableRegistryEntity.getVariableRegistry() == null) {
throw new IllegalArgumentException("Variable Registry details must be specified.");
if (requestVariableRegistryEntity.getProcessGroupRevision() == null) {
throw new IllegalArgumentException("Process Group Revision must be specified.");
// ensure the same id is being used
final VariableRegistryDTO requestRegistryDto = requestVariableRegistryEntity.getVariableRegistry();
if (!groupId.equals(requestRegistryDto.getProcessGroupId())) {
throw new IllegalArgumentException(String.format("The process group id (%s) in the request body does "
+ "not equal the process group id of the requested resource (%s).", requestRegistryDto.getProcessGroupId(), groupId));
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, requestVariableRegistryEntity);
} else if (isDisconnectedFromCluster()) {
// handle expects request (usually from the cluster manager)
final Revision requestRevision = getRevision(requestVariableRegistryEntity.getProcessGroupRevision(), groupId);
return withWriteLock(
lookup -> {
Authorizable authorizable = lookup.getProcessGroup(groupId).getAuthorizable();
authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
(revision, variableRegistryEntity) -> {
final VariableRegistryDTO variableRegistry = variableRegistryEntity.getVariableRegistry();
// update the process group
final VariableRegistryEntity entity = serviceFacade.updateVariableRegistry(revision, variableRegistry);
return generateOkResponse(entity).build();
* Updates the variable registry for the specified process group.
* @param httpServletRequest request
* @param groupId The id of the process group.
* @param requestVariableRegistryEntity the Variable Registry Entity
* @return A Variable Registry Entry.
@ApiOperation(value = "Submits a request to update a process group's variable registry",
response = VariableRegistryUpdateRequestEntity.class,
authorizations = {
@Authorization(value = "Write - /process-groups/{uuid}")
@ApiResponses(value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response submitUpdateVariableRegistryRequest(
@Context final HttpServletRequest httpServletRequest,
@ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId,
@ApiParam(value = "The variable registry configuration details.", required = true) final VariableRegistryEntity requestVariableRegistryEntity) {
if (requestVariableRegistryEntity == null || requestVariableRegistryEntity.getVariableRegistry() == null) {
throw new IllegalArgumentException("Variable Registry details must be specified.");
if (requestVariableRegistryEntity.getProcessGroupRevision() == null) {
throw new IllegalArgumentException("Process Group Revision must be specified.");
if (isDisconnectedFromCluster()) {
// In order to update variables in a variable registry, we have to perform the following steps:
// 1. Determine Affected Components (this includes any Processors and Controller Services and any components that reference an affected Controller Service).
// 1a. Determine ID's of components
// 1b. Determine Revision's of associated components
// 2. Stop All Active Affected Processors
// 3. Disable All Active Affected Controller Services
// 4. Update the Variables
// 5. Re-Enable all previously Active Affected Controller Services (services only, not dependent components)
// 6. Re-Enable all previously Active Processors that Depended on the Controller Services
// Determine the affected components (and their associated revisions)
final VariableRegistryEntity computedEntity = serviceFacade.populateAffectedComponents(requestVariableRegistryEntity.getVariableRegistry());
final VariableRegistryDTO computedRegistryDto = computedEntity.getVariableRegistry();
if (computedRegistryDto == null) {
throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
final Set<AffectedComponentEntity> allAffectedComponents = serviceFacade.getComponentsAffectedByVariableRegistryUpdate(requestVariableRegistryEntity.getVariableRegistry());
final Set<AffectedComponentDTO> activeAffectedComponents = serviceFacade.getActiveComponentsAffectedByVariableRegistryUpdate(requestVariableRegistryEntity.getVariableRegistry());
final Map<String, List<AffectedComponentDTO>> activeAffectedComponentsByType =
.collect(Collectors.groupingBy(comp -> comp.getReferenceType()));
final List<AffectedComponentDTO> activeAffectedProcessors = activeAffectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
final List<AffectedComponentDTO> activeAffectedServices = activeAffectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
final NiFiUser user = NiFiUserUtils.getNiFiUser();
// define access authorize for execution below
final AuthorizeAccess authorizeAccess = lookup -> {
final Authorizable groupAuthorizable = lookup.getProcessGroup(groupId).getAuthorizable();
groupAuthorizable.authorize(authorizer, RequestAction.WRITE, user);
// For every component that is affected, the user must have READ permissions and WRITE permissions
// (because this action requires stopping the component).
if (activeAffectedProcessors != null) {
for (final AffectedComponentDTO activeAffectedComponent : activeAffectedProcessors) {
final Authorizable authorizable = lookup.getProcessor(activeAffectedComponent.getId()).getAuthorizable();
authorizable.authorize(authorizer, RequestAction.READ, user);
authorizable.authorize(authorizer, RequestAction.WRITE, user);
if (activeAffectedServices != null) {
for (final AffectedComponentDTO activeAffectedComponent : activeAffectedServices) {
final Authorizable authorizable = lookup.getControllerService(activeAffectedComponent.getId()).getAuthorizable();
authorizable.authorize(authorizer, RequestAction.READ, user);
authorizable.authorize(authorizer, RequestAction.WRITE, user);
if (isReplicateRequest()) {
// authorize access
// update the variable registry
final VariableRegistryUpdateRequest updateRequest = createVariableRegistryUpdateRequest(groupId, allAffectedComponents, user);
final URI originalUri = getAbsolutePath();
// Submit the task to be run in the background
final Runnable taskWrapper = () -> {
try {
// set the user authentication token
final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(user));
updateVariableRegistryReplicated(groupId, originalUri, activeAffectedProcessors, activeAffectedServices, updateRequest, requestVariableRegistryEntity);
// ensure the request is marked complete
} catch (final Exception e) {
logger.error("Failed to update variable registry", e);
updateRequest.setFailureReason("An unexpected error has occurred: " + e);
} finally {
// clear the authentication token
final VariableRegistryUpdateRequestEntity responseEntity = new VariableRegistryUpdateRequestEntity();
responseEntity.getRequest().setUri(generateResourceUri("process-groups", groupId, "variable-registry", "update-requests", updateRequest.getRequestId()));
final URI location = URI.create(responseEntity.getRequest().getUri());
return Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build();
final UpdateVariableRegistryRequestWrapper requestWrapper =
new UpdateVariableRegistryRequestWrapper(allAffectedComponents, activeAffectedProcessors, activeAffectedServices, requestVariableRegistryEntity);
final Revision requestRevision = getRevision(requestVariableRegistryEntity.getProcessGroupRevision(), groupId);
return withWriteLock(
(revision, wrapper) ->
updateVariableRegistryLocal(groupId, wrapper.getAllAffectedComponents(), wrapper.getActiveAffectedProcessors(),
wrapper.getActiveAffectedServices(), user, revision, wrapper.getVariableRegistryEntity())
private Pause createPause(final VariableRegistryUpdateRequest updateRequest) {
return new Pause() {
public boolean pause() {
if (updateRequest.isComplete()) {
return false;
try {
} catch (final InterruptedException ie) {
return false;
return !updateRequest.isComplete();
private void updateVariableRegistryReplicated(final String groupId, final URI originalUri, final Collection<AffectedComponentDTO> affectedProcessors,
final Collection<AffectedComponentDTO> affectedServices, final VariableRegistryUpdateRequest updateRequest,
final VariableRegistryEntity requestEntity) throws InterruptedException, IOException {
final Pause pause = createPause(updateRequest);
// stop processors
if (affectedProcessors != null) {"In order to update Variable Registry for Process Group with ID {}, replicating request to stop {} affected Processors", groupId, affectedProcessors.size());
scheduleProcessors(groupId, originalUri, updateRequest, pause, affectedProcessors, ScheduledState.STOPPED, updateRequest.getStopProcessorsStep());
} else {"In order to update Variable Registry for Process Group with ID {}, no Processors are affected.", groupId);
// disable controller services
if (affectedServices != null) {"In order to update Variable Registry for Process Group with ID {}, replicating request to stop {} affected Controller Services", groupId, affectedServices.size());
activateControllerServices(groupId, originalUri, updateRequest, pause, affectedServices, ControllerServiceState.DISABLED, updateRequest.getDisableServicesStep());
} else {"In order to update Variable Registry for Process Group with ID {}, no Controller Services are affected.", groupId);
// apply updates"In order to update Variable Registry for Process Group with ID {}, replicating request to apply updates to variable registry", groupId);
applyVariableRegistryUpdate(groupId, originalUri, updateRequest, requestEntity);
// re-enable controller services
if (affectedServices != null) {"In order to update Variable Registry for Process Group with ID {}, replicating request to re-enable {} affected services", groupId, affectedServices.size());
activateControllerServices(groupId, originalUri, updateRequest, pause, affectedServices, ControllerServiceState.ENABLED, updateRequest.getEnableServicesStep());
} else {"In order to update Variable Registry for Process Group with ID {}, no Controller Services are affected.", groupId);
// restart processors
if (affectedProcessors != null) {"In order to update Variable Registry for Process Group with ID {}, replicating request to restart {} affected processors", groupId, affectedProcessors.size());
scheduleProcessors(groupId, originalUri, updateRequest, pause, affectedProcessors, ScheduledState.RUNNING, updateRequest.getStartProcessorsStep());
} else {"In order to update Variable Registry for Process Group with ID {}, no Processors are affected.", groupId);
* Periodically polls the process group with the given ID, waiting for all processors whose ID's are given to have the given Scheduled State.
* @param groupId the ID of the Process Group to poll
* @param processorIds the ID of all Processors whose state should be equal to the given desired state
* @param desiredState the desired state for all processors with the ID's given
* @param pause the Pause that can be used to wait between polling
* @return <code>true</code> if successful, <code>false</code> if unable to wait for processors to reach the desired state
private boolean waitForProcessorStatus(final URI originalUri, final String groupId, final Set<String> processorIds, final ScheduledState desiredState,
final VariableRegistryUpdateRequest updateRequest, final Pause pause) throws InterruptedException {
URI groupUri;
try {
groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
originalUri.getPort(), "/nifi-api/process-groups/" + groupId + "/processors", "includeDescendantGroups=true", originalUri.getFragment());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
final Map<String, String> headers = new HashMap<>();
final MultivaluedMap<String, String> requestEntity = new MultivaluedHashMap<>();
boolean continuePolling = true;
while (continuePolling) {
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
final NodeResponse clusterResponse;
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
clusterResponse = getRequestReplicator().replicate(HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse();
} else {
clusterResponse = getRequestReplicator().forwardToCoordinator(
getClusterCoordinatorNode(), HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse();
if (clusterResponse.getStatus() != Status.OK.getStatusCode()) {
return false;
final ProcessorsEntity processorsEntity = getResponseEntity(clusterResponse, ProcessorsEntity.class);
final Set<ProcessorEntity> processorEntities = processorsEntity.getProcessors();
if (isProcessorActionComplete(processorEntities, updateRequest, processorIds, desiredState)) {
logger.debug("All {} processors of interest now have the desired state of {}", processorIds.size(), desiredState);
return true;
// Not all of the processors are in the desired state. Pause for a bit and poll again.
continuePolling = pause.pause();
return false;
* Periodically polls the process group with the given ID, waiting for all processors whose ID's are given to have the given Scheduled State.
* @param groupId the ID of the Process Group to poll
* @param processorIds the ID of all Processors whose state should be equal to the given desired state
* @param desiredState the desired state for all processors with the ID's given
* @param pause the Pause that can be used to wait between polling
* @return <code>true</code> if successful, <code>false</code> if unable to wait for processors to reach the desired state
private boolean waitForLocalProcessor(final String groupId, final Set<String> processorIds, final ScheduledState desiredState,
final VariableRegistryUpdateRequest updateRequest, final Pause pause) {
boolean continuePolling = true;
while (continuePolling) {
final Set<ProcessorEntity> processorEntities = serviceFacade.getProcessors(groupId, true);
if (isProcessorActionComplete(processorEntities, updateRequest, processorIds, desiredState)) {
logger.debug("All {} processors of interest now have the desired state of {}", processorIds.size(), desiredState);
return true;
// Not all of the processors are in the desired state. Pause for a bit and poll again.
continuePolling = pause.pause();
return false;
private boolean isProcessorActionComplete(final Set<ProcessorEntity> processorEntities, final VariableRegistryUpdateRequest updateRequest,
final Set<String> processorIds, final ScheduledState desiredState) {
final String desiredStateName =;
// update the affected processors
.filter(entity -> updateRequest.getAffectedComponents().containsKey(entity.getId()))
.forEach(entity -> {
final AffectedComponentEntity affectedComponentEntity = updateRequest.getAffectedComponents().get(entity.getId());
// only consider update this component if the user had permissions to it
if (Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) {
final AffectedComponentDTO affectedComponent = affectedComponentEntity.getComponent();
if (Boolean.TRUE.equals(entity.getPermissions().getCanRead())) {
final boolean allProcessorsMatch =
.filter(entity -> processorIds.contains(entity.getId()))
.allMatch(entity -> {
final ProcessorStatusDTO status = entity.getStatus();
final String runStatus = status.getAggregateSnapshot().getRunStatus();
final boolean stateMatches = desiredStateName.equalsIgnoreCase(runStatus);
if (!stateMatches) {
return false;
return desiredState != ScheduledState.STOPPED || status.getAggregateSnapshot().getActiveThreadCount() == 0;
return allProcessorsMatch;
* Updates the affected controller services in the specified updateRequest with the serviceEntities.
* @param serviceEntities service entities
* @param updateRequest update request
private void updateAffectedControllerServices(final Set<ControllerServiceEntity> serviceEntities, final VariableRegistryUpdateRequest updateRequest) {
// update the affected components
.filter(entity -> updateRequest.getAffectedComponents().containsKey(entity.getId()))
.forEach(entity -> {
final AffectedComponentEntity affectedComponentEntity = updateRequest.getAffectedComponents().get(entity.getId());
// only consider update this component if the user had permissions to it
if (Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) {
final AffectedComponentDTO affectedComponent = affectedComponentEntity.getComponent();
if (Boolean.TRUE.equals(entity.getPermissions().getCanRead())) {
* Periodically polls the process group with the given ID, waiting for all controller services whose ID's are given to have the given Controller Service State.
* @param groupId the ID of the Process Group to poll
* @param serviceIds the ID of all Controller Services whose state should be equal to the given desired state
* @param desiredState the desired state for all services with the ID's given
* @param pause the Pause that can be used to wait between polling
* @return <code>true</code> if successful, <code>false</code> if unable to wait for services to reach the desired state
private boolean waitForControllerServiceStatus(final URI originalUri, final String groupId, final Set<String> serviceIds,
final ControllerServiceState desiredState, final VariableRegistryUpdateRequest updateRequest, final Pause pause) throws InterruptedException {
URI groupUri;
try {
groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services", "includeAncestorGroups=false&includeDescendantGroups=true", originalUri.getFragment());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
final Map<String, String> headers = new HashMap<>();
final MultivaluedMap<String, String> requestEntity = new MultivaluedHashMap<>();
boolean continuePolling = true;
while (continuePolling) {
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
final NodeResponse clusterResponse;
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
clusterResponse = getRequestReplicator().replicate(HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse();
} else {
clusterResponse = getRequestReplicator().forwardToCoordinator(
getClusterCoordinatorNode(), HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse();
if (clusterResponse.getStatus() != Status.OK.getStatusCode()) {
return false;
final ControllerServicesEntity controllerServicesEntity = getResponseEntity(clusterResponse, ControllerServicesEntity.class);
final Set<ControllerServiceEntity> serviceEntities = controllerServicesEntity.getControllerServices();
// update the affected controller services
updateAffectedControllerServices(serviceEntities, updateRequest);
final String desiredStateName =;
final boolean allServicesMatch =
.map(entity -> entity.getComponent())
.filter(service -> serviceIds.contains(service.getId()))
.map(service -> service.getState())
.allMatch(state -> state.equals(desiredStateName));
if (allServicesMatch) {
logger.debug("All {} controller services of interest now have the desired state of {}", serviceIds.size(), desiredState);
return true;
// Not all of the processors are in the desired state. Pause for a bit and poll again.
continuePolling = pause.pause();
return false;
* Periodically polls the process group with the given ID, waiting for all controller services whose ID's are given to have the given Controller Service State.
* @param groupId the ID of the Process Group to poll
* @param serviceIds the ID of all Controller Services whose state should be equal to the given desired state
* @param desiredState the desired state for all services with the ID's given
* @param pause the Pause that can be used to wait between polling
* @return <code>true</code> if successful, <code>false</code> if unable to wait for services to reach the desired state
private boolean waitForLocalControllerServiceStatus(final String groupId, final Set<String> serviceIds, final ControllerServiceState desiredState,
final VariableRegistryUpdateRequest updateRequest, final Pause pause) {
boolean continuePolling = true;
while (continuePolling) {
final Set<ControllerServiceEntity> serviceEntities = serviceFacade.getControllerServices(groupId, false, true);
// update the affected controller services
updateAffectedControllerServices(serviceEntities, updateRequest);
final String desiredStateName =;
final boolean allServicesMatch =
.map(entity -> entity.getComponent())
.filter(service -> serviceIds.contains(service.getId()))
.map(service -> service.getState())
.allMatch(state -> desiredStateName.equals(state));
if (allServicesMatch) {
logger.debug("All {} controller services of interest now have the desired state of {}", serviceIds.size(), desiredState);
return true;
// Not all of the processors are in the desired state. Pause for a bit and poll again.
continuePolling = pause.pause();
return false;
private VariableRegistryUpdateRequest createVariableRegistryUpdateRequest(final String groupId, final Set<AffectedComponentEntity> affectedComponents, final NiFiUser user) {
final VariableRegistryUpdateRequest updateRequest = new VariableRegistryUpdateRequest(UUID.randomUUID().toString(), groupId, affectedComponents, user);
// before adding to the request map, purge any old requests. Must do this by creating a List of ID's
// and then removing those ID's one-at-a-time in order to avoid ConcurrentModificationException.
final Date oneMinuteAgo = new Date(System.currentTimeMillis() - VARIABLE_REGISTRY_UPDATE_REQUEST_EXPIRATION);
final List<String> completedRequestIds = varRegistryUpdateRequests.entrySet().stream()
.filter(entry -> entry.getValue().isComplete())
.filter(entry -> entry.getValue().getLastUpdated().before(oneMinuteAgo))
final int requestCount = varRegistryUpdateRequests.size();
throw new IllegalStateException("There are already " + requestCount + " update requests for variable registries. "
+ "Cannot issue any more requests until the older ones are deleted or expire");
this.varRegistryUpdateRequests.put(updateRequest.getRequestId(), updateRequest);
return updateRequest;
private Response updateVariableRegistryLocal(final String groupId, final Set<AffectedComponentEntity> affectedComponents, final List<AffectedComponentDTO> affectedProcessors,
final List<AffectedComponentDTO> affectedServices, final NiFiUser user, final Revision requestRevision, final VariableRegistryEntity requestEntity) {
final Set<String> affectedProcessorIds = affectedProcessors == null ? Collections.emptySet() :
.map(component -> component.getId())
Map<String, Revision> processorRevisionMap = getRevisions(groupId, affectedProcessorIds);
final Set<String> affectedServiceIds = affectedServices == null ? Collections.emptySet() :
.map(component -> component.getId())
Map<String, Revision> serviceRevisionMap = getRevisions(groupId, affectedServiceIds);
// update the variable registry
final VariableRegistryUpdateRequest updateRequest = createVariableRegistryUpdateRequest(groupId, affectedComponents, user);
final Pause pause = createPause(updateRequest);
final Runnable updateTask = new Runnable() {
public void run() {
try {
// set the user authentication token
final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(user));
// Stop processors
performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getStopProcessorsStep(), "Stopping Processors",
() -> stopProcessors(updateRequest, groupId, processorRevisionMap, pause));
// Update revision map because this will have modified the revisions of our components.
final Map<String, Revision> updatedProcessorRevisionMap = getRevisions(groupId, affectedProcessorIds);
// Disable controller services
performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getDisableServicesStep(), "Disabling Controller Services",
() -> disableControllerServices(updateRequest, groupId, serviceRevisionMap, pause));
// Update revision map because this will have modified the revisions of our components.
final Map<String, Revision> updatedServiceRevisionMap = getRevisions(groupId, affectedServiceIds);
// Apply the updates
performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getApplyUpdatesStep(), "Applying updates to Variable Registry",
() -> {
final VariableRegistryEntity entity = serviceFacade.updateVariableRegistry(requestRevision, requestEntity.getVariableRegistry());
// Re-enable the controller services
performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getEnableServicesStep(), "Re-enabling Controller Services",
() -> enableControllerServices(updateRequest, groupId, updatedServiceRevisionMap, pause));
// Restart processors
performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getStartProcessorsStep(), "Restarting Processors",
() -> startProcessors(updateRequest, groupId, updatedProcessorRevisionMap, pause));
// Set complete
updateRequest.setLastUpdated(new Date());
} catch (final Exception e) {
logger.error("Failed to update Variable Registry for Proces Group with ID " + groupId, e);
updateRequest.setFailureReason("An unexpected error has occurred: " + e);
} finally {
// clear the authentication token
// Submit the task to be run in the background
final VariableRegistryUpdateRequestEntity responseEntity = new VariableRegistryUpdateRequestEntity();
responseEntity.getRequest().setUri(generateResourceUri("process-groups", groupId, "variable-registry", "update-requests", updateRequest.getRequestId()));
final URI location = URI.create(responseEntity.getRequest().getUri());
return Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build();
private Map<String, Revision> getRevisions(final String groupId, final Set<String> componentIds) {
final Set<Revision> processorRevisions = serviceFacade.getRevisionsFromGroup(groupId, group -> componentIds);
return -> revision.getComponentId(), Function.identity()));
private void performUpdateVariableRegistryStep(final String groupId, final VariableRegistryUpdateRequest request, final VariableRegistryUpdateStep step,
final String stepDescription, final Runnable action) {
if (request.isComplete()) {"In updating Variable Registry for Process Group with ID {}"
+ ", skipping the following step because the request has completed already: {}", groupId, stepDescription);
try {"In order to update Variable Registry for Process Group with ID {}, {}", groupId, stepDescription);;
} catch (final Exception e) {
logger.error("Failed to update variable registry for Process Group with ID {}", groupId, e);
request.setFailureReason("Failed to update Variable Registry because failed while performing step: " + stepDescription);
request.setLastUpdated(new Date());
private void stopProcessors(final VariableRegistryUpdateRequest updateRequest, final String processGroupId,
final Map<String, Revision> processorRevisions, final Pause pause) {
if (processorRevisions.isEmpty()) {
serviceFacade.verifyScheduleComponents(processGroupId, ScheduledState.STOPPED, processorRevisions.keySet());
serviceFacade.scheduleComponents(processGroupId, ScheduledState.STOPPED, processorRevisions);
waitForLocalProcessor(processGroupId, processorRevisions.keySet(), ScheduledState.STOPPED, updateRequest, pause);
private void startProcessors(final VariableRegistryUpdateRequest request, final String processGroupId, final Map<String, Revision> processorRevisions, final Pause pause) {
if (processorRevisions.isEmpty()) {
serviceFacade.verifyScheduleComponents(processGroupId, ScheduledState.RUNNING, processorRevisions.keySet());
serviceFacade.scheduleComponents(processGroupId, ScheduledState.RUNNING, processorRevisions);
waitForLocalProcessor(processGroupId, processorRevisions.keySet(), ScheduledState.RUNNING, request, pause);
private void disableControllerServices(final VariableRegistryUpdateRequest updateRequest, final String processGroupId,
final Map<String, Revision> serviceRevisions, final Pause pause) {
if (serviceRevisions.isEmpty()) {
serviceFacade.verifyActivateControllerServices(processGroupId, ControllerServiceState.DISABLED, serviceRevisions.keySet());
serviceFacade.activateControllerServices(processGroupId, ControllerServiceState.DISABLED, serviceRevisions);
waitForLocalControllerServiceStatus(processGroupId, serviceRevisions.keySet(), ControllerServiceState.DISABLED, updateRequest, pause);
private void enableControllerServices(final VariableRegistryUpdateRequest updateRequest, final String processGroupId,
final Map<String, Revision> serviceRevisions, final Pause pause) {
if (serviceRevisions.isEmpty()) {
serviceFacade.verifyActivateControllerServices(processGroupId, ControllerServiceState.ENABLED, serviceRevisions.keySet());
serviceFacade.activateControllerServices(processGroupId, ControllerServiceState.ENABLED, serviceRevisions);
waitForLocalControllerServiceStatus(processGroupId, serviceRevisions.keySet(), ControllerServiceState.ENABLED, updateRequest, pause);
private void scheduleProcessors(final String groupId, final URI originalUri, final VariableRegistryUpdateRequest updateRequest,
final Pause pause, final Collection<AffectedComponentDTO> affectedProcessors, final ScheduledState desiredState,
final VariableRegistryUpdateStep updateStep) throws InterruptedException {
final Set<String> affectedProcessorIds =
.map(component -> component.getId())
final Map<String, Revision> processorRevisionMap = getRevisions(groupId, affectedProcessorIds);
final Map<String, RevisionDTO> processorRevisionDtoMap = processorRevisionMap.entrySet().stream().collect(
Collectors.toMap(Map.Entry::getKey, entry -> dtoFactory.createRevisionDTO(entry.getValue())));
final ScheduleComponentsEntity scheduleProcessorsEntity = new ScheduleComponentsEntity();
URI scheduleGroupUri;
try {
scheduleGroupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId, null, originalUri.getFragment());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
final Map<String, String> headers = new HashMap<>();
headers.put("content-type", MediaType.APPLICATION_JSON);
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
final NodeResponse clusterResponse;
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
clusterResponse = getRequestReplicator().replicate(HttpMethod.PUT, scheduleGroupUri, scheduleProcessorsEntity, headers).awaitMergedResponse();
} else {
clusterResponse = getRequestReplicator().forwardToCoordinator(
getClusterCoordinatorNode(), HttpMethod.PUT, scheduleGroupUri, scheduleProcessorsEntity, headers).awaitMergedResponse();
final int stopProcessorStatus = clusterResponse.getStatus();
if (stopProcessorStatus != Status.OK.getStatusCode()) {
updateRequest.getStopProcessorsStep().setFailureReason("Failed while " + updateStep.getDescription());
updateRequest.setFailureReason("Failed while " + updateStep.getDescription());
updateRequest.setLastUpdated(new Date());
final boolean processorsTransitioned = waitForProcessorStatus(originalUri, groupId, affectedProcessorIds, desiredState, updateRequest, pause);
if (!processorsTransitioned) {
updateStep.setFailureReason("Failed while " + updateStep.getDescription());
updateRequest.setFailureReason("Failed while " + updateStep.getDescription());
private void activateControllerServices(final String groupId, final URI originalUri, final VariableRegistryUpdateRequest updateRequest,
final Pause pause, final Collection<AffectedComponentDTO> affectedServices, final ControllerServiceState desiredState, final VariableRegistryUpdateStep updateStep)
throws InterruptedException {
final Set<String> affectedServiceIds =
.map(component -> component.getId())
final Map<String, Revision> serviceRevisionMap = getRevisions(groupId, affectedServiceIds);
final Map<String, RevisionDTO> serviceRevisionDtoMap = serviceRevisionMap.entrySet().stream().collect(
Collectors.toMap(Map.Entry::getKey, entry -> dtoFactory.createRevisionDTO(entry.getValue())));
final ActivateControllerServicesEntity activateServicesEntity = new ActivateControllerServicesEntity();
URI controllerServicesUri;
try {
controllerServicesUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services", null, originalUri.getFragment());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
final Map<String, String> headers = new HashMap<>();
headers.put("content-type", MediaType.APPLICATION_JSON);
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
final NodeResponse clusterResponse;
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
clusterResponse = getRequestReplicator().replicate(HttpMethod.PUT, controllerServicesUri, activateServicesEntity, headers).awaitMergedResponse();
} else {
clusterResponse = getRequestReplicator().forwardToCoordinator(
getClusterCoordinatorNode(), HttpMethod.PUT, controllerServicesUri, activateServicesEntity, headers).awaitMergedResponse();
final int disableServicesStatus = clusterResponse.getStatus();
if (disableServicesStatus != Status.OK.getStatusCode()) {
updateStep.setFailureReason("Failed while " + updateStep.getDescription());
updateRequest.setFailureReason("Failed while " + updateStep.getDescription());
updateRequest.setLastUpdated(new Date());
final boolean serviceTransitioned = waitForControllerServiceStatus(originalUri, groupId, affectedServiceIds, desiredState, updateRequest, pause);
if (!serviceTransitioned) {
updateStep.setFailureReason("Failed while " + updateStep.getDescription());
updateRequest.setFailureReason("Failed while " + updateStep.getDescription());
private void applyVariableRegistryUpdate(final String groupId, final URI originalUri, final VariableRegistryUpdateRequest updateRequest,
final VariableRegistryEntity updateEntity) throws InterruptedException, IOException {
// convert request accordingly
URI applyUpdatesUri;
try {
applyUpdatesUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
originalUri.getPort(), "/nifi-api/process-groups/" + groupId + "/variable-registry", null, originalUri.getFragment());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
final Map<String, String> headers = new HashMap<>();
headers.put("content-type", MediaType.APPLICATION_JSON);
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
final NodeResponse clusterResponse;
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
clusterResponse = getRequestReplicator().replicate(HttpMethod.PUT, applyUpdatesUri, updateEntity, headers).awaitMergedResponse();
} else {
clusterResponse = getRequestReplicator().forwardToCoordinator(
getClusterCoordinatorNode(), HttpMethod.PUT, applyUpdatesUri, updateEntity, headers).awaitMergedResponse();
final int applyUpdatesStatus = clusterResponse.getStatus();
updateRequest.setLastUpdated(new Date());
if (applyUpdatesStatus == Status.OK.getStatusCode()) {
// grab the current process group revision
final VariableRegistryEntity entity = getResponseEntity(clusterResponse, VariableRegistryEntity.class);
} else {
final String message = getResponseEntity(clusterResponse, String.class);
// update the request progress
updateRequest.getApplyUpdatesStep().setFailureReason("Failed to apply updates to the Variable Registry: " + message);
updateRequest.setFailureReason("Failed to apply updates to the Variable Registry: " + message);
* Extracts the response entity from the specified node response.
* @param nodeResponse node response
* @param clazz class
* @param <T> type of class
* @return the response entity
protected <T> T getResponseEntity(final NodeResponse nodeResponse, final Class<T> clazz) {
T entity = (T) nodeResponse.getUpdatedEntity();
if (entity == null) {
entity = nodeResponse.getClientResponse().readEntity(clazz);
return entity;
* Creates a request to drop the flowfiles from all connection queues within a process group (recursively).
* @param httpServletRequest request
* @param processGroupId The id of the process group to be removed.
* @return A dropRequestEntity.
value = "Creates a request to drop all flowfiles of all connection queues in this process group.",
response = ProcessGroupEntity.class,
authorizations = {
@Authorization(value = "Read - /process-groups/{uuid} - For this and all encapsulated process groups"),
@Authorization(value = "Write Source Data - /data/{component-type}/{uuid} - For all encapsulated connections")
value = {
@ApiResponse(code = 202, message = "The request has been accepted. An HTTP response header will contain the URI where the status can be polled."),
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response createEmptyAllConnectionsRequest(
@Context final HttpServletRequest httpServletRequest,
value = "The process group id.",
required = true
@PathParam("id") final String processGroupId
) {
if (isReplicateRequest()) {
return replicate(HttpMethod.POST);
final ProcessGroupEntity requestProcessGroupEntity = new ProcessGroupEntity();
return withWriteLock(
lookup -> authorizeHandleDropAllFlowFilesRequest(processGroupId, lookup),
(processGroupEntity) -> {
// ensure the id is the same across the cluster
final String dropRequestId = generateUuid();
// submit the drop request
final DropRequestDTO dropRequest = serviceFacade.createDropAllFlowFilesInProcessGroup(processGroupEntity.getId(), dropRequestId);
dropRequest.setUri(generateResourceUri("process-groups", processGroupEntity.getId(), "empty-all-connections-requests", dropRequest.getId()));
// create the response entity
final DropRequestEntity entity = new DropRequestEntity();
// generate the URI where the response will be
final URI location = URI.create(dropRequest.getUri());
return Response.status(Status.ACCEPTED).location(location).entity(entity).build();
* Checks the status of an outstanding request for dropping all flowfiles within a process group.
* @param processGroupId The id of the process group
* @param dropRequestId The id of the drop request
* @return A dropRequestEntity
value = "Gets the current status of a drop all flowfiles request.",
response = DropRequestEntity.class,
authorizations = {
@Authorization(value = "Read - /process-groups/{uuid} - For this and all encapsulated process groups"),
@Authorization(value = "Write Source Data - /data/{component-type}/{uuid} - For all encapsulated connections")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response getDropAllFlowfilesRequest(
value = "The process group id.",
required = true
@PathParam("id") final String processGroupId,
value = "The drop request id.",
required = true
@PathParam("drop-request-id") final String dropRequestId
) {
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
// authorize access
serviceFacade.authorizeAccess(lookup -> authorizeHandleDropAllFlowFilesRequest(processGroupId, lookup));
// get the drop request
final DropRequestDTO dropRequest = serviceFacade.getDropAllFlowFilesRequest(processGroupId, dropRequestId);
dropRequest.setUri(generateResourceUri("process-groups", processGroupId, "empty-all-connections-requests", dropRequest.getId()));
// create the response entity
final DropRequestEntity entity = new DropRequestEntity();
return generateOkResponse(entity).build();
* Cancels the specified request for dropping all flowfiles within a process group.
* @param httpServletRequest request
* @param processGroupId The process group id
* @param dropRequestId The drop request id
* @return A dropRequestEntity
value = "Cancels and/or removes a request to drop all flowfiles.",
response = DropRequestEntity.class,
authorizations = {
@Authorization(value = "Read - /process-groups/{uuid} - For this and all encapsulated process groups"),
@Authorization(value = "Write Source Data - /data/{component-type}/{uuid} - For all encapsulated connections")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response removeDropRequest(
@Context final HttpServletRequest httpServletRequest,
value = "The process group id.",
required = true
@PathParam("id") final String processGroupId,
value = "The drop request id.",
required = true
@PathParam("drop-request-id") final String dropRequestId
) {
if (isReplicateRequest()) {
return replicate(HttpMethod.DELETE);
return withWriteLock(
new DropEntity(processGroupId, dropRequestId),
lookup -> authorizeHandleDropAllFlowFilesRequest(processGroupId, lookup),
(dropEntity) -> {
// delete the drop request
final DropRequestDTO dropRequest = serviceFacade.deleteDropAllFlowFilesRequest(dropEntity.getEntityId(), dropEntity.getDropRequestId());
dropRequest.setUri(generateResourceUri("process-groups", dropEntity.getEntityId(), "empty-all-connections-requests", dropRequest.getId()));
// create the response entity
final DropRequestEntity entity = new DropRequestEntity();
return generateOkResponse(entity).build();
private void authorizeHandleDropAllFlowFilesRequest(String processGroupId, AuthorizableLookup lookup) {
final ProcessGroupAuthorizable processGroup = lookup.getProcessGroup(processGroupId);
authorizeProcessGroup(processGroup, authorizer, lookup, RequestAction.READ, false, false, false, false, false);
.forEach(encapsulatedProcessGroup -> authorizeProcessGroup(encapsulatedProcessGroup, authorizer, lookup, RequestAction.READ, false, false, false, false, false));
.forEach(connectionSourceData -> connectionSourceData.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()));
* Removes the specified process group reference.
* @param httpServletRequest request
* @param version The revision is used to verify the client is working with the latest version of the flow.
* @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
* @param id The id of the process group to be removed.
* @return A processGroupEntity.
value = "Deletes a process group",
response = ProcessGroupEntity.class,
authorizations = {
@Authorization(value = "Write - /process-groups/{uuid}"),
@Authorization(value = "Write - Parent Process Group - /process-groups/{uuid}"),
@Authorization(value = "Read - any referenced Controller Services by any encapsulated components - /controller-services/{uuid}"),
@Authorization(value = "Write - /{component-type}/{uuid} - For all encapsulated components")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response removeProcessGroup(
@Context final HttpServletRequest httpServletRequest,
value = "The revision is used to verify the client is working with the latest version of the flow.",
required = false
@QueryParam(VERSION) final LongParameter version,
value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
required = false
@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) final ClientIdParameter clientId,
value = "Acknowledges that this node is disconnected to allow for mutable requests to proceed.",
required = false
@QueryParam(DISCONNECTED_NODE_ACKNOWLEDGED) @DefaultValue("false") final Boolean disconnectedNodeAcknowledged,
value = "The process group id.",
required = true
@PathParam("id") final String id) {
// replicate if cluster manager
if (isReplicateRequest()) {
return replicate(HttpMethod.DELETE);
} else if (isDisconnectedFromCluster()) {
final ProcessGroupEntity requestProcessGroupEntity = new ProcessGroupEntity();
// handle expects request (usually from the cluster manager)
final Revision requestRevision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id);
return withWriteLock(
lookup -> {
final ProcessGroupAuthorizable processGroupAuthorizable = lookup.getProcessGroup(id);
// ensure write to this process group and all encapsulated components including templates and controller services. additionally, ensure
// read to any referenced services by encapsulated components
authorizeProcessGroup(processGroupAuthorizable, authorizer, lookup, RequestAction.WRITE, true, true, true, false, false);
// ensure write permission to the parent process group, if applicable... if this is the root group the
// request will fail later but still need to handle authorization here
final Authorizable parentAuthorizable = processGroupAuthorizable.getAuthorizable().getParentAuthorizable();
if (parentAuthorizable != null) {
parentAuthorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
() -> serviceFacade.verifyDeleteProcessGroup(id),
(revision, processGroupEntity) -> {
// delete the process group
final ProcessGroupEntity entity = serviceFacade.deleteProcessGroup(revision, processGroupEntity.getId());
// prune response as necessary
if (entity.getComponent() != null) {
// create the response
return generateOkResponse(entity).build();
* Adds the specified process group.
* @param httpServletRequest request
* @param groupId The group id
* @param requestProcessGroupEntity A processGroupEntity
* @return A processGroupEntity
* @throws IOException if the request indicates that the Process Group should be imported from a Flow Registry and NiFi is unable to communicate with the Flow Registry
value = "Creates a process group",
response = ProcessGroupEntity.class,
authorizations = {
@Authorization(value = "Write - /process-groups/{uuid}")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response createProcessGroup(
@Context final HttpServletRequest httpServletRequest,
value = "The process group id.",
required = true
@PathParam("id") final String groupId,
value = "The process group configuration details.",
required = true
) final ProcessGroupEntity requestProcessGroupEntity) {
if (requestProcessGroupEntity == null || requestProcessGroupEntity.getComponent() == null) {
throw new IllegalArgumentException("Process group details must be specified.");
if (requestProcessGroupEntity.getRevision() == null || (requestProcessGroupEntity.getRevision().getVersion() == null || requestProcessGroupEntity.getRevision().getVersion() != 0)) {
throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Process group.");
if (requestProcessGroupEntity.getComponent().getId() != null) {
throw new IllegalArgumentException("Process group ID cannot be specified.");
final PositionDTO proposedPosition = requestProcessGroupEntity.getComponent().getPosition();
if (proposedPosition != null) {
if (proposedPosition.getX() == null || proposedPosition.getY() == null) {
throw new IllegalArgumentException("The x and y coordinate of the proposed position must be specified.");
// if the group name isn't specified, ensure the group is being imported from version control
if (StringUtils.isBlank(requestProcessGroupEntity.getComponent().getName()) && requestProcessGroupEntity.getComponent().getVersionControlInformation() == null) {
throw new IllegalArgumentException("The group name is required when the group is not imported from version control.");
if (requestProcessGroupEntity.getComponent().getParentGroupId() != null && !groupId.equals(requestProcessGroupEntity.getComponent().getParentGroupId())) {
throw new IllegalArgumentException(String.format("If specified, the parent process group id %s must be the same as specified in the URI %s",
requestProcessGroupEntity.getComponent().getParentGroupId(), groupId));
// Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail.
// Step 2: Retrieve flow from Flow Registry
// Step 3: Resolve Bundle info
// Step 4: Update contents of the ProcessGroupDTO passed in to include the components that need to be added.
// Step 5: If any of the components is a Restricted Component, then we must authorize the user
// for write access to the RestrictedComponents resource
// Step 6: Replicate the request or call serviceFacade.updateProcessGroup
final VersionControlInformationDTO versionControlInfo = requestProcessGroupEntity.getComponent().getVersionControlInformation();
if (versionControlInfo != null && requestProcessGroupEntity.getVersionedFlowSnapshot() == null) {
// Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail.
// Step 2: Retrieve flow from Flow Registry
final VersionedFlowSnapshot flowSnapshot = getFlowFromRegistry(versionControlInfo);
// Step 3: Resolve Bundle info
// If there are any Controller Services referenced that are inherited from the parent group, resolve those to point to the appropriate Controller Service, if we are able to.
serviceFacade.resolveInheritedControllerServices(flowSnapshot, groupId, NiFiUserUtils.getNiFiUser());
// Step 4: Update contents of the ProcessGroupDTO passed in to include the components that need to be added.
if (versionControlInfo != null) {
final VersionedFlowSnapshot flowSnapshot = requestProcessGroupEntity.getVersionedFlowSnapshot();
serviceFacade.verifyImportProcessGroup(versionControlInfo, flowSnapshot.getFlowContents(), groupId);
// Step 6: Replicate the request or call serviceFacade.updateProcessGroup
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, requestProcessGroupEntity);
} else if (isDisconnectedFromCluster()) {
return withWriteLock(
lookup -> authorizeAccess(groupId, requestProcessGroupEntity, lookup),
() -> {
final VersionedFlowSnapshot versionedFlowSnapshot = requestProcessGroupEntity.getVersionedFlowSnapshot();
if (versionedFlowSnapshot != null) {
processGroupEntity -> {
final ProcessGroupDTO processGroup = processGroupEntity.getComponent();
// set the processor id as appropriate
// ensure the group name comes from the versioned flow
final VersionedFlowSnapshot flowSnapshot = processGroupEntity.getVersionedFlowSnapshot();
if (flowSnapshot != null && StringUtils.isNotBlank(flowSnapshot.getFlowContents().getName()) && StringUtils.isBlank(processGroup.getName())) {
// create the process group contents
final Revision revision = getRevision(processGroupEntity, processGroup.getId());
ProcessGroupEntity entity = serviceFacade.createProcessGroup(revision, groupId, processGroup);
if (flowSnapshot != null) {
final RevisionDTO revisionDto = entity.getRevision();
final String newGroupId = entity.getComponent().getId();
final Revision newGroupRevision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), newGroupId);
// We don't want the Process Group's position to be updated because we want to keep the position where the user
// placed the Process Group. However, we do want to use the name of the Process Group that is in the Flow Contents.
// To accomplish this, we call updateProcessGroupContents() passing 'true' for the updateSettings flag but null out the position.
entity = serviceFacade.updateProcessGroupContents(newGroupRevision, newGroupId, versionControlInfo, flowSnapshot,
getIdGenerationSeed().orElse(null), false, true, true);
// generate a 201 created response
String uri = entity.getUri();
return generateCreatedResponse(URI.create(uri), entity).build();
private VersionedFlowSnapshot getFlowFromRegistry(final VersionControlInformationDTO versionControlInfo) {
final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(versionControlInfo, true);
final Bucket bucket = flowSnapshot.getBucket();
final VersionedFlow flow = flowSnapshot.getFlow();
final VersionedFlowState flowState = flowSnapshot.isLatest() ? VersionedFlowState.UP_TO_DATE : VersionedFlowState.STALE;
return flowSnapshot;
* Retrieves all the child process groups of the process group with the given id.
* @param groupId the parent process group id
* @return An entity containing all the child process group entities.
value = "Gets all process groups",
response = ProcessGroupsEntity.class,
authorizations = {
@Authorization(value = "Read - /process-groups/{uuid}")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response getProcessGroups(
value = "The process group id.",
required = true
@PathParam("id") final String groupId) {
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
// get the process groups
final Set<ProcessGroupEntity> entities = serviceFacade.getProcessGroups(groupId);
// always prune the contents
for (final ProcessGroupEntity entity : entities) {
if (entity.getComponent() != null) {
// create the response entity
final ProcessGroupsEntity entity = new ProcessGroupsEntity();
// generate the response
return generateOkResponse(entity).build();
// ----------
// processors
// ----------
* Creates a new processor.
* @param httpServletRequest request
* @param groupId The group id
* @param requestProcessorEntity A processorEntity.
* @return A processorEntity.
value = "Creates a new processor",
response = ProcessorEntity.class,
authorizations = {
@Authorization(value = "Write - /process-groups/{uuid}"),
@Authorization(value = "Read - any referenced Controller Services - /controller-services/{uuid}"),
@Authorization(value = "Write - if the Processor is restricted - /restricted-components")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response createProcessor(
@Context final HttpServletRequest httpServletRequest,
value = "The process group id.",
required = true
@PathParam("id") final String groupId,
value = "The processor configuration details.",
required = true
) final ProcessorEntity requestProcessorEntity) {
if (requestProcessorEntity == null || requestProcessorEntity.getComponent() == null) {
throw new IllegalArgumentException("Processor details must be specified.");
if (requestProcessorEntity.getRevision() == null || (requestProcessorEntity.getRevision().getVersion() == null || requestProcessorEntity.getRevision().getVersion() != 0)) {
throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Processor.");
final ProcessorDTO requestProcessor = requestProcessorEntity.getComponent();
if (requestProcessor.getId() != null) {
throw new IllegalArgumentException("Processor ID cannot be specified.");
if (StringUtils.isBlank(requestProcessor.getType())) {
throw new IllegalArgumentException("The type of processor to create must be specified.");
final PositionDTO proposedPosition = requestProcessor.getPosition();
if (proposedPosition != null) {
if (proposedPosition.getX() == null || proposedPosition.getY() == null) {
throw new IllegalArgumentException("The x and y coordinate of the proposed position must be specified.");
if (requestProcessor.getParentGroupId() != null && !groupId.equals(requestProcessor.getParentGroupId())) {
throw new IllegalArgumentException(String.format("If specified, the parent process group id %s must be the same as specified in the URI %s",
requestProcessor.getParentGroupId(), groupId));
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, requestProcessorEntity);
} else if (isDisconnectedFromCluster()) {
return withWriteLock(
lookup -> {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId);
final Authorizable processGroup = groupAuthorizable.getAuthorizable();
processGroup.authorize(authorizer, RequestAction.WRITE, user);
final Authorizable parameterContext = groupAuthorizable.getProcessGroup().getParameterContext();
final ProcessorConfigDTO configDto = requestProcessor.getConfig();
if (parameterContext != null && configDto != null) {
AuthorizeParameterReference.authorizeParameterReferences(configDto.getProperties(), authorizer, parameterContext, user);
ComponentAuthorizable authorizable = null;
try {
authorizable = lookup.getConfigurableComponent(requestProcessor.getType(), requestProcessor.getBundle());
if (authorizable.isRestricted()) {
authorizeRestrictions(authorizer, authorizable);
final ProcessorConfigDTO config = requestProcessor.getConfig();
if (config != null && config.getProperties() != null) {
AuthorizeControllerServiceReference.authorizeControllerServiceReferences(config.getProperties(), authorizable, authorizer, lookup);
} finally {
if (authorizable != null) {
() -> serviceFacade.verifyCreateProcessor(requestProcessor),
processorEntity -> {
final ProcessorDTO processor = processorEntity.getComponent();
// set the processor id as appropriate
// create the new processor
final Revision revision = getRevision(processorEntity, processor.getId());
final ProcessorEntity entity = serviceFacade.createProcessor(revision, groupId, processor);
// generate a 201 created response
String uri = entity.getUri();
return generateCreatedResponse(URI.create(uri), entity).build();
* Retrieves all the processors in this NiFi.
* @param groupId group id
* @return A processorsEntity.
value = "Gets all processors",
response = ProcessorsEntity.class,
authorizations = {
@Authorization(value = "Read - /process-groups/{uuid}")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response getProcessors(
value = "The process group id.",
required = true
@PathParam("id") final String groupId,
@ApiParam("Whether or not to include processors from descendant process groups") @QueryParam("includeDescendantGroups") @DefaultValue("false") boolean includeDescendantGroups) {
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
// get the processors
final Set<ProcessorEntity> processors = serviceFacade.getProcessors(groupId, includeDescendantGroups);
// create the response entity
final ProcessorsEntity entity = new ProcessorsEntity();
// generate the response
return generateOkResponse(entity).build();
// -----------
// input ports
// -----------
* Creates a new input port.
* @param httpServletRequest request
* @param groupId The group id
* @param requestPortEntity A inputPortEntity.
* @return A inputPortEntity.
value = "Creates an input port",
response = PortEntity.class,
authorizations = {
@Authorization(value = "Write - /process-groups/{uuid}")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response createInputPort(
@Context final HttpServletRequest httpServletRequest,
value = "The process group id.",
required = true
@PathParam("id") final String groupId,
value = "The input port configuration details.",
required = true
) final PortEntity requestPortEntity) {
if (requestPortEntity == null || requestPortEntity.getComponent() == null) {
throw new IllegalArgumentException("Port details must be specified.");
if (requestPortEntity.getRevision() == null || (requestPortEntity.getRevision().getVersion() == null || requestPortEntity.getRevision().getVersion() != 0)) {
throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Input port.");
if (requestPortEntity.getComponent().getId() != null) {
throw new IllegalArgumentException("Input port ID cannot be specified.");
final PositionDTO proposedPosition = requestPortEntity.getComponent().getPosition();
if (proposedPosition != null) {
if (proposedPosition.getX() == null || proposedPosition.getY() == null) {
throw new IllegalArgumentException("The x and y coordinate of the proposed position must be specified.");
if (requestPortEntity.getComponent().getParentGroupId() != null && !groupId.equals(requestPortEntity.getComponent().getParentGroupId())) {
throw new IllegalArgumentException(String.format("If specified, the parent process group id %s must be the same as specified in the URI %s",
requestPortEntity.getComponent().getParentGroupId(), groupId));
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, requestPortEntity);
} else if (isDisconnectedFromCluster()) {
return withWriteLock(
lookup -> {
final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
portEntity -> {
// set the processor id as appropriate
// create the input port and generate the json
final Revision revision = getRevision(portEntity, portEntity.getComponent().getId());
final PortEntity entity = serviceFacade.createInputPort(revision, groupId, portEntity.getComponent());
// build the response
return generateCreatedResponse(URI.create(entity.getUri()), entity).build();
* Retrieves all the of input ports in this NiFi.
* @return A inputPortsEntity.
value = "Gets all input ports",
response = InputPortsEntity.class,
authorizations = {
@Authorization(value = "Read - /process-groups/{uuid}")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response getInputPorts(
value = "The process group id.",
required = true
@PathParam("id") final String groupId) {
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
// get all the input ports
final Set<PortEntity> inputPorts = serviceFacade.getInputPorts(groupId);
final InputPortsEntity entity = new InputPortsEntity();
// generate the response
return generateOkResponse(entity).build();
// ------------
// output ports
// ------------
* Creates a new output port.
* @param httpServletRequest request
* @param groupId The group id
* @param requestPortEntity A outputPortEntity.
* @return A outputPortEntity.
value = "Creates an output port",
response = PortEntity.class,
authorizations = {
@Authorization(value = "Write - /process-groups/{uuid}")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response createOutputPort(
@Context final HttpServletRequest httpServletRequest,
value = "The process group id.",
required = true
@PathParam("id") final String groupId,
value = "The output port configuration.",
required = true
) final PortEntity requestPortEntity) {
if (requestPortEntity == null || requestPortEntity.getComponent() == null) {
throw new IllegalArgumentException("Port details must be specified.");
if (requestPortEntity.getRevision() == null || (requestPortEntity.getRevision().getVersion() == null || requestPortEntity.getRevision().getVersion() != 0)) {
throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Output port.");
if (requestPortEntity.getComponent().getId() != null) {
throw new IllegalArgumentException("Output port ID cannot be specified.");
final PositionDTO proposedPosition = requestPortEntity.getComponent().getPosition();
if (proposedPosition != null) {
if (proposedPosition.getX() == null || proposedPosition.getY() == null) {
throw new IllegalArgumentException("The x and y coordinate of the proposed position must be specified.");
if (requestPortEntity.getComponent().getParentGroupId() != null && !groupId.equals(requestPortEntity.getComponent().getParentGroupId())) {
throw new IllegalArgumentException(String.format("If specified, the parent process group id %s must be the same as specified in the URI %s",
requestPortEntity.getComponent().getParentGroupId(), groupId));
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, requestPortEntity);
} else if (isDisconnectedFromCluster()) {
return withWriteLock(
lookup -> {
final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
portEntity -> {
// set the processor id as appropriate
// create the output port and generate the json
final Revision revision = getRevision(portEntity, portEntity.getComponent().getId());
final PortEntity entity = serviceFacade.createOutputPort(revision, groupId, portEntity.getComponent());
// build the response
return generateCreatedResponse(URI.create(entity.getUri()), entity).build();
* Retrieves all the of output ports in this NiFi.
* @return A outputPortsEntity.
value = "Gets all output ports",
response = OutputPortsEntity.class,
authorizations = {
@Authorization(value = "Read - /process-groups/{uuid}")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response getOutputPorts(
value = "The process group id.",
required = true
@PathParam("id") final String groupId) {
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
// get all the output ports
final Set<PortEntity> outputPorts = serviceFacade.getOutputPorts(groupId);
// create the response entity
final OutputPortsEntity entity = new OutputPortsEntity();
// generate the response
return generateOkResponse(entity).build();
// -------
// funnels
// -------
* Creates a new Funnel.
* @param httpServletRequest request
* @param groupId The group id
* @param requestFunnelEntity A funnelEntity.
* @return A funnelEntity.
value = "Creates a funnel",
response = FunnelEntity.class,
authorizations = {
@Authorization(value = "Write - /process-groups/{uuid}")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response createFunnel(
@Context final HttpServletRequest httpServletRequest,
value = "The process group id.",
required = true
@PathParam("id") final String groupId,
value = "The funnel configuration details.",
required = true
) final FunnelEntity requestFunnelEntity) {
if (requestFunnelEntity == null || requestFunnelEntity.getComponent() == null) {
throw new IllegalArgumentException("Funnel details must be specified.");
if (requestFunnelEntity.getRevision() == null || (requestFunnelEntity.getRevision().getVersion() == null || requestFunnelEntity.getRevision().getVersion() != 0)) {
throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Funnel.");
if (requestFunnelEntity.getComponent().getId() != null) {
throw new IllegalArgumentException("Funnel ID cannot be specified.");
final PositionDTO proposedPosition = requestFunnelEntity.getComponent().getPosition();
if (proposedPosition != null) {
if (proposedPosition.getX() == null || proposedPosition.getY() == null) {
throw new IllegalArgumentException("The x and y coordinate of the proposed position must be specified.");
if (requestFunnelEntity.getComponent().getParentGroupId() != null && !groupId.equals(requestFunnelEntity.getComponent().getParentGroupId())) {
throw new IllegalArgumentException(String.format("If specified, the parent process group id %s must be the same as specified in the URI %s",
requestFunnelEntity.getComponent().getParentGroupId(), groupId));
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, requestFunnelEntity);
} else if (isDisconnectedFromCluster()) {
return withWriteLock(
lookup -> {
final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
funnelEntity -> {
// set the processor id as appropriate
// create the funnel and generate the json
final Revision revision = getRevision(funnelEntity, funnelEntity.getComponent().getId());
final FunnelEntity entity = serviceFacade.createFunnel(revision, groupId, funnelEntity.getComponent());
// build the response
return generateCreatedResponse(URI.create(entity.getUri()), entity).build();
* Retrieves all the of funnels in this NiFi.
* @return A funnelsEntity.
value = "Gets all funnels",
response = FunnelsEntity.class,
authorizations = {
@Authorization(value = "Read - /process-groups/{uuid}")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response getFunnels(
value = "The process group id.",
required = true
@PathParam("id") final String groupId) {
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
// get all the funnels
final Set<FunnelEntity> funnels = serviceFacade.getFunnels(groupId);
// create the response entity
final FunnelsEntity entity = new FunnelsEntity();
// generate the response
return generateOkResponse(entity).build();
// ------
// labels
// ------
* Creates a new Label.
* @param httpServletRequest request
* @param groupId The group id
* @param requestLabelEntity A labelEntity.
* @return A labelEntity.
value = "Creates a label",
response = LabelEntity.class,
authorizations = {
@Authorization(value = "Write - /process-groups/{uuid}")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response createLabel(
@Context final HttpServletRequest httpServletRequest,
value = "The process group id.",
required = true
@PathParam("id") final String groupId,
value = "The label configuration details.",
required = true
) final LabelEntity requestLabelEntity) {
if (requestLabelEntity == null || requestLabelEntity.getComponent() == null) {
throw new IllegalArgumentException("Label details must be specified.");
if (requestLabelEntity.getRevision() == null || (requestLabelEntity.getRevision().getVersion() == null || requestLabelEntity.getRevision().getVersion() != 0)) {
throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Label.");
if (requestLabelEntity.getComponent().getId() != null) {
throw new IllegalArgumentException("Label ID cannot be specified.");
final PositionDTO proposedPosition = requestLabelEntity.getComponent().getPosition();
if (proposedPosition != null) {
if (proposedPosition.getX() == null || proposedPosition.getY() == null) {
throw new IllegalArgumentException("The x and y coordinate of the proposed position must be specified.");
if (requestLabelEntity.getComponent().getParentGroupId() != null && !groupId.equals(requestLabelEntity.getComponent().getParentGroupId())) {
throw new IllegalArgumentException(String.format("If specified, the parent process group id %s must be the same as specified in the URI %s",
requestLabelEntity.getComponent().getParentGroupId(), groupId));
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, requestLabelEntity);
} else if (isDisconnectedFromCluster()) {
return withWriteLock(
lookup -> {
final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
labelEntity -> {
// set the processor id as appropriate
// create the label and generate the json
final Revision revision = getRevision(labelEntity, labelEntity.getComponent().getId());
final LabelEntity entity = serviceFacade.createLabel(revision, groupId, labelEntity.getComponent());
// build the response
return generateCreatedResponse(URI.create(entity.getUri()), entity).build();
* Retrieves all the of labels in this NiFi.
* @return A labelsEntity.
value = "Gets all labels",
response = LabelsEntity.class,
authorizations = {
@Authorization(value = "Read - /process-groups/{uuid}")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response getLabels(
value = "The process group id.",
required = true
@PathParam("id") final String groupId) {
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
// get all the labels
final Set<LabelEntity> labels = serviceFacade.getLabels(groupId);
// create the response entity
final LabelsEntity entity = new LabelsEntity();
// generate the response
return generateOkResponse(entity).build();
// ---------------------
// remote process groups
// ---------------------
* Creates a new remote process group.
* @param httpServletRequest request
* @param groupId The group id
* @param requestRemoteProcessGroupEntity A remoteProcessGroupEntity.
* @return A remoteProcessGroupEntity.
value = "Creates a new process group",
response = RemoteProcessGroupEntity.class,
authorizations = {
@Authorization(value = "Write - /process-groups/{uuid}")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response createRemoteProcessGroup(
@Context final HttpServletRequest httpServletRequest,
value = "The process group id.",
required = true
@PathParam("id") final String groupId,
value = "The remote process group configuration details.",
required = true
) final RemoteProcessGroupEntity requestRemoteProcessGroupEntity) {
if (requestRemoteProcessGroupEntity == null || requestRemoteProcessGroupEntity.getComponent() == null) {
throw new IllegalArgumentException("Remote process group details must be specified.");
if (requestRemoteProcessGroupEntity.getRevision() == null
|| (requestRemoteProcessGroupEntity.getRevision().getVersion() == null || requestRemoteProcessGroupEntity.getRevision().getVersion() != 0)) {
throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Remote process group.");
final RemoteProcessGroupDTO requestRemoteProcessGroupDTO = requestRemoteProcessGroupEntity.getComponent();
if (requestRemoteProcessGroupDTO.getId() != null) {
throw new IllegalArgumentException("Remote process group ID cannot be specified.");
if (requestRemoteProcessGroupDTO.getTargetUri() == null) {
throw new IllegalArgumentException("The URI of the process group must be specified.");
final PositionDTO proposedPosition = requestRemoteProcessGroupDTO.getPosition();
if (proposedPosition != null) {
if (proposedPosition.getX() == null || proposedPosition.getY() == null) {
throw new IllegalArgumentException("The x and y coordinate of the proposed position must be specified.");
if (requestRemoteProcessGroupDTO.getParentGroupId() != null && !groupId.equals(requestRemoteProcessGroupDTO.getParentGroupId())) {
throw new IllegalArgumentException(String.format("If specified, the parent process group id %s must be the same as specified in the URI %s",
requestRemoteProcessGroupDTO.getParentGroupId(), groupId));
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, requestRemoteProcessGroupEntity);
} else if (isDisconnectedFromCluster()) {
return withWriteLock(
lookup -> {
final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
remoteProcessGroupEntity -> {
final RemoteProcessGroupDTO remoteProcessGroupDTO = remoteProcessGroupEntity.getComponent();
// set the processor id as appropriate
// parse the uri to check if the uri is valid
final String targetUris = remoteProcessGroupDTO.getTargetUris();
// since the uri is valid, use it
// create the remote process group
final Revision revision = getRevision(remoteProcessGroupEntity, remoteProcessGroupDTO.getId());
final RemoteProcessGroupEntity entity = serviceFacade.createRemoteProcessGroup(revision, groupId, remoteProcessGroupDTO);
return generateCreatedResponse(URI.create(entity.getUri()), entity).build();
* Retrieves all the of remote process groups in this NiFi.
* @return A remoteProcessGroupEntity.
value = "Gets all remote process groups",
response = RemoteProcessGroupsEntity.class,
authorizations = {
@Authorization(value = "Read - /process-groups/{uuid}")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response getRemoteProcessGroups(
value = "The process group id.",
required = true
@PathParam("id") final String groupId) {
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
// get all the remote process groups
final Set<RemoteProcessGroupEntity> remoteProcessGroups = serviceFacade.getRemoteProcessGroups(groupId);
// prune response as necessary
for (RemoteProcessGroupEntity remoteProcessGroupEntity : remoteProcessGroups) {
if (remoteProcessGroupEntity.getComponent() != null) {
// create the response entity
final RemoteProcessGroupsEntity entity = new RemoteProcessGroupsEntity();
// generate the response
return generateOkResponse(entity).build();
// -----------
// connections
// -----------
* Creates a new connection.
* @param httpServletRequest request
* @param groupId The group id
* @param requestConnectionEntity A connectionEntity.
* @return A connectionEntity.
value = "Creates a connection",
response = ConnectionEntity.class,
authorizations = {
@Authorization(value = "Write - /process-groups/{uuid}"),
@Authorization(value = "Write Source - /{component-type}/{uuid}"),
@Authorization(value = "Write Destination - /{component-type}/{uuid}")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response createConnection(
@Context final HttpServletRequest httpServletRequest,
value = "The process group id.",
required = true
@PathParam("id") final String groupId,
value = "The connection configuration details.",
required = true
) final ConnectionEntity requestConnectionEntity) {
if (requestConnectionEntity == null || requestConnectionEntity.getComponent() == null) {
throw new IllegalArgumentException("Connection details must be specified.");
if (requestConnectionEntity.getRevision() == null || (requestConnectionEntity.getRevision().getVersion() == null || requestConnectionEntity.getRevision().getVersion() != 0)) {
throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Connection.");
if (requestConnectionEntity.getComponent().getId() != null) {
throw new IllegalArgumentException("Connection ID cannot be specified.");
final List<PositionDTO> proposedBends = requestConnectionEntity.getComponent().getBends();
if (proposedBends != null) {
for (final PositionDTO proposedBend : proposedBends) {
if (proposedBend.getX() == null || proposedBend.getY() == null) {
throw new IllegalArgumentException("The x and y coordinate of the each bend must be specified.");
if (requestConnectionEntity.getComponent().getParentGroupId() != null && !groupId.equals(requestConnectionEntity.getComponent().getParentGroupId())) {
throw new IllegalArgumentException(String.format("If specified, the parent process group id %s must be the same as specified in the URI %s",
requestConnectionEntity.getComponent().getParentGroupId(), groupId));
// get the connection
final ConnectionDTO requestConnection = requestConnectionEntity.getComponent();
if (requestConnection.getSource() == null || requestConnection.getSource().getId() == null) {
throw new IllegalArgumentException("The source of the connection must be specified.");
if (requestConnection.getSource().getType() == null) {
throw new IllegalArgumentException("The type of the source of the connection must be specified.");
final ConnectableType sourceConnectableType;
try {
sourceConnectableType = ConnectableType.valueOf(requestConnection.getSource().getType());
} catch (final IllegalArgumentException e) {
throw new IllegalArgumentException(String.format("Unrecognized source type %s. Expected values are [%s]",
requestConnection.getSource().getType(), StringUtils.join(ConnectableType.values(), ", ")));
if (requestConnection.getDestination() == null || requestConnection.getDestination().getId() == null) {
throw new IllegalArgumentException("The destination of the connection must be specified.");
if (requestConnection.getDestination().getType() == null) {
throw new IllegalArgumentException("The type of the destination of the connection must be specified.");
final ConnectableType destinationConnectableType;
try {
destinationConnectableType = ConnectableType.valueOf(requestConnection.getDestination().getType());
} catch (final IllegalArgumentException e) {
throw new IllegalArgumentException(String.format("Unrecognized destination type %s. Expected values are [%s]",
requestConnection.getDestination().getType(), StringUtils.join(ConnectableType.values(), ", ")));
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, requestConnectionEntity);
} else if (isDisconnectedFromCluster()) {
return withWriteLock(
lookup -> {
// ensure write access to the group
final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
// explicitly handle RPGs differently as the connectable id can be ambiguous if self referencing
final Authorizable source;
if (ConnectableType.REMOTE_OUTPUT_PORT.equals(sourceConnectableType)) {
source = lookup.getRemoteProcessGroup(requestConnection.getSource().getGroupId());
} else {
source = lookup.getLocalConnectable(requestConnection.getSource().getId());
// ensure write access to the source
if (source == null) {
throw new ResourceNotFoundException("Cannot find source component with ID [" + requestConnection.getSource().getId() + "]");
source.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
// explicitly handle RPGs differently as the connectable id can be ambiguous if self referencing
final Authorizable destination;
if (ConnectableType.REMOTE_INPUT_PORT.equals(destinationConnectableType)) {
destination = lookup.getRemoteProcessGroup(requestConnection.getDestination().getGroupId());
} else {
destination = lookup.getLocalConnectable(requestConnection.getDestination().getId());
// ensure write access to the destination
if (destination == null) {
throw new ResourceNotFoundException("Cannot find destination component with ID [" + requestConnection.getDestination().getId() + "]");
destination.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
() -> serviceFacade.verifyCreateConnection(groupId, requestConnection),
connectionEntity -> {
final ConnectionDTO connection = connectionEntity.getComponent();
// set the connection id as appropriate
// create the new relationship target
final Revision revision = getRevision(connectionEntity, connection.getId());
final ConnectionEntity entity = serviceFacade.createConnection(revision, groupId, connection);
// extract the href and build the response
String uri = entity.getUri();
return generateCreatedResponse(URI.create(uri), entity).build();
* Gets all the connections.
* @return A connectionsEntity.
value = "Gets all connections",
response = ConnectionsEntity.class,
authorizations = {
@Authorization(value = "Read - /process-groups/{uuid}")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response getConnections(
value = "The process group id.",
required = true
@PathParam("id") String groupId) {
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
// all of the relationships for the specified source processor
Set<ConnectionEntity> connections = serviceFacade.getConnections(groupId);
// create the client response entity
ConnectionsEntity entity = new ConnectionsEntity();
// generate the response
return generateOkResponse(entity).build();
// ----------------
// snippet instance
// ----------------
* Copies the specified snippet within this ProcessGroup. The snippet instance that is instantiated cannot be referenced at a later time, therefore there is no
* corresponding URI. Instead the request URI is returned.
* <p>
* Alternatively, we could have performed a PUT request. However, PUT requests are supposed to be idempotent and this endpoint is certainly not.
* @param httpServletRequest request
* @param groupId The group id
* @param requestCopySnippetEntity The copy snippet request
* @return A flowSnippetEntity.
value = "Copies a snippet and discards it.",
response = FlowEntity.class,
authorizations = {
@Authorization(value = "Write - /process-groups/{uuid}"),
@Authorization(value = "Read - /{component-type}/{uuid} - For each component in the snippet and their descendant components"),
@Authorization(value = "Write - if the snippet contains any restricted Processors - /restricted-components")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response copySnippet(
@Context HttpServletRequest httpServletRequest,
value = "The process group id.",
required = true
@PathParam("id") String groupId,
value = "The copy snippet request.",
required = true
) CopySnippetRequestEntity requestCopySnippetEntity) {
// ensure the position has been specified
if (requestCopySnippetEntity == null || requestCopySnippetEntity.getOriginX() == null || requestCopySnippetEntity.getOriginY() == null) {
throw new IllegalArgumentException("The origin position (x, y) must be specified");
if (requestCopySnippetEntity.getSnippetId() == null) {
throw new IllegalArgumentException("The snippet id must be specified.");
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, requestCopySnippetEntity);
} else if (isDisconnectedFromCluster()) {
return withWriteLock(
lookup -> {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final SnippetAuthorizable snippet = authorizeSnippetUsage(lookup, groupId, requestCopySnippetEntity.getSnippetId(), false, true);
final Consumer<ComponentAuthorizable> authorizeRestricted = authorizable -> {
if (authorizable.isRestricted()) {
authorizeRestrictions(authorizer, authorizable);
// consider each processor. note - this request will not create new controller services so we do not need to check
// for if there are not restricted controller services. it will however, need to authorize the user has access
// to any referenced services and this is done within authorizeSnippetUsage above.
// Also ensure that user has READ permissions to the Parameter Contexts in order to copy them.
for (final ProcessGroupAuthorizable groupAuthorizable : snippet.getSelectedProcessGroups()) {
final ParameterContext parameterContext = groupAuthorizable.getProcessGroup().getParameterContext();
if (parameterContext != null) {
parameterContext.authorize(authorizer, RequestAction.READ, user);
for (final ProcessGroupAuthorizable encapsulatedGroupAuth : groupAuthorizable.getEncapsulatedProcessGroups()) {
final ParameterContext encapsulatedGroupParameterContext = encapsulatedGroupAuth.getProcessGroup().getParameterContext();
if (encapsulatedGroupParameterContext != null) {
encapsulatedGroupParameterContext.authorize(authorizer, RequestAction.READ, user);
copySnippetRequestEntity -> {
// copy the specified snippet
final FlowEntity flowEntity = serviceFacade.copySnippet(
groupId, copySnippetRequestEntity.getSnippetId(), copySnippetRequestEntity.getOriginX(), copySnippetRequestEntity.getOriginY(), getIdGenerationSeed().orElse(null));
// get the snippet
final FlowDTO flow = flowEntity.getFlow();
// prune response as necessary
for (ProcessGroupEntity childGroupEntity : flow.getProcessGroups()) {
// create the response entity
// generate the response
return generateCreatedResponse(getAbsolutePath(), flowEntity).build();
// -----------------
// template instance
// -----------------
* Discovers the compatible bundle details for the components in the specified snippet.
* @param snippet the snippet
private void discoverCompatibleBundles(final FlowSnippetDTO snippet) {
if (snippet.getProcessors() != null) {
snippet.getProcessors().forEach(processor -> {
final BundleCoordinate coordinate = serviceFacade.getCompatibleBundle(processor.getType(), processor.getBundle());
processor.setBundle(new BundleDTO(coordinate.getGroup(), coordinate.getId(), coordinate.getVersion()));
if (snippet.getControllerServices() != null) {
snippet.getControllerServices().forEach(controllerService -> {
final BundleCoordinate coordinate = serviceFacade.getCompatibleBundle(controllerService.getType(), controllerService.getBundle());
controllerService.setBundle(new BundleDTO(coordinate.getGroup(), coordinate.getId(), coordinate.getVersion()));
if (snippet.getProcessGroups() != null) {
snippet.getProcessGroups().forEach(processGroup -> {
* Instantiates the specified template within this ProcessGroup. The template instance that is instantiated cannot be referenced at a later time, therefore there is no
* corresponding URI. Instead the request URI is returned.
* <p>
* Alternatively, we could have performed a PUT request. However, PUT requests are supposed to be idempotent and this endpoint is certainly not.
* @param httpServletRequest request
* @param groupId The group id
* @param requestInstantiateTemplateRequestEntity The instantiate template request
* @return A flowEntity.
value = "Instantiates a template",
response = FlowEntity.class,
authorizations = {
@Authorization(value = "Write - /process-groups/{uuid}"),
@Authorization(value = "Read - /templates/{uuid}"),
@Authorization(value = "Write - if the template contains any restricted components - /restricted-components")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response instantiateTemplate(
@Context HttpServletRequest httpServletRequest,
value = "The process group id.",
required = true
@PathParam("id") String groupId,
value = "The instantiate template request.",
required = true
) InstantiateTemplateRequestEntity requestInstantiateTemplateRequestEntity) {
// ensure the position has been specified
if (requestInstantiateTemplateRequestEntity == null || requestInstantiateTemplateRequestEntity.getOriginX() == null || requestInstantiateTemplateRequestEntity.getOriginY() == null) {
throw new IllegalArgumentException("The origin position (x, y) must be specified.");
// ensure the template id was provided
if (requestInstantiateTemplateRequestEntity.getTemplateId() == null) {
throw new IllegalArgumentException("The template id must be specified.");
// ensure the template encoding version is valid
if (requestInstantiateTemplateRequestEntity.getEncodingVersion() != null) {
try {
} catch (final IllegalArgumentException e) {
throw new IllegalArgumentException("The template encoding version is not valid. The expected format is <number>.<number>");
// populate the encoding version if necessary
if (requestInstantiateTemplateRequestEntity.getEncodingVersion() == null) {
// if the encoding version is not specified, use the latest encoding version as these options were
// not available pre 1.x, will be overridden if populating from the underlying template below
// populate the component bundles if necessary
if (requestInstantiateTemplateRequestEntity.getSnippet() == null) {
// get the desired template in order to determine the supported bundles
final TemplateDTO requestedTemplate = serviceFacade.exportTemplate(requestInstantiateTemplateRequestEntity.getTemplateId());
final FlowSnippetDTO requestTemplateContents = requestedTemplate.getSnippet();
// determine the compatible bundles to use for each component in this template, this ensures the nodes in the cluster
// instantiate the components from the same bundles
// update the requested template as necessary - use the encoding version from the underlying template
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, requestInstantiateTemplateRequestEntity);
} else if (isDisconnectedFromCluster()) {
return withWriteLock(
lookup -> {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
// ensure write on the group
final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId);
final Authorizable processGroup = groupAuthorizable.getAuthorizable();
processGroup.authorize(authorizer, RequestAction.WRITE, user);
final Authorizable template = lookup.getTemplate(requestInstantiateTemplateRequestEntity.getTemplateId());
template.authorize(authorizer, RequestAction.READ, user);
// ensure read on the template
final TemplateContentsAuthorizable templateContents = lookup.getTemplateContents(requestInstantiateTemplateRequestEntity.getSnippet());
final Consumer<ComponentAuthorizable> authorizeRestricted = authorizable -> {
if (authorizable.isRestricted()) {
authorizeRestrictions(authorizer, authorizable);
// ensure restricted access if necessary
final Authorizable parameterContext = groupAuthorizable.getProcessGroup().getParameterContext();
if (parameterContext != null) {
AuthorizeParameterReference.authorizeParameterReferences(requestInstantiateTemplateRequestEntity.getSnippet(), authorizer, parameterContext, user);
() -> serviceFacade.verifyCanInstantiate(groupId, requestInstantiateTemplateRequestEntity.getSnippet()),
instantiateTemplateRequestEntity -> {
final FlowSnippetDTO snippet = instantiateTemplateRequestEntity.getSnippet();
// Check if the snippet contains any public port violating public port unique constraint with the current flow
// create the template and generate the json
final FlowEntity entity = serviceFacade.createTemplateInstance(groupId, instantiateTemplateRequestEntity.getOriginX(), instantiateTemplateRequestEntity.getOriginY(),
instantiateTemplateRequestEntity.getEncodingVersion(), snippet, getIdGenerationSeed().orElse(null));
final FlowDTO flowSnippet = entity.getFlow();
// prune response as necessary
for (ProcessGroupEntity childGroupEntity : flowSnippet.getProcessGroups()) {
// create the response entity
// generate the response
return generateCreatedResponse(getAbsolutePath(), entity).build();
private void verifyPublicPortUniqueness(FlowSnippetDTO snippet) {
snippet.getInputPorts().stream().filter(portDTO -> Boolean.TRUE.equals(portDTO.getAllowRemoteAccess()))
.forEach(portDTO -> {
try {
serviceFacade.verifyPublicInputPortUniqueness(portDTO.getId(), portDTO.getName());
} catch (IllegalStateException e) {
throw toPublicPortUniqueConstraintViolationException("input", portDTO);
snippet.getOutputPorts().stream().filter(portDTO -> Boolean.TRUE.equals(portDTO.getAllowRemoteAccess()))
.forEach(portDTO -> {
try {
serviceFacade.verifyPublicOutputPortUniqueness(portDTO.getId(), portDTO.getName());
} catch (IllegalStateException e) {
throw toPublicPortUniqueConstraintViolationException("output", portDTO);
snippet.getProcessGroups().forEach(processGroupDTO -> verifyPublicPortUniqueness(processGroupDTO.getContents()));
private IllegalStateException toPublicPortUniqueConstraintViolationException(final String portType, final PortDTO portDTO) {
return new IllegalStateException(String.format("The %s port [%s] named '%s' will violate the public port unique constraint." +
" Rename the existing port name, or the one in the template to instantiate the template in this flow.", portType, portDTO.getId(), portDTO.getName()));
// ---------
// templates
// ---------
private SnippetAuthorizable authorizeSnippetUsage(final AuthorizableLookup lookup, final String groupId, final String snippetId,
final boolean authorizeTransitiveServices, final boolean authorizeParameterReferences) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
// ensure write access to the target process group
lookup.getProcessGroup(groupId).getAuthorizable().authorize(authorizer, RequestAction.WRITE, user);
// ensure read permission to every component in the snippet including referenced services
final SnippetAuthorizable snippet = lookup.getSnippet(snippetId);
authorizeSnippet(snippet, authorizer, lookup, RequestAction.READ, true, authorizeTransitiveServices, authorizeParameterReferences);
return snippet;
* Creates a new template based off of the specified template.
* @param httpServletRequest request
* @param requestCreateTemplateRequestEntity request to create the template
* @return A templateEntity
value = "Creates a template and discards the specified snippet.",
response = TemplateEntity.class,
authorizations = {
@Authorization(value = "Write - /process-groups/{uuid}"),
@Authorization(value = "Read - /{component-type}/{uuid} - For each component in the snippet and their descendant components")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response createTemplate(
@Context final HttpServletRequest httpServletRequest,
value = "The process group id.",
required = true
@PathParam("id") final String groupId,
value = "The create template request.",
required = true
) final CreateTemplateRequestEntity requestCreateTemplateRequestEntity) {
if (requestCreateTemplateRequestEntity == null || requestCreateTemplateRequestEntity.getSnippetId() == null) {
throw new IllegalArgumentException("The snippet identifier must be specified.");
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, requestCreateTemplateRequestEntity);
} else if (isDisconnectedFromCluster()) {
return withWriteLock(
lookup -> {
authorizeSnippetUsage(lookup, groupId, requestCreateTemplateRequestEntity.getSnippetId(), true, false);
() -> serviceFacade.verifyCanAddTemplate(groupId, requestCreateTemplateRequestEntity.getName()),
createTemplateRequestEntity -> {
// create the template and generate the json
final TemplateDTO template = serviceFacade.createTemplate(createTemplateRequestEntity.getName(), createTemplateRequestEntity.getDescription(),
createTemplateRequestEntity.getSnippetId(), groupId, getIdGenerationSeed());
// build the response entity
final TemplateEntity entity = new TemplateEntity();
// build the response
return generateCreatedResponse(URI.create(template.getUri()), entity).build();
* Imports the specified template.
* @param httpServletRequest request
* @param in The template stream
* @return A templateEntity or an errorResponse XML snippet.
* @throws InterruptedException if interrupted
value = "Uploads a template",
response = TemplateEntity.class,
authorizations = {
@Authorization(value = "Write - /process-groups/{uuid}")
value = {
name = "template",
value = "The binary content of the template file being uploaded.",
required = true,
type = "file",
paramType = "formData")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response uploadTemplate(
@Context final HttpServletRequest httpServletRequest,
@Context final UriInfo uriInfo,
value = "The process group id.",
required = true
@PathParam("id") final String groupId,
value = "Acknowledges that this node is disconnected to allow for mutable requests to proceed.",
required = false
@FormDataParam(DISCONNECTED_NODE_ACKNOWLEDGED) @DefaultValue("false") final Boolean disconnectedNodeAcknowledged,
@FormDataParam("template") final InputStream in) throws InterruptedException {
// unmarshal the template
final TemplateDTO template;
try {
// TODO: Potentially refactor the template parsing to a service layer outside of the resource for web request handling
JAXBContext context = JAXBContext.newInstance(TemplateDTO.class);
Unmarshaller unmarshaller = context.createUnmarshaller();
XMLStreamReader xsr = XmlUtils.createSafeReader(in);
JAXBElement<TemplateDTO> templateElement = unmarshaller.unmarshal(xsr, TemplateDTO.class);
template = templateElement.getValue();
} catch (JAXBException jaxbe) {
logger.warn("An error occurred while parsing a template.", jaxbe);
String responseXml = String.format("<errorResponse status=\"%s\" statusText=\"The specified template is not in a valid format.\"/>", Response.Status.BAD_REQUEST.getStatusCode());
return Response.status(Response.Status.OK).entity(responseXml).type("application/xml").build();
} catch (IllegalArgumentException iae) {
logger.warn("Unable to import template.", iae);
String responseXml = String.format("<errorResponse status=\"%s\" statusText=\"%s\"/>", Response.Status.BAD_REQUEST.getStatusCode(), sanitizeErrorResponse(iae.getMessage()));
return Response.status(Response.Status.OK).entity(responseXml).type("application/xml").build();
} catch (Exception e) {
logger.warn("An error occurred while importing a template.", e);
String responseXml = String.format("<errorResponse status=\"%s\" statusText=\"Unable to import the specified template: %s\"/>",
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), sanitizeErrorResponse(e.getMessage()));
return Response.status(Response.Status.OK).entity(responseXml).type("application/xml").build();
if (isDisconnectedFromCluster()) {
// build the response entity
TemplateEntity entity = new TemplateEntity();
if (isReplicateRequest()) {
// convert request accordingly
final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder();
uriBuilder.segment("process-groups", groupId, "templates", "import");
final URI importUri =;
final Map<String, String> headersToOverride = new HashMap<>();
headersToOverride.put("content-type", MediaType.APPLICATION_XML);
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly
// to the cluster nodes themselves.
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
return getRequestReplicator().replicate(HttpMethod.POST, importUri, entity, getHeaders(headersToOverride)).awaitMergedResponse().getResponse();
} else {
return getRequestReplicator().forwardToCoordinator(
getClusterCoordinatorNode(), HttpMethod.POST, importUri, entity, getHeaders(headersToOverride)).awaitMergedResponse().getResponse();
// otherwise import the template locally
return importTemplate(httpServletRequest, groupId, entity);
* Returns the sanitized error response which can safely be displayed on the error page.
* @param errorResponse the initial error response
* @return the HTML-escaped error response
private String sanitizeErrorResponse(String errorResponse) {
if (errorResponse == null || StringUtils.isEmpty(errorResponse)) {
return "";
return StringEscapeUtils.escapeHtml4(errorResponse);
* Imports the specified template.
* @param httpServletRequest request
* @param requestTemplateEntity A templateEntity.
* @return A templateEntity.
value = "Imports a template",
response = TemplateEntity.class,
authorizations = {
@Authorization(value = "Write - /process-groups/{uuid}")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response importTemplate(
@Context final HttpServletRequest httpServletRequest,
value = "The process group id.",
required = true
@PathParam("id") final String groupId,
final TemplateEntity requestTemplateEntity) {
// verify the template was specified
if (requestTemplateEntity == null || requestTemplateEntity.getTemplate() == null || requestTemplateEntity.getTemplate().getSnippet() == null) {
throw new IllegalArgumentException("Template details must be specified.");
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, requestTemplateEntity);
} else if (isDisconnectedFromCluster()) {
return withWriteLock(
lookup -> {
final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
() -> serviceFacade.verifyCanAddTemplate(groupId, requestTemplateEntity.getTemplate().getName()),
templateEntity -> {
try {
// import the template
final TemplateDTO template = serviceFacade.importTemplate(templateEntity.getTemplate(), groupId, getIdGenerationSeed());
// build the response entity
TemplateEntity entity = new TemplateEntity();
// build the response
return generateCreatedResponse(URI.create(template.getUri()), entity).build();
} catch (IllegalArgumentException | IllegalStateException e) {"Unable to import template: " + e);
String responseXml = String.format("<errorResponse status=\"%s\" statusText=\"%s\"/>", Response.Status.BAD_REQUEST.getStatusCode(), sanitizeErrorResponse(e.getMessage()));
return Response.status(Response.Status.OK).entity(responseXml).type("application/xml").build();
} catch (Exception e) {
logger.warn("An error occurred while importing a template.", e);
String responseXml = String.format("<errorResponse status=\"%s\" statusText=\"Unable to import the specified template: %s\"/>",
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), sanitizeErrorResponse(e.getMessage()));
return Response.status(Response.Status.OK).entity(responseXml).type("application/xml").build();
// -------------------
// controller services
// -------------------
* Creates a new Controller Service.
* @param httpServletRequest request
* @param requestControllerServiceEntity A controllerServiceEntity.
* @return A controllerServiceEntity.
value = "Creates a new controller service",
response = ControllerServiceEntity.class,
authorizations = {
@Authorization(value = "Write - /process-groups/{uuid}"),
@Authorization(value = "Read - any referenced Controller Services - /controller-services/{uuid}"),
@Authorization(value = "Write - if the Controller Service is restricted - /restricted-components")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response createControllerService(
@Context final HttpServletRequest httpServletRequest,
value = "The process group id.",
required = true
@PathParam("id") final String groupId,
value = "The controller service configuration details.",
required = true
) final ControllerServiceEntity requestControllerServiceEntity) {
if (requestControllerServiceEntity == null || requestControllerServiceEntity.getComponent() == null) {
throw new IllegalArgumentException("Controller service details must be specified.");
if (requestControllerServiceEntity.getRevision() == null
|| (requestControllerServiceEntity.getRevision().getVersion() == null || requestControllerServiceEntity.getRevision().getVersion() != 0)) {
throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Controller service.");
final ControllerServiceDTO requestControllerService = requestControllerServiceEntity.getComponent();
if (requestControllerService.getId() != null) {
throw new IllegalArgumentException("Controller service ID cannot be specified.");
if (StringUtils.isBlank(requestControllerService.getType())) {
throw new IllegalArgumentException("The type of controller service to create must be specified.");
if (requestControllerService.getParentGroupId() != null && !groupId.equals(requestControllerService.getParentGroupId())) {
throw new IllegalArgumentException(String.format("If specified, the parent process group id %s must be the same as specified in the URI %s",
requestControllerService.getParentGroupId(), groupId));
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, requestControllerServiceEntity);
} else if (isDisconnectedFromCluster()) {
return withWriteLock(
lookup -> {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId);
final Authorizable processGroup = groupAuthorizable.getAuthorizable();
processGroup.authorize(authorizer, RequestAction.WRITE, user);
final Authorizable parameterContext = groupAuthorizable.getProcessGroup().getParameterContext();
if (parameterContext != null) {
AuthorizeParameterReference.authorizeParameterReferences(requestControllerService.getProperties(), authorizer, parameterContext, user);
ComponentAuthorizable authorizable = null;
try {
authorizable = lookup.getConfigurableComponent(requestControllerService.getType(), requestControllerService.getBundle());
if (authorizable.isRestricted()) {
authorizeRestrictions(authorizer, authorizable);
if (requestControllerService.getProperties() != null) {
AuthorizeControllerServiceReference.authorizeControllerServiceReferences(requestControllerService.getProperties(), authorizable, authorizer, lookup);
} finally {
if (authorizable != null) {
() -> serviceFacade.verifyCreateControllerService(requestControllerService),
controllerServiceEntity -> {
final ControllerServiceDTO controllerService = controllerServiceEntity.getComponent();
// set the processor id as appropriate
// create the controller service and generate the json
final Revision revision = getRevision(controllerServiceEntity, controllerService.getId());
final ControllerServiceEntity entity = serviceFacade.createControllerService(revision, groupId, controllerService);
// build the response
return generateCreatedResponse(URI.create(entity.getUri()), entity).build();
* Initiates the request to replace the Process Group with the given ID with the Process Group in the given import entity
* @param groupId The id of the process group to replace
* @param importEntity A request entity containing revision info and the process group to replace with
* @return A ProcessGroupReplaceRequestEntity.
value = "Initiate the Replace Request of a Process Group with the given ID",
response = ProcessGroupReplaceRequestEntity.class,
notes = "This will initiate the action of replacing a process group with the given process group. This can be a lengthy "
+ "process, as it will stop any Processors and disable any Controller Services necessary to perform the action and then restart them. As a result, "
+ "the endpoint will immediately return a ProcessGroupReplaceRequestEntity, and the process of replacing the flow will occur "
+ "asynchronously in the background. The client may then periodically poll the status of the request by issuing a GET request to "
+ "/process-groups/replace-requests/{requestId}. Once the request is completed, the client is expected to issue a DELETE request to "
+ "/process-groups/replace-requests/{requestId}. " + NON_GUARANTEED_ENDPOINT,
authorizations = {
@Authorization(value = "Read - /process-groups/{uuid}"),
@Authorization(value = "Write - /process-groups/{uuid}"),
@Authorization(value = "Read - /{component-type}/{uuid} - For all encapsulated components"),
@Authorization(value = "Write - /{component-type}/{uuid} - For all encapsulated components"),
@Authorization(value = "Write - if the template contains any restricted components - /restricted-components"),
@Authorization(value = "Read - /parameter-contexts/{uuid} - For any Parameter Context that is referenced by a Property that is changed, added, or removed")
@ApiResponses(value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response initiateReplaceProcessGroup(@ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId,
@ApiParam(value = "The process group replace request entity", required = true) final ProcessGroupImportEntity importEntity) {
if (importEntity == null) {
throw new IllegalArgumentException("Process Group Import Entity is required");
// replacing a flow under version control is not permitted via import. Versioned flows have additional requirements to allow
// them only to be replaced by a different version of the same flow.
if (serviceFacade.isAnyProcessGroupUnderVersionControl(groupId)) {
throw new IllegalStateException("Cannot replace a Process Group via import while it or its descendants are under Version Control.");
final VersionedFlowSnapshot versionedFlowSnapshot = importEntity.getVersionedFlowSnapshot();
if (versionedFlowSnapshot == null) {
throw new IllegalArgumentException("Versioned Flow Snapshot must be supplied");
// remove any registry-specific versioning content which could be present if the flow was exported from registry
return initiateFlowUpdate(groupId, importEntity, true, "replace-requests",
"/nifi-api/process-groups/" + groupId + "/flow-contents", importEntity::getVersionedFlowSnapshot);
* Recursively clear the registry info in the given versioned process group and all nested versioned process groups
* @param versionedProcessGroup the process group to sanitize
private void sanitizeRegistryInfo(final VersionedProcessGroup versionedProcessGroup) {
for (final VersionedProcessGroup innerVersionedProcessGroup : versionedProcessGroup.getProcessGroups()) {
* Uploads the specified versioned flow definition and adds it to a new process group.
* @param in The flow definition stream
* @return A processGroupEntity
* @throws IOException if there is an error during deserialization of the InputStream
value = "Uploads a versioned flow definition and creates a process group",
response = ProcessGroupEntity.class,
authorizations = {
@Authorization(value = "Write - /process-groups/{uuid}")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response uploadProcessGroup(
value = "The process group id.",
required = true
@PathParam("id") final String groupId,
value = "The process group name.",
required = true
@FormDataParam("groupName") final String groupName,
value = "The process group X position.",
required = true
@FormDataParam("positionX") final Double positionX,
value = "The process group Y position.",
required = true
@FormDataParam("positionY") final Double positionY,
value = "The client id.",
required = true
@FormDataParam("clientId") final String clientId,
value = "Acknowledges that this node is disconnected to allow for mutable requests to proceed.",
required = false
@FormDataParam(DISCONNECTED_NODE_ACKNOWLEDGED) @DefaultValue("false") final Boolean disconnectedNodeAcknowledged,
@FormDataParam("file") final InputStream in) throws InterruptedException {
// ensure the group name is specified
if (StringUtils.isBlank(groupName)) {
throw new IllegalArgumentException("The process group name is required.");
if (StringUtils.isBlank(groupId)) {
throw new IllegalArgumentException("The parent process group id is required");
if (positionX == null) {
throw new IllegalArgumentException("The x coordinate of the proposed position must be specified.");
if (positionY == null) {
throw new IllegalArgumentException("The y coordinate of the proposed position must be specified.");
if (StringUtils.isBlank(clientId)) {
throw new IllegalArgumentException("The client id must be specified");
// deserialize InputStream to a VersionedFlowSnapshot
VersionedFlowSnapshot deserializedSnapshot;
try {
deserializedSnapshot = MAPPER.readValue(in, VersionedFlowSnapshot.class);
} catch (IOException e) {
logger.warn("Deserialization of uploaded JSON failed", e);
throw new IllegalArgumentException("Deserialization of uploaded JSON failed", e);
// clear Registry info
// resolve Bundle info
// if there are any Controller Services referenced that are inherited from the parent group,
// resolve those to point to the appropriate Controller Service, if we are able to.
serviceFacade.resolveInheritedControllerServices(deserializedSnapshot, groupId, NiFiUserUtils.getNiFiUser());
if (isDisconnectedFromCluster()) {
// create a PositionDTO
final PositionDTO positionDTO = new PositionDTO();
// create a RevisionDTO
RevisionDTO revisionDTO = new RevisionDTO();
revisionDTO.setVersion((long) 0);
// build the response entity for a replicate request
ProcessGroupUploadEntity pgUploadEntity = new ProcessGroupUploadEntity();
// replicate the request
if (isReplicateRequest()) {
// convert request accordingly
final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder();
uriBuilder.segment("process-groups", groupId, "process-groups", "import");
final URI importUri =;
final Map<String, String> headersToOverride = new HashMap<>();
headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly
// to the cluster nodes themselves.
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
return getRequestReplicator().replicate(HttpMethod.POST, importUri, pgUploadEntity, getHeaders(headersToOverride)).awaitMergedResponse().getResponse();
} else {
return getRequestReplicator().forwardToCoordinator(
getClusterCoordinatorNode(), HttpMethod.POST, importUri, pgUploadEntity, getHeaders(headersToOverride)).awaitMergedResponse().getResponse();
// otherwise import the process group locally
return importProcessGroup(groupId, pgUploadEntity);
* Imports the specified process group.
* @param processGroupUploadEntity A ProcessGroupUploadEntity.
* @return A processGroupEntity.
value = "Imports a specified process group",
response = ProcessGroupEntity.class,
authorizations = {
@Authorization(value = "Write - /process-groups/{uuid}")
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response importProcessGroup(
value = "The process group id.",
required = true
@PathParam("id") final String groupId,
final ProcessGroupUploadEntity processGroupUploadEntity) {
// verify the process group was specified
if (processGroupUploadEntity == null || processGroupUploadEntity.getFlowSnapshot() == null) {
throw new IllegalArgumentException("Process group details must be specified.");
final VersionedFlowSnapshot versionedFlowSnapshot = processGroupUploadEntity.getFlowSnapshot();
// clear Registry info
// resolve Bundle info
// if there are any Controller Services referenced that are inherited from the parent group,
// resolve those to point to the appropriate Controller Service, if we are able to.
serviceFacade.resolveInheritedControllerServices(versionedFlowSnapshot, groupId, NiFiUserUtils.getNiFiUser());
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, processGroupUploadEntity);
} else if (isDisconnectedFromCluster()) {
// create a new ProcessGroupEntity
final ProcessGroupEntity newProcessGroupEntity = createProcessGroupEntity(groupId, processGroupUploadEntity.getGroupName(), processGroupUploadEntity.getPositionDTO(), versionedFlowSnapshot);
return withWriteLock(
lookup -> authorizeAccess(groupId, newProcessGroupEntity, lookup),
() -> {
final VersionedFlowSnapshot newVersionedFlowSnapshot = newProcessGroupEntity.getVersionedFlowSnapshot();
if (newVersionedFlowSnapshot != null) {
processGroupEntity -> {
final ProcessGroupDTO processGroup = processGroupEntity.getComponent();
// set the processor id as appropriate
// get the versioned flow
final VersionedFlowSnapshot flowSnapshot = processGroupEntity.getVersionedFlowSnapshot();
// create the process group contents
final Revision revision = new Revision((long) 0, processGroupUploadEntity.getRevisionDTO().getClientId(), processGroup.getId());
ProcessGroupEntity entity = serviceFacade.createProcessGroup(revision, groupId, processGroup);
if (flowSnapshot != null) {
final RevisionDTO revisionDto = entity.getRevision();
final String newGroupId = entity.getComponent().getId();
final Revision newGroupRevision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), newGroupId);
// We don't want the Process Group's position to be updated because we want to keep the position where the user
// placed the Process Group. We do not want to use the name of the Process Group that is in the Flow Contents.
// To accomplish this, we call updateProcessGroupContents() passing 'false' for the updateSettings flag, set
// the Process Group name, and null out the position.
entity = serviceFacade.updateProcessGroupContents(newGroupRevision, newGroupId, null, flowSnapshot,
getIdGenerationSeed().orElse(null), false, false, true);
// generate a 201 created response
String uri = entity.getUri();
return generateCreatedResponse(URI.create(uri), entity).build();
* Replace the Process Group contents with the given ID with the specified Process Group contents.
* This is the endpoint used in a cluster update replication scenario.
* @param groupId The id of the process group to replace
* @param importEntity A request entity containing revision info and the process group to replace with
* @return A ProcessGroupImportEntity.
value = "Replace Process Group contents with the given ID with the specified Process Group contents",
response = ProcessGroupImportEntity.class,
notes = "This endpoint is used for replication within a cluster, when replacing a flow with a new flow. It expects that the flow being"
+ "replaced is not under version control and that the given snapshot will not modify any Processor that is currently running "
+ "or any Controller Service that is enabled. "
authorizations = {
@Authorization(value = "Read - /process-groups/{uuid}"),
@Authorization(value = "Write - /process-groups/{uuid}")
@ApiResponses(value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response replaceProcessGroup(@ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId,
@ApiParam(value = "The process group replace request entity.", required = true) final ProcessGroupImportEntity importEntity) {
// Verify the request
if (importEntity == null) {
throw new IllegalArgumentException("Process Group Import Entity is required");
final RevisionDTO revisionDto = importEntity.getProcessGroupRevision();
if (revisionDto == null) {
throw new IllegalArgumentException("Process Group Revision must be specified.");
final VersionedFlowSnapshot requestFlowSnapshot = importEntity.getVersionedFlowSnapshot();
if (requestFlowSnapshot == null) {
throw new IllegalArgumentException("Versioned Flow Snapshot must be supplied.");
// Perform the request
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, importEntity);
} else if (isDisconnectedFromCluster()) {
final Revision requestRevision = getRevision(importEntity.getProcessGroupRevision(), groupId);
return withWriteLock(
lookup -> {
final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId);
final Authorizable processGroup = groupAuthorizable.getAuthorizable();
processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
() -> {
// We do not enforce that the Process Group is 'not dirty' because at this point,
// the client has explicitly indicated the dataflow that the Process Group should
// provide and provided the Revision to ensure that they have the most up-to-date
// view of the Process Group.
serviceFacade.verifyCanUpdate(groupId, requestFlowSnapshot, true, false);
(revision, entity) -> {
final ProcessGroupEntity updatedGroup =
performUpdateFlow(groupId, revision, importEntity, entity.getVersionedFlowSnapshot(),
getIdGenerationSeed().orElse(null), false, true);
// response to replication request is an entity with revision info but no versioned flow snapshot
final ProcessGroupImportEntity responseEntity = new ProcessGroupImportEntity();
return generateOkResponse(responseEntity).build();
* Retrieve a request to replace a Process Group by request ID.
* @param replaceRequestId The ID of the replace request
* @return A ProcessGroupReplaceRequestEntity.
value = "Returns the Replace Request with the given ID",
response = ProcessGroupReplaceRequestEntity.class,
notes = "Returns the Replace Request with the given ID. Once a Replace Request has been created by performing a POST to /process-groups/{id}/replace-requests, "
+ "that request can subsequently be retrieved via this endpoint, and the request that is fetched will contain the updated state, such as percent complete, the "
+ "current state of the request, and any failures. "
authorizations = {
@Authorization(value = "Only the user that submitted the request can get it")
@ApiResponses(value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response getReplaceProcessGroupRequest(
@ApiParam("The ID of the Replace Request") @PathParam("id") final String replaceRequestId) {
return retrieveFlowUpdateRequest("replace-requests", replaceRequestId);
value = "Deletes the Replace Request with the given ID",
response = ProcessGroupReplaceRequestEntity.class,
notes = "Deletes the Replace Request with the given ID. After a request is created via a POST to /process-groups/{id}/replace-requests, it is expected "
+ "that the client will properly clean up the request by DELETE'ing it, once the Replace process has completed. If the request is deleted before the request "
+ "completes, then the Replace request will finish the step that it is currently performing and then will cancel any subsequent steps. "
authorizations = {
@Authorization(value = "Only the user that submitted the request can remove it")
@ApiResponses(value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
public Response deleteReplaceProcessGroupRequest(
@ApiParam(value = "Acknowledges that this node is disconnected to allow for mutable requests to proceed.", required = false)
@QueryParam(DISCONNECTED_NODE_ACKNOWLEDGED) @DefaultValue("false") final Boolean disconnectedNodeAcknowledged,
@ApiParam("The ID of the Update Request") @PathParam("id") final String replaceRequestId) {
return deleteFlowUpdateRequest("replace-requests", replaceRequestId, disconnectedNodeAcknowledged.booleanValue());
* Perform actual flow update of the specified flow. This is used for the initial flow update and replication updates.
protected ProcessGroupEntity performUpdateFlow(final String groupId, final Revision revision, final ProcessGroupImportEntity requestEntity,
final VersionedFlowSnapshot flowSnapshot, final String idGenerationSeed,
final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) {"Replacing Process Group with ID {} with imported Process Group with ID {}", groupId, flowSnapshot.getFlowContents().getIdentifier());
// Update Process Group to the new flow (including name) and update variable registry with any Variables that were added or removed
return serviceFacade.updateProcessGroupContents(revision, groupId, null, flowSnapshot, idGenerationSeed, verifyNotModified,
true, updateDescendantVersionedFlows);
* Create the entity that is used for update flow replication. The initial replace request entity can be re-used for the replication request.
protected Entity createReplicateUpdateFlowEntity(final Revision revision, final ProcessGroupImportEntity requestEntity,
final VersionedFlowSnapshot flowSnapshot) {
return requestEntity;
* Create the entity that captures the status and result of a replace request
* @return a new instance of a ProcessGroupReplaceRequestEntity
protected ProcessGroupReplaceRequestEntity createUpdateRequestEntity() {
return new ProcessGroupReplaceRequestEntity();
* Finalize a completed update request for an existing replace request. This is used when retrieving and deleting a replace request.
* A completed request will contain the updated VersionedFlowSnapshot
* @param requestEntity the request entity to finalize
protected void finalizeCompletedUpdateRequest(final ProcessGroupReplaceRequestEntity requestEntity) {
final ProcessGroupReplaceRequestDTO updateRequestDto = requestEntity.getRequest();
if (updateRequestDto.isComplete()) {
final VersionedFlowSnapshot versionedFlowSnapshot =
private static class UpdateVariableRegistryRequestWrapper extends Entity {
private final Set<AffectedComponentEntity> allAffectedComponents;
private final List<AffectedComponentDTO> activeAffectedProcessors;
private final List<AffectedComponentDTO> activeAffectedServices;
private final VariableRegistryEntity variableRegistryEntity;
public UpdateVariableRegistryRequestWrapper(final Set<AffectedComponentEntity> allAffectedComponents, final List<AffectedComponentDTO> activeAffectedProcessors,
final List<AffectedComponentDTO> activeAffectedServices, VariableRegistryEntity variableRegistryEntity) {
this.allAffectedComponents = allAffectedComponents;
this.activeAffectedProcessors = activeAffectedProcessors;
this.activeAffectedServices = activeAffectedServices;
this.variableRegistryEntity = variableRegistryEntity;
public Set<AffectedComponentEntity> getAllAffectedComponents() {
return allAffectedComponents;
public List<AffectedComponentDTO> getActiveAffectedProcessors() {
return activeAffectedProcessors;
public List<AffectedComponentDTO> getActiveAffectedServices() {
return activeAffectedServices;
public VariableRegistryEntity getVariableRegistryEntity() {
return variableRegistryEntity;
* Creates a new ProcessGroupEntity with the specified VersionedFlowSnapshot.
* @param groupId the group id string
* @param groupName the process group name string
* @param positionDTO the process group PositionDTO
* @param deserializedSnapshot the deserialized snapshot
* @return a new ProcessGroupEntity
private ProcessGroupEntity createProcessGroupEntity(
String groupId, String groupName, PositionDTO positionDTO, VersionedFlowSnapshot deserializedSnapshot) {
final ProcessGroupEntity processGroupEntity = new ProcessGroupEntity();
// create a ProcessGroupDTO
final ProcessGroupDTO processGroupDTO = new ProcessGroupDTO();
// set the ProcessGroupEntity position
return processGroupEntity;
* Authorizes access to a Parameter Context and RestrictedComponents resource.
* @param groupId the group id string
* @param processGroupEntity the ProcessGroupEntity
* @param lookup the lookup
private void authorizeAccess(String groupId, ProcessGroupEntity processGroupEntity, AuthorizableLookup lookup) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
processGroup.authorize(authorizer, RequestAction.WRITE, user);
// if request specifies a Parameter Context, need to authorize that user has READ policy for the Parameter Context.
final ParameterContextReferenceEntity referencedParamContext = processGroupEntity.getComponent().getParameterContext();
if (referencedParamContext != null && referencedParamContext.getId() != null) {
lookup.getParameterContext(referencedParamContext.getId()).authorize(authorizer, RequestAction.READ, user);
// if any of the components is a Restricted Component, then we must authorize the user
// for write access to the RestrictedComponents resource
final VersionedFlowSnapshot versionedFlowSnapshot = processGroupEntity.getVersionedFlowSnapshot();
if (versionedFlowSnapshot != null) {
final Set<ConfigurableComponent> restrictedComponents = FlowRegistryUtils.getRestrictedComponents(versionedFlowSnapshot.getFlowContents(), serviceFacade);
restrictedComponents.forEach(restrictedComponent -> {
final ComponentAuthorizable restrictedComponentAuthorizable = lookup.getConfigurableComponent(restrictedComponent);
authorizeRestrictions(authorizer, restrictedComponentAuthorizable);
final Map<String, VersionedParameterContext> parameterContexts = versionedFlowSnapshot.getParameterContexts();
if (parameterContexts != null) {
parameterContexts.values().forEach(context -> AuthorizeParameterReference.authorizeParameterContextAddition(context, serviceFacade, authorizer, lookup, user));
// setters
public void setProcessorResource(ProcessorResource processorResource) {
this.processorResource = processorResource;
public void setInputPortResource(InputPortResource inputPortResource) {
this.inputPortResource = inputPortResource;
public void setOutputPortResource(OutputPortResource outputPortResource) {
this.outputPortResource = outputPortResource;
public void setFunnelResource(FunnelResource funnelResource) {
this.funnelResource = funnelResource;
public void setLabelResource(LabelResource labelResource) {
this.labelResource = labelResource;
public void setRemoteProcessGroupResource(RemoteProcessGroupResource remoteProcessGroupResource) {
this.remoteProcessGroupResource = remoteProcessGroupResource;
public void setConnectionResource(ConnectionResource connectionResource) {
this.connectionResource = connectionResource;
public void setTemplateResource(TemplateResource templateResource) {
this.templateResource = templateResource;
public void setControllerServiceResource(ControllerServiceResource controllerServiceResource) {
this.controllerServiceResource = controllerServiceResource;
private static class DropEntity extends Entity {
final String entityId;
final String dropRequestId;
public DropEntity(String entityId, String dropRequestId) {
this.entityId = entityId;
this.dropRequestId = dropRequestId;
public String getEntityId() {
return entityId;
public String getDropRequestId() {
return dropRequestId;