blob: 35a7c8183be13ede0201091dbe3d8e7a99a9c78d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.common.TextFormat;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.OperationAuthorizable;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleDetails;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.nar.NarClassLoadersHolder;
import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.IllegalClusterResourceRequestException;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.AboutDTO;
import org.apache.nifi.web.api.dto.BannerDTO;
import org.apache.nifi.web.api.dto.BulletinBoardDTO;
import org.apache.nifi.web.api.dto.BulletinQueryDTO;
import org.apache.nifi.web.api.dto.ClusterDTO;
import org.apache.nifi.web.api.dto.ClusterSummaryDTO;
import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.action.HistoryDTO;
import org.apache.nifi.web.api.dto.action.HistoryQueryDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
import org.apache.nifi.web.api.dto.search.NodeSearchResultDTO;
import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
import org.apache.nifi.web.api.entity.AboutEntity;
import org.apache.nifi.web.api.entity.ActionEntity;
import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
import org.apache.nifi.web.api.entity.BannerEntity;
import org.apache.nifi.web.api.entity.BucketEntity;
import org.apache.nifi.web.api.entity.BucketsEntity;
import org.apache.nifi.web.api.entity.BulletinBoardEntity;
import org.apache.nifi.web.api.entity.ClusteSummaryEntity;
import org.apache.nifi.web.api.entity.ClusterSearchResultsEntity;
import org.apache.nifi.web.api.entity.ComponentHistoryEntity;
import org.apache.nifi.web.api.entity.ConnectionStatisticsEntity;
import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
import org.apache.nifi.web.api.entity.ControllerBulletinsEntity;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ControllerServiceTypesEntity;
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
import org.apache.nifi.web.api.entity.ControllerStatusEntity;
import org.apache.nifi.web.api.entity.CurrentUserEntity;
import org.apache.nifi.web.api.entity.FlowConfigurationEntity;
import org.apache.nifi.web.api.entity.HistoryEntity;
import org.apache.nifi.web.api.entity.ParameterContextEntity;
import org.apache.nifi.web.api.entity.ParameterContextsEntity;
import org.apache.nifi.web.api.entity.PortStatusEntity;
import org.apache.nifi.web.api.entity.PrioritizerTypesEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.ProcessorStatusEntity;
import org.apache.nifi.web.api.entity.ProcessorTypesEntity;
import org.apache.nifi.web.api.entity.RegistriesEntity;
import org.apache.nifi.web.api.entity.RegistryClientsEntity;
import org.apache.nifi.web.api.entity.RegistryEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.ReportingTaskEntity;
import org.apache.nifi.web.api.entity.ReportingTaskTypesEntity;
import org.apache.nifi.web.api.entity.ReportingTasksEntity;
import org.apache.nifi.web.api.entity.ScheduleComponentsEntity;
import org.apache.nifi.web.api.entity.SearchResultsEntity;
import org.apache.nifi.web.api.entity.StatusHistoryEntity;
import org.apache.nifi.web.api.entity.TemplateEntity;
import org.apache.nifi.web.api.entity.TemplatesEntity;
import org.apache.nifi.web.api.entity.VersionedFlowEntity;
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataSetEntity;
import org.apache.nifi.web.api.entity.VersionedFlowsEntity;
import org.apache.nifi.web.api.request.BulletinBoardPatternParameter;
import org.apache.nifi.web.api.request.DateTimeParameter;
import org.apache.nifi.web.api.request.IntegerParameter;
import org.apache.nifi.web.api.request.LongParameter;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.text.Collator;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.nifi.web.api.entity.ScheduleComponentsEntity.STATE_DISABLED;
import static org.apache.nifi.web.api.entity.ScheduleComponentsEntity.STATE_ENABLED;
/**
* RESTful endpoint for managing a Flow.
*/
@Path("/flow")
@Api(
value = "/flow",
description = "Endpoint for accessing the flow structure and component status."
)
public class FlowResource extends ApplicationResource {
private static final String RECURSIVE = "false";
private NiFiServiceFacade serviceFacade;
private Authorizer authorizer;
private ProcessorResource processorResource;
private InputPortResource inputPortResource;
private OutputPortResource outputPortResource;
private FunnelResource funnelResource;
private LabelResource labelResource;
private RemoteProcessGroupResource remoteProcessGroupResource;
private ConnectionResource connectionResource;
private TemplateResource templateResource;
private ProcessGroupResource processGroupResource;
private ControllerServiceResource controllerServiceResource;
private ReportingTaskResource reportingTaskResource;
public FlowResource() {
super();
}
/**
* Populates the remaining fields in the specified process group.
*
* @param flow group
* @return group dto
*/
private ProcessGroupFlowDTO populateRemainingFlowContent(ProcessGroupFlowDTO flow) {
FlowDTO flowStructure = flow.getFlow();
// populate the remaining fields for the processors, connections, process group refs, remote process groups, and labels if appropriate
if (flowStructure != null) {
populateRemainingFlowStructure(flowStructure);
}
// set the process group uri
flow.setUri(generateResourceUri("flow", "process-groups", flow.getId()));
return flow;
}
/**
* Populates the remaining content of the specified snippet.
*/
private FlowDTO populateRemainingFlowStructure(FlowDTO flowStructure) {
processorResource.populateRemainingProcessorEntitiesContent(flowStructure.getProcessors());
connectionResource.populateRemainingConnectionEntitiesContent(flowStructure.getConnections());
inputPortResource.populateRemainingInputPortEntitiesContent(flowStructure.getInputPorts());
outputPortResource.populateRemainingOutputPortEntitiesContent(flowStructure.getOutputPorts());
remoteProcessGroupResource.populateRemainingRemoteProcessGroupEntitiesContent(flowStructure.getRemoteProcessGroups());
funnelResource.populateRemainingFunnelEntitiesContent(flowStructure.getFunnels());
labelResource.populateRemainingLabelEntitiesContent(flowStructure.getLabels());
processGroupResource.populateRemainingProcessGroupEntitiesContent(flowStructure.getProcessGroups());
// go through each process group child and populate its uri
for (final ProcessGroupEntity processGroupEntity : flowStructure.getProcessGroups()) {
final ProcessGroupDTO processGroup = processGroupEntity.getComponent();
if (processGroup != null) {
processGroup.setContents(null);
}
}
return flowStructure;
}
/**
* Authorizes access to the flow.
*/
private void authorizeFlow() {
serviceFacade.authorizeAccess(lookup -> {
final Authorizable flow = lookup.getFlow();
flow.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
});
}
// ----
// flow
// ----
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.TEXT_PLAIN)
@Path("client-id")
@ApiOperation(
value = "Generates a client id.",
response = String.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response generateClientId() {
authorizeFlow();
return generateOkResponse(generateUuid()).build();
}
/**
* Retrieves the configuration for the flow.
*
* @return A flowConfigurationEntity.
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("config")
@ApiOperation(
value = "Retrieves the configuration for this NiFi flow",
response = FlowConfigurationEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getFlowConfig() {
authorizeFlow();
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
final FlowConfigurationEntity entity = serviceFacade.getFlowConfiguration();
return generateOkResponse(entity).build();
}
/**
* Retrieves the identity of the user making the request.
*
* @return An identityEntity
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("current-user")
@ApiOperation(
value = "Retrieves the user identity of the user making the request",
response = CurrentUserEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
public Response getCurrentUser() {
authorizeFlow();
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
// note that the cluster manager will handle this request directly
final NiFiUser user = NiFiUserUtils.getNiFiUser();
if (user == null) {
throw new WebApplicationException(new Throwable("Unable to access details for current user."));
}
// create the response entity
final CurrentUserEntity entity = serviceFacade.getCurrentUser();
// generate the response
return generateOkResponse(entity).build();
}
/**
* Retrieves the contents of the specified group.
*
* @param groupId The id of the process group.
* @return A processGroupEntity.
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("process-groups/{id}")
@ApiOperation(
value = "Gets a process group",
response = ProcessGroupFlowEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
},
notes = "If the uiOnly query parameter is provided with a value of true, the returned entity may only contain fields that are necessary for rendering the NiFi User Interface. As such, " +
"the selected fields may change at any time, even during incremental releases, without warning. As a result, this parameter should not be provided by any client other than the UI."
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getFlow(
@ApiParam(
value = "The process group id.",
required = false
)
@PathParam("id") final String groupId,
@QueryParam("uiOnly") @DefaultValue("false") final boolean uiOnly) {
authorizeFlow();
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
// get this process group flow
final ProcessGroupFlowEntity entity = serviceFacade.getProcessGroupFlow(groupId, uiOnly);
populateRemainingFlowContent(entity.getProcessGroupFlow());
return generateOkResponse(entity).build();
}
/**
* Retrieves the metrics of the entire flow.
*
* @return A flowMetricsEntity.
* @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.WILDCARD)
@Path("metrics/{producer}")
@ApiOperation(
value = "Gets all metrics for the flow from a particular node",
response = StreamingOutput.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getFlowMetrics(
@ApiParam(
value = "The producer for flow file metrics. Each producer may have its own output format.",
required = true
)
@PathParam("producer") final String producer) throws InterruptedException {
authorizeFlow();
if ("prometheus".equalsIgnoreCase(producer)) {
// get this process group flow
final Collection<CollectorRegistry> allRegistries = serviceFacade.generateFlowMetrics();
// generate a streaming response
final StreamingOutput response = output -> {
Writer writer = new BufferedWriter(new OutputStreamWriter(output));
for (CollectorRegistry collectorRegistry : allRegistries) {
TextFormat.write004(writer, collectorRegistry.metricFamilySamples());
// flush the response
output.flush();
}
writer.flush();
writer.close();
};
return generateOkResponse(response)
.type(MediaType.TEXT_PLAIN_TYPE)
.build();
} else {
throw new ResourceNotFoundException("The specified producer is missing or invalid.");
}
}
// -------------------
// controller services
// -------------------
/**
* Retrieves controller services for reporting tasks in this NiFi.
*
* @return A controllerServicesEntity.
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("controller/controller-services")
@ApiOperation(
value = "Gets controller services for reporting tasks",
response = ControllerServicesEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getControllerServicesFromController() {
authorizeFlow();
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
// get all the controller services
final Set<ControllerServiceEntity> controllerServices = serviceFacade.getControllerServices(null, false, false);
controllerServiceResource.populateRemainingControllerServiceEntitiesContent(controllerServices);
// create the response entity
final ControllerServicesEntity entity = new ControllerServicesEntity();
entity.setCurrentTime(new Date());
entity.setControllerServices(controllerServices);
// generate the response
return generateOkResponse(entity).build();
}
/**
* Retrieves all the of controller services in this NiFi.
*
* @return A controllerServicesEntity.
* @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("process-groups/{id}/controller-services")
@ApiOperation(
value = "Gets all controller services",
response = ControllerServicesEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getControllerServicesFromGroup(
@ApiParam(value = "The process group id.", required = true) @PathParam("id") String groupId,
@ApiParam("Whether or not to include parent/ancestory process groups") @QueryParam("includeAncestorGroups") @DefaultValue("true") boolean includeAncestorGroups,
@ApiParam("Whether or not to include descendant process groups") @QueryParam("includeDescendantGroups") @DefaultValue("false") boolean includeDescendantGroups
) throws InterruptedException {
authorizeFlow();
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
// get all the controller services
final Set<ControllerServiceEntity> controllerServices = serviceFacade.getControllerServices(groupId, includeAncestorGroups, includeDescendantGroups);
controllerServiceResource.populateRemainingControllerServiceEntitiesContent(controllerServices);
// create the response entity
final ControllerServicesEntity entity = new ControllerServicesEntity();
entity.setCurrentTime(new Date());
entity.setControllerServices(controllerServices);
// generate the response
return generateOkResponse(entity).build();
}
// ---------------
// reporting-tasks
// ---------------
/**
* Retrieves all the of reporting tasks in this NiFi.
*
* @return A reportingTasksEntity.
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("reporting-tasks")
@ApiOperation(
value = "Gets all reporting tasks",
response = ReportingTasksEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getReportingTasks() {
authorizeFlow();
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
// get all the reporting tasks
final Set<ReportingTaskEntity> reportingTasks = serviceFacade.getReportingTasks();
reportingTaskResource.populateRemainingReportingTaskEntitiesContent(reportingTasks);
// create the response entity
final ReportingTasksEntity entity = new ReportingTasksEntity();
entity.setReportingTasks(reportingTasks);
// generate the response
return generateOkResponse(entity).build();
}
/**
* Updates the specified process group.
*
* @param httpServletRequest request
* @param id The id of the process group.
* @param requestScheduleComponentsEntity A scheduleComponentsEntity.
* @return A processGroupEntity.
*/
@PUT
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@Path("process-groups/{id}")
@ApiOperation(
value = "Schedule or unschedule components in the specified Process Group.",
response = ScheduleComponentsEntity.class,
authorizations = {
@Authorization(value = "Read - /flow"),
@Authorization(value = "Write - /{component-type}/{uuid} or /operation/{component-type}/{uuid} - For every component being scheduled/unscheduled")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response scheduleComponents(
@Context HttpServletRequest httpServletRequest,
@ApiParam(
value = "The process group id.",
required = true
)
@PathParam("id") String id,
@ApiParam(
value = "The request to schedule or unschedule. If the comopnents in the request are not specified, all authorized components will be considered.",
required = true
) final ScheduleComponentsEntity requestScheduleComponentsEntity) {
if (requestScheduleComponentsEntity == null) {
throw new IllegalArgumentException("Schedule Component must be specified.");
}
// ensure the same id is being used
if (!id.equals(requestScheduleComponentsEntity.getId())) {
throw new IllegalArgumentException(String.format("The process group id (%s) in the request body does "
+ "not equal the process group id of the requested resource (%s).", requestScheduleComponentsEntity.getId(), id));
}
final ScheduledState state;
if (requestScheduleComponentsEntity.getState() == null) {
throw new IllegalArgumentException("The scheduled state must be specified.");
} else {
if (requestScheduleComponentsEntity.getState().equals(STATE_ENABLED)) {
state = ScheduledState.STOPPED;
} else {
try {
state = ScheduledState.valueOf(requestScheduleComponentsEntity.getState());
} catch (final IllegalArgumentException iae) {
throw new IllegalArgumentException(String.format("The scheduled must be one of [%s].",
StringUtils.join(Stream.of(ScheduledState.RUNNING, ScheduledState.STOPPED, STATE_ENABLED, ScheduledState.DISABLED), ", ")));
}
}
}
// ensure its a supported scheduled state
if (ScheduledState.STARTING.equals(state) || ScheduledState.STOPPING.equals(state)) {
throw new IllegalArgumentException(String.format("The scheduled must be one of [%s].",
StringUtils.join(Stream.of(ScheduledState.RUNNING, ScheduledState.STOPPED, STATE_ENABLED, ScheduledState.DISABLED), ", ")));
}
// if the components are not specified, gather all components and their current revision
if (requestScheduleComponentsEntity.getComponents() == null) {
final Supplier<Predicate<ProcessorNode>> getProcessorFilter = () -> {
if (ScheduledState.RUNNING.equals(state)) {
return ProcessGroup.START_PROCESSORS_FILTER;
} else if (ScheduledState.STOPPED.equals(state)) {
if (requestScheduleComponentsEntity.getState().equals(STATE_ENABLED)) {
return ProcessGroup.ENABLE_PROCESSORS_FILTER;
} else {
return ProcessGroup.STOP_PROCESSORS_FILTER;
}
} else {
return ProcessGroup.DISABLE_PROCESSORS_FILTER;
}
};
final Supplier<Predicate<Port>> getPortFilter = () -> {
if (ScheduledState.RUNNING.equals(state)) {
return ProcessGroup.START_PORTS_FILTER;
} else if (ScheduledState.STOPPED.equals(state)) {
if (requestScheduleComponentsEntity.getState().equals(STATE_ENABLED)) {
return ProcessGroup.ENABLE_PORTS_FILTER;
} else {
return ProcessGroup.STOP_PORTS_FILTER;
}
} else {
return ProcessGroup.DISABLE_PORTS_FILTER;
}
};
// get the current revisions for the components being updated
final Set<Revision> revisions = serviceFacade.getRevisionsFromGroup(id, group -> {
final Set<String> componentIds = new HashSet<>();
// ensure authorized for each processor we will attempt to schedule
group.findAllProcessors().stream()
.filter(getProcessorFilter.get())
.filter(processor -> OperationAuthorizable.isOperationAuthorized(processor, authorizer, NiFiUserUtils.getNiFiUser()))
.forEach(processor -> {
componentIds.add(processor.getIdentifier());
});
// ensure authorized for each input port we will attempt to schedule
group.findAllInputPorts().stream()
.filter(getPortFilter.get())
.filter(inputPort -> OperationAuthorizable.isOperationAuthorized(inputPort, authorizer, NiFiUserUtils.getNiFiUser()))
.forEach(inputPort -> {
componentIds.add(inputPort.getIdentifier());
});
// ensure authorized for each output port we will attempt to schedule
group.findAllOutputPorts().stream()
.filter(getPortFilter.get())
.filter(outputPort -> OperationAuthorizable.isOperationAuthorized(outputPort, authorizer, NiFiUserUtils.getNiFiUser()))
.forEach(outputPort -> {
componentIds.add(outputPort.getIdentifier());
});
return componentIds;
});
// build the component mapping
final Map<String, RevisionDTO> componentsToSchedule = new HashMap<>();
revisions.forEach(revision -> {
final RevisionDTO dto = new RevisionDTO();
dto.setClientId(revision.getClientId());
dto.setVersion(revision.getVersion());
componentsToSchedule.put(revision.getComponentId(), dto);
});
// set the components and their current revision
requestScheduleComponentsEntity.setComponents(componentsToSchedule);
}
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, requestScheduleComponentsEntity);
} else if (isDisconnectedFromCluster()) {
verifyDisconnectedNodeModification(requestScheduleComponentsEntity.isDisconnectedNodeAcknowledged());
}
final Map<String, RevisionDTO> requestComponentsToSchedule = requestScheduleComponentsEntity.getComponents();
final Map<String, Revision> requestComponentRevisions =
requestComponentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> getRevision(e.getValue(), e.getKey())));
final Set<Revision> requestRevisions = new HashSet<>(requestComponentRevisions.values());
return withWriteLock(
serviceFacade,
requestScheduleComponentsEntity,
requestRevisions,
lookup -> {
// ensure access to the flow
authorizeFlow();
// ensure access to every component being scheduled
requestComponentsToSchedule.keySet().forEach(componentId -> {
final Authorizable connectable = lookup.getLocalConnectable(componentId);
OperationAuthorizable.authorizeOperation(connectable, authorizer, NiFiUserUtils.getNiFiUser());
});
},
() -> {
if (STATE_ENABLED.equals(requestScheduleComponentsEntity.getState()) || STATE_DISABLED.equals(requestScheduleComponentsEntity.getState())) {
serviceFacade.verifyEnableComponents(id, state, requestComponentRevisions.keySet());
} else {
serviceFacade.verifyScheduleComponents(id, state, requestComponentRevisions.keySet());
}
},
(revisions, scheduleComponentsEntity) -> {
final ScheduledState scheduledState;
if (STATE_ENABLED.equals(scheduleComponentsEntity.getState())) {
scheduledState = ScheduledState.STOPPED;
} else {
scheduledState = ScheduledState.valueOf(scheduleComponentsEntity.getState());
}
final Map<String, RevisionDTO> componentsToSchedule = scheduleComponentsEntity.getComponents();
final Map<String, Revision> componentRevisions =
componentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> getRevision(e.getValue(), e.getKey())));
// update the process group
final ScheduleComponentsEntity entity;
if (STATE_ENABLED.equals(scheduleComponentsEntity.getState()) || STATE_DISABLED.equals(scheduleComponentsEntity.getState())) {
entity = serviceFacade.enableComponents(id, scheduledState, componentRevisions);
} else {
entity = serviceFacade.scheduleComponents(id, scheduledState, componentRevisions);
}
return generateOkResponse(entity).build();
}
);
}
@PUT
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@Path("process-groups/{id}/controller-services")
@ApiOperation(value = "Enable or disable Controller Services in the specified Process Group.",
response = ActivateControllerServicesEntity.class,
authorizations = {
@Authorization(value = "Read - /flow"),
@Authorization(value = "Write - /{component-type}/{uuid} or /operation/{component-type}/{uuid} - For every service being enabled/disabled")
})
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response activateControllerServices(
@Context HttpServletRequest httpServletRequest,
@ApiParam(value = "The process group id.", required = true)
@PathParam("id") String id,
@ApiParam(value = "The request to schedule or unschedule. If the comopnents in the request are not specified, all authorized components will be considered.", required = true)
final ActivateControllerServicesEntity requestEntity) {
if (requestEntity == null) {
throw new IllegalArgumentException("Controller service must be specified.");
}
// ensure the same id is being used
if (!id.equals(requestEntity.getId())) {
throw new IllegalArgumentException(String.format("The process group id (%s) in the request body does "
+ "not equal the process group id of the requested resource (%s).", requestEntity.getId(), id));
}
final ControllerServiceState desiredState;
if (requestEntity.getState() == null) {
throw new IllegalArgumentException("The controller service state must be specified.");
} else {
try {
desiredState = ControllerServiceState.valueOf(requestEntity.getState());
} catch (final IllegalArgumentException iae) {
throw new IllegalArgumentException(String.format("The controller service state must be one of [%s].",
StringUtils.join(EnumSet.of(ControllerServiceState.ENABLED, ControllerServiceState.DISABLED), ", ")));
}
}
// ensure its a supported scheduled state
if (ControllerServiceState.DISABLING.equals(desiredState) || ControllerServiceState.ENABLING.equals(desiredState)) {
throw new IllegalArgumentException(String.format("The scheduled must be one of [%s].",
StringUtils.join(EnumSet.of(ControllerServiceState.ENABLED, ControllerServiceState.DISABLED), ", ")));
}
// if the components are not specified, gather all components and their current revision
if (requestEntity.getComponents() == null) {
// get the current revisions for the components being updated
final Set<Revision> revisions = serviceFacade.getRevisionsFromGroup(id, group -> {
final Set<String> componentIds = new HashSet<>();
final Predicate<ControllerServiceNode> filter;
if (ControllerServiceState.ENABLED.equals(desiredState)) {
filter = service -> !service.isActive();
} else {
filter = ControllerServiceNode::isActive;
}
group.findAllControllerServices().stream()
.filter(filter)
.filter(service -> OperationAuthorizable.isOperationAuthorized(service, authorizer, NiFiUserUtils.getNiFiUser()))
.forEach(service -> componentIds.add(service.getIdentifier()));
return componentIds;
});
// build the component mapping
final Map<String, RevisionDTO> componentsToSchedule = new HashMap<>();
revisions.forEach(revision -> {
final RevisionDTO dto = new RevisionDTO();
dto.setClientId(revision.getClientId());
dto.setVersion(revision.getVersion());
componentsToSchedule.put(revision.getComponentId(), dto);
});
// set the components and their current revision
requestEntity.setComponents(componentsToSchedule);
}
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, requestEntity);
} else if (isDisconnectedFromCluster()) {
verifyDisconnectedNodeModification(requestEntity.isDisconnectedNodeAcknowledged());
}
final Map<String, RevisionDTO> requestComponentsToSchedule = requestEntity.getComponents();
final Map<String, Revision> requestComponentRevisions =
requestComponentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> getRevision(e.getValue(), e.getKey())));
final Set<Revision> requestRevisions = new HashSet<>(requestComponentRevisions.values());
return withWriteLock(
serviceFacade,
requestEntity,
requestRevisions,
lookup -> {
// ensure access to the flow
authorizeFlow();
// ensure access to every component being scheduled
requestComponentsToSchedule.keySet().forEach(componentId -> {
final Authorizable authorizable = lookup.getControllerService(componentId).getAuthorizable();
OperationAuthorizable.authorizeOperation(authorizable, authorizer, NiFiUserUtils.getNiFiUser());
});
},
() -> serviceFacade.verifyActivateControllerServices(id, desiredState, requestComponentRevisions.keySet()),
(revisions, scheduleComponentsEntity) -> {
final ControllerServiceState serviceState = ControllerServiceState.valueOf(scheduleComponentsEntity.getState());
final Map<String, RevisionDTO> componentsToSchedule = scheduleComponentsEntity.getComponents();
final Map<String, Revision> componentRevisions =
componentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> getRevision(e.getValue(), e.getKey())));
// update the controller services
final ActivateControllerServicesEntity entity = serviceFacade.activateControllerServices(id, serviceState, componentRevisions);
return generateOkResponse(entity).build();
}
);
}
// ------
// search
// ------
/**
* Performs a search request in this flow.
*
* @param value Search string
* @return A searchResultsEntity
* @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("search-results")
@ApiOperation(
value = "Performs a search against this NiFi using the specified search term",
notes = "Only search results from authorized components will be returned.",
response = SearchResultsEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response searchFlow(
@QueryParam("q") @DefaultValue(StringUtils.EMPTY) String value,
@QueryParam("a") @DefaultValue(StringUtils.EMPTY) String activeGroupId
) throws InterruptedException {
authorizeFlow();
// query the controller
final SearchResultsDTO results = serviceFacade.searchController(value, activeGroupId);
// create the entity
final SearchResultsEntity entity = new SearchResultsEntity();
entity.setSearchResultsDTO(results);
// generate the response
return noCache(Response.ok(entity)).build();
}
/**
* Retrieves the status for this NiFi.
*
* @return A controllerStatusEntity.
* @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("status")
@ApiOperation(
value = "Gets the current status of this NiFi",
response = ControllerStatusEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getControllerStatus() throws InterruptedException {
authorizeFlow();
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
final ControllerStatusDTO controllerStatus = serviceFacade.getControllerStatus();
// create the response entity
final ControllerStatusEntity entity = new ControllerStatusEntity();
entity.setControllerStatus(controllerStatus);
// generate the response
return generateOkResponse(entity).build();
}
/**
* Retrieves the cluster summary for this NiFi.
*
* @return A clusterSummaryEntity.
* @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("cluster/summary")
@ApiOperation(
value = "The cluster summary for this NiFi",
response = ClusteSummaryEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getClusterSummary() throws InterruptedException {
authorizeFlow();
final ClusterSummaryDTO clusterConfiguration = new ClusterSummaryDTO();
final ClusterCoordinator clusterCoordinator = getClusterCoordinator();
if (clusterCoordinator != null && clusterCoordinator.isConnected()) {
final Map<NodeConnectionState, List<NodeIdentifier>> stateMap = clusterCoordinator.getConnectionStates();
int totalNodeCount = 0;
for (final List<NodeIdentifier> nodeList : stateMap.values()) {
totalNodeCount += nodeList.size();
}
final List<NodeIdentifier> connectedNodeIds = stateMap.get(NodeConnectionState.CONNECTED);
final int connectedNodeCount = (connectedNodeIds == null) ? 0 : connectedNodeIds.size();
clusterConfiguration.setConnectedNodeCount(connectedNodeCount);
clusterConfiguration.setTotalNodeCount(totalNodeCount);
clusterConfiguration.setConnectedNodes(connectedNodeCount + " / " + totalNodeCount);
}
clusterConfiguration.setClustered(isClustered());
clusterConfiguration.setConnectedToCluster(isConnectedToCluster());
// create the response entity
final ClusteSummaryEntity entity = new ClusteSummaryEntity();
entity.setClusterSummary(clusterConfiguration);
// generate the response
return generateOkResponse(entity).build();
}
/**
* Retrieves the controller level bulletins.
*
* @return A controllerBulletinsEntity.
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("controller/bulletins")
@ApiOperation(
value = "Retrieves Controller level bulletins",
response = ControllerBulletinsEntity.class,
authorizations = {
@Authorization(value = "Read - /flow"),
@Authorization(value = "Read - /controller - For controller bulletins"),
@Authorization(value = "Read - /controller-services/{uuid} - For controller service bulletins"),
@Authorization(value = "Read - /reporting-tasks/{uuid} - For reporting task bulletins")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getBulletins() {
authorizeFlow();
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
final ControllerBulletinsEntity entity = serviceFacade.getControllerBulletins();
return generateOkResponse(entity).build();
}
/**
* Retrieves the banners for this NiFi.
*
* @return A bannerEntity.
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("banners")
@ApiOperation(
value = "Retrieves the banners for this NiFi",
response = BannerEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getBanners() {
authorizeFlow();
// get the banner from the properties - will come from the NCM when clustered
final String bannerText = getProperties().getBannerText();
// create the DTO
final BannerDTO bannerDTO = new BannerDTO();
bannerDTO.setHeaderText(bannerText);
bannerDTO.setFooterText(bannerText);
// create the response entity
final BannerEntity entity = new BannerEntity();
entity.setBanners(bannerDTO);
// generate the response
return generateOkResponse(entity).build();
}
/**
* Retrieves the types of processors that this NiFi supports.
*
* @return A processorTypesEntity.
* @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("processor-types")
@ApiOperation(
value = "Retrieves the types of processors that this NiFi supports",
notes = NON_GUARANTEED_ENDPOINT,
response = ProcessorTypesEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getProcessorTypes(
@ApiParam(
value = "If specified, will only return types that are a member of this bundle group.",
required = false
)
@QueryParam("bundleGroupFilter") String bundleGroupFilter,
@ApiParam(
value = "If specified, will only return types that are a member of this bundle artifact.",
required = false
)
@QueryParam("bundleArtifactFilter") String bundleArtifactFilter,
@ApiParam(
value = "If specified, will only return types whose fully qualified classname matches.",
required = false
)
@QueryParam("type") String typeFilter) throws InterruptedException {
authorizeFlow();
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
// create response entity
final ProcessorTypesEntity entity = new ProcessorTypesEntity();
entity.setProcessorTypes(serviceFacade.getProcessorTypes(bundleGroupFilter, bundleArtifactFilter, typeFilter));
// generate the response
return generateOkResponse(entity).build();
}
/**
* Retrieves the types of controller services that this NiFi supports.
*
* @param serviceType Returns only services that implement this type
* @return A controllerServicesTypesEntity.
* @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("controller-service-types")
@ApiOperation(
value = "Retrieves the types of controller services that this NiFi supports",
notes = NON_GUARANTEED_ENDPOINT,
response = ControllerServiceTypesEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getControllerServiceTypes(
@ApiParam(
value = "If specified, will only return controller services that are compatible with this type of service.",
required = false
)
@QueryParam("serviceType") String serviceType,
@ApiParam(
value = "If serviceType specified, is the bundle group of the serviceType.",
required = false
)
@QueryParam("serviceBundleGroup") String serviceBundleGroup,
@ApiParam(
value = "If serviceType specified, is the bundle artifact of the serviceType.",
required = false
)
@QueryParam("serviceBundleArtifact") String serviceBundleArtifact,
@ApiParam(
value = "If serviceType specified, is the bundle version of the serviceType.",
required = false
)
@QueryParam("serviceBundleVersion") String serviceBundleVersion,
@ApiParam(
value = "If specified, will only return types that are a member of this bundle group.",
required = false
)
@QueryParam("bundleGroupFilter") String bundleGroupFilter,
@ApiParam(
value = "If specified, will only return types that are a member of this bundle artifact.",
required = false
)
@QueryParam("bundleArtifactFilter") String bundleArtifactFilter,
@ApiParam(
value = "If specified, will only return types whose fully qualified classname matches.",
required = false
)
@QueryParam("typeFilter") String typeFilter) throws InterruptedException {
authorizeFlow();
if (serviceType != null) {
if (serviceBundleGroup == null || serviceBundleArtifact == null || serviceBundleVersion == null) {
throw new IllegalArgumentException("When specifying the serviceType the serviceBundleGroup, serviceBundleArtifact, and serviceBundleVersion must be specified.");
}
}
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
// create response entity
final ControllerServiceTypesEntity entity = new ControllerServiceTypesEntity();
entity.setControllerServiceTypes(serviceFacade.getControllerServiceTypes(serviceType, serviceBundleGroup, serviceBundleArtifact, serviceBundleVersion,
bundleGroupFilter, bundleArtifactFilter, typeFilter));
// generate the response
return generateOkResponse(entity).build();
}
/**
* Retrieves the types of reporting tasks that this NiFi supports.
*
* @return A controllerServicesTypesEntity.
* @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("reporting-task-types")
@ApiOperation(
value = "Retrieves the types of reporting tasks that this NiFi supports",
notes = NON_GUARANTEED_ENDPOINT,
response = ReportingTaskTypesEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getReportingTaskTypes(
@ApiParam(
value = "If specified, will only return types that are a member of this bundle group.",
required = false
)
@QueryParam("bundleGroupFilter") String bundleGroupFilter,
@ApiParam(
value = "If specified, will only return types that are a member of this bundle artifact.",
required = false
)
@QueryParam("bundleArtifactFilter") String bundleArtifactFilter,
@ApiParam(
value = "If specified, will only return types whose fully qualified classname matches.",
required = false
)
@QueryParam("type") String typeFilter) throws InterruptedException {
authorizeFlow();
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
// create response entity
final ReportingTaskTypesEntity entity = new ReportingTaskTypesEntity();
entity.setReportingTaskTypes(serviceFacade.getReportingTaskTypes(bundleGroupFilter, bundleArtifactFilter, typeFilter));
// generate the response
return generateOkResponse(entity).build();
}
/**
* Retrieves the types of prioritizers that this NiFi supports.
*
* @return A prioritizerTypesEntity.
* @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("prioritizers")
@ApiOperation(
value = "Retrieves the types of prioritizers that this NiFi supports",
notes = NON_GUARANTEED_ENDPOINT,
response = PrioritizerTypesEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getPrioritizers() throws InterruptedException {
authorizeFlow();
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
// create response entity
final PrioritizerTypesEntity entity = new PrioritizerTypesEntity();
entity.setPrioritizerTypes(serviceFacade.getWorkQueuePrioritizerTypes());
// generate the response
return generateOkResponse(entity).build();
}
/**
* Retrieves details about this NiFi to put in the About dialog.
*
* @return An aboutEntity.
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("about")
@ApiOperation(
value = "Retrieves details about this NiFi to put in the About dialog",
response = AboutEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getAboutInfo() {
authorizeFlow();
// create the about dto
final AboutDTO aboutDTO = new AboutDTO();
aboutDTO.setTitle("NiFi");
aboutDTO.setUri(generateResourceUri());
aboutDTO.setTimezone(new Date());
// get the content viewer url
final NiFiProperties properties = getProperties();
aboutDTO.setContentViewerUrl(properties.getProperty(NiFiProperties.CONTENT_VIEWER_URL));
final Bundle frameworkBundle = NarClassLoadersHolder.getInstance().getFrameworkBundle();
if (frameworkBundle != null) {
final BundleDetails frameworkDetails = frameworkBundle.getBundleDetails();
// set the version
aboutDTO.setVersion(frameworkDetails.getCoordinate().getVersion());
// Get build info
aboutDTO.setBuildTag(frameworkDetails.getBuildTag());
aboutDTO.setBuildRevision(frameworkDetails.getBuildRevision());
aboutDTO.setBuildBranch(frameworkDetails.getBuildBranch());
aboutDTO.setBuildTimestamp(frameworkDetails.getBuildTimestampDate());
}
// create the response entity
final AboutEntity entity = new AboutEntity();
entity.setAbout(aboutDTO);
// generate the response
return generateOkResponse(entity).build();
}
// ----------
// registries
// ----------
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("registries")
@ApiOperation(value = "Gets the listing of available registries", response = RegistryClientsEntity.class, authorizations = {
@Authorization(value = "Read - /flow")
})
@ApiResponses(value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
})
public Response getRegistries() {
authorizeFlow();
final Set<RegistryEntity> registries = serviceFacade.getRegistriesForUser(NiFiUserUtils.getNiFiUser());
final RegistriesEntity registryEntities = new RegistriesEntity();
registryEntities.setRegistries(registries);
return generateOkResponse(registryEntities).build();
}
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("registries/{id}/buckets")
@ApiOperation(value = "Gets the buckets from the specified registry for the current user", response = BucketsEntity.class, authorizations = {
@Authorization(value = "Read - /flow")
})
@ApiResponses(value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
})
public Response getBuckets(
@ApiParam(
value = "The registry id.",
required = true
)
@PathParam("id") String id) throws NiFiRegistryException {
authorizeFlow();
final Set<BucketEntity> buckets = serviceFacade.getBucketsForUser(id, NiFiUserUtils.getNiFiUser());
final SortedSet<BucketEntity> sortedBuckets = sortBuckets(buckets);
final BucketsEntity bucketsEntity = new BucketsEntity();
bucketsEntity.setBuckets(sortedBuckets);
return generateOkResponse(bucketsEntity).build();
}
private SortedSet<BucketEntity> sortBuckets(final Set<BucketEntity> buckets) {
final SortedSet<BucketEntity> sortedBuckets = new TreeSet<>(new Comparator<BucketEntity>() {
@Override
public int compare(final BucketEntity entity1, final BucketEntity entity2) {
return Collator.getInstance().compare(getBucketName(entity1), getBucketName(entity2));
}
});
sortedBuckets.addAll(buckets);
return sortedBuckets;
}
private String getBucketName(final BucketEntity entity) {
return entity.getBucket() == null ? null : entity.getBucket().getName();
}
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("registries/{registry-id}/buckets/{bucket-id}/flows")
@ApiOperation(value = "Gets the flows from the specified registry and bucket for the current user", response = VersionedFlowsEntity.class, authorizations = {
@Authorization(value = "Read - /flow")
})
@ApiResponses(value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
})
public Response getFlows(
@ApiParam(
value = "The registry id.",
required = true
)
@PathParam("registry-id") String registryId,
@ApiParam(
value = "The bucket id.",
required = true
)
@PathParam("bucket-id") String bucketId) {
authorizeFlow();
final Set<VersionedFlowEntity> versionedFlows = serviceFacade.getFlowsForUser(registryId, bucketId, NiFiUserUtils.getNiFiUser());
final SortedSet<VersionedFlowEntity> sortedFlows = sortFlows(versionedFlows);
final VersionedFlowsEntity versionedFlowsEntity = new VersionedFlowsEntity();
versionedFlowsEntity.setVersionedFlows(sortedFlows);
return generateOkResponse(versionedFlowsEntity).build();
}
private SortedSet<VersionedFlowEntity> sortFlows(final Set<VersionedFlowEntity> versionedFlows) {
final SortedSet<VersionedFlowEntity> sortedFlows = new TreeSet<>(new Comparator<VersionedFlowEntity>() {
@Override
public int compare(final VersionedFlowEntity entity1, final VersionedFlowEntity entity2) {
return Collator.getInstance().compare(getFlowName(entity1), getFlowName(entity2));
}
});
sortedFlows.addAll(versionedFlows);
return sortedFlows;
}
private String getFlowName(final VersionedFlowEntity flowEntity) {
return flowEntity.getVersionedFlow() == null ? "" : flowEntity.getVersionedFlow().getFlowName();
}
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("registries/{registry-id}/buckets/{bucket-id}/flows/{flow-id}/versions")
@ApiOperation(value = "Gets the flow versions from the specified registry and bucket for the specified flow for the current user",
response = VersionedFlowSnapshotMetadataSetEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
})
@ApiResponses(value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
})
public Response getVersions(
@ApiParam(
value = "The registry id.",
required = true
)
@PathParam("registry-id") String registryId,
@ApiParam(
value = "The bucket id.",
required = true
)
@PathParam("bucket-id") String bucketId,
@ApiParam(
value = "The flow id.",
required = true
)
@PathParam("flow-id") String flowId) {
authorizeFlow();
final Set<VersionedFlowSnapshotMetadataEntity> versionedFlowSnapshotMetadataSet = serviceFacade.getFlowVersionsForUser(registryId, bucketId, flowId, NiFiUserUtils.getNiFiUser());
final VersionedFlowSnapshotMetadataSetEntity versionedFlowSnapshotMetadataSetEntity = new VersionedFlowSnapshotMetadataSetEntity();
versionedFlowSnapshotMetadataSetEntity.setVersionedFlowSnapshotMetadataSet(versionedFlowSnapshotMetadataSet);
return generateOkResponse(versionedFlowSnapshotMetadataSetEntity).build();
}
// --------------
// bulletin board
// --------------
/**
* Retrieves all the of bulletins in this NiFi.
*
* @param after Supporting querying for bulletins after a particular
* bulletin id.
* @param limit The max number of bulletins to return.
* @param sourceName Source name filter. Supports a regular expression.
* @param message Message filter. Supports a regular expression.
* @param sourceId Source id filter. Supports a regular expression.
* @param groupId Group id filter. Supports a regular expression.
* @return A bulletinBoardEntity.
* @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("bulletin-board")
@ApiOperation(
value = "Gets current bulletins",
response = BulletinBoardEntity.class,
authorizations = {
@Authorization(value = "Read - /flow"),
@Authorization(value = "Read - /{component-type}/{uuid} - For component specific bulletins")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getBulletinBoard(
@ApiParam(
value = "Includes bulletins with an id after this value.",
required = false
)
@QueryParam("after") LongParameter after,
@ApiParam(
value = "Includes bulletins originating from this sources whose name match this regular expression.",
required = false
)
@QueryParam("sourceName") BulletinBoardPatternParameter sourceName,
@ApiParam(
value = "Includes bulletins whose message that match this regular expression.",
required = false
)
@QueryParam("message") BulletinBoardPatternParameter message,
@ApiParam(
value = "Includes bulletins originating from this sources whose id match this regular expression.",
required = false
)
@QueryParam("sourceId") BulletinBoardPatternParameter sourceId,
@ApiParam(
value = "Includes bulletins originating from this sources whose group id match this regular expression.",
required = false
)
@QueryParam("groupId") BulletinBoardPatternParameter groupId,
@ApiParam(
value = "The number of bulletins to limit the response to.",
required = false
)
@QueryParam("limit") IntegerParameter limit) throws InterruptedException {
authorizeFlow();
// replicate if cluster manager
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
// build the bulletin query
final BulletinQueryDTO query = new BulletinQueryDTO();
if (sourceId != null) {
query.setSourceId(sourceId.getRawPattern());
}
if (groupId != null) {
query.setGroupId(groupId.getRawPattern());
}
if (sourceName != null) {
query.setName(sourceName.getRawPattern());
}
if (message != null) {
query.setMessage(message.getRawPattern());
}
if (after != null) {
query.setAfter(after.getLong());
}
if (limit != null) {
query.setLimit(limit.getInteger());
}
// get the bulletin board
final BulletinBoardDTO bulletinBoard = serviceFacade.getBulletinBoard(query);
// create the response entity
BulletinBoardEntity entity = new BulletinBoardEntity();
entity.setBulletinBoard(bulletinBoard);
// generate the response
return generateOkResponse(entity).build();
}
// ------
// status
// ------
/**
* Retrieves the specified processor status.
*
* @param id The id of the processor history to retrieve.
* @return A processorStatusEntity.
* @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("processors/{id}/status")
@ApiOperation(
value = "Gets status for a processor",
response = ProcessorStatusEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getProcessorStatus(
@ApiParam(
value = "Whether or not to include the breakdown per node. Optional, defaults to false",
required = false
)
@QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
@ApiParam(
value = "The id of the node where to get the status.",
required = false
)
@QueryParam("clusterNodeId") String clusterNodeId,
@ApiParam(
value = "The processor id.",
required = true
)
@PathParam("id") String id) throws InterruptedException {
authorizeFlow();
// ensure a valid request
if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node.");
}
if (isReplicateRequest()) {
// determine where this request should be sent
if (clusterNodeId == null) {
final NodeResponse nodeResponse = replicateNodeResponse(HttpMethod.GET);
final ProcessorStatusEntity entity = (ProcessorStatusEntity) nodeResponse.getUpdatedEntity();
// ensure there is an updated entity (result of merging) and prune the response as necessary
if (entity != null && !nodewise) {
entity.getProcessorStatus().setNodeSnapshots(null);
}
return nodeResponse.getResponse();
} else {
return replicate(HttpMethod.GET, clusterNodeId);
}
}
// get the specified processor status
final ProcessorStatusEntity entity = serviceFacade.getProcessorStatus(id);
return generateOkResponse(entity).build();
}
/**
* Retrieves the specified input port status.
*
* @param id The id of the processor history to retrieve.
* @return A portStatusEntity.
* @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("input-ports/{id}/status")
@ApiOperation(
value = "Gets status for an input port",
response = PortStatusEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getInputPortStatus(
@ApiParam(
value = "Whether or not to include the breakdown per node. Optional, defaults to false",
required = false
)
@QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
@ApiParam(
value = "The id of the node where to get the status.",
required = false
)
@QueryParam("clusterNodeId") String clusterNodeId,
@ApiParam(
value = "The input port id.",
required = true
)
@PathParam("id") String id) throws InterruptedException {
authorizeFlow();
// ensure a valid request
if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node.");
}
if (isReplicateRequest()) {
// determine where this request should be sent
if (clusterNodeId == null) {
final NodeResponse nodeResponse = replicateNodeResponse(HttpMethod.GET);
final PortStatusEntity entity = (PortStatusEntity) nodeResponse.getUpdatedEntity();
// ensure there is an updated entity (result of merging) and prune the response as necessary
if (entity != null && !nodewise) {
entity.getPortStatus().setNodeSnapshots(null);
}
return nodeResponse.getResponse();
} else {
return replicate(HttpMethod.GET, clusterNodeId);
}
}
// get the specified input port status
final PortStatusEntity entity = serviceFacade.getInputPortStatus(id);
return generateOkResponse(entity).build();
}
/**
* Retrieves the specified output port status.
*
* @param id The id of the processor history to retrieve.
* @return A portStatusEntity.
* @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("output-ports/{id}/status")
@ApiOperation(
value = "Gets status for an output port",
response = PortStatusEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getOutputPortStatus(
@ApiParam(
value = "Whether or not to include the breakdown per node. Optional, defaults to false",
required = false
)
@QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
@ApiParam(
value = "The id of the node where to get the status.",
required = false
)
@QueryParam("clusterNodeId") String clusterNodeId,
@ApiParam(
value = "The output port id.",
required = true
)
@PathParam("id") String id) throws InterruptedException {
authorizeFlow();
// ensure a valid request
if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node.");
}
if (isReplicateRequest()) {
// determine where this request should be sent
if (clusterNodeId == null) {
final NodeResponse nodeResponse = replicateNodeResponse(HttpMethod.GET);
final PortStatusEntity entity = (PortStatusEntity) nodeResponse.getUpdatedEntity();
// ensure there is an updated entity (result of merging) and prune the response as necessary
if (entity != null && !nodewise) {
entity.getPortStatus().setNodeSnapshots(null);
}
return nodeResponse.getResponse();
} else {
return replicate(HttpMethod.GET, clusterNodeId);
}
}
// get the specified output port status
final PortStatusEntity entity = serviceFacade.getOutputPortStatus(id);
return generateOkResponse(entity).build();
}
/**
* Retrieves the specified remote process group status.
*
* @param id The id of the processor history to retrieve.
* @return A remoteProcessGroupStatusEntity.
* @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("remote-process-groups/{id}/status")
@ApiOperation(
value = "Gets status for a remote process group",
response = RemoteProcessGroupStatusEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getRemoteProcessGroupStatus(
@ApiParam(
value = "Whether or not to include the breakdown per node. Optional, defaults to false",
required = false
)
@QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
@ApiParam(
value = "The id of the node where to get the status.",
required = false
)
@QueryParam("clusterNodeId") String clusterNodeId,
@ApiParam(
value = "The remote process group id.",
required = true
)
@PathParam("id") String id) throws InterruptedException {
authorizeFlow();
// ensure a valid request
if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node.");
}
if (isReplicateRequest()) {
// determine where this request should be sent
if (clusterNodeId == null) {
final NodeResponse nodeResponse = replicateNodeResponse(HttpMethod.GET);
final RemoteProcessGroupStatusEntity entity = (RemoteProcessGroupStatusEntity) nodeResponse.getUpdatedEntity();
// ensure there is an updated entity (result of merging) and prune the response as necessary
if (entity != null && !nodewise) {
entity.getRemoteProcessGroupStatus().setNodeSnapshots(null);
}
return nodeResponse.getResponse();
} else {
return replicate(HttpMethod.GET, clusterNodeId);
}
}
// get the specified remote process group status
final RemoteProcessGroupStatusEntity entity = serviceFacade.getRemoteProcessGroupStatus(id);
return generateOkResponse(entity).build();
}
/**
* Retrieves the status report for this NiFi.
*
* @param recursive Optional recursive flag that defaults to false. If set to true, all descendant groups and the status of their content will be included.
* @param groupId The group id
* @return A processGroupStatusEntity.
* @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("process-groups/{id}/status")
@ApiOperation(
value = "Gets the status for a process group",
notes = "The status for a process group includes status for all descendent components. When invoked on the root group with "
+ "recursive set to true, it will return the current status of every component in the flow.",
response = ProcessGroupStatusEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getProcessGroupStatus(
@ApiParam(
value = "Whether all descendant groups and the status of their content will be included. Optional, defaults to false",
required = false
)
@QueryParam("recursive") @DefaultValue(RECURSIVE) Boolean recursive,
@ApiParam(
value = "Whether or not to include the breakdown per node. Optional, defaults to false",
required = false
)
@QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
@ApiParam(
value = "The id of the node where to get the status.",
required = false
)
@QueryParam("clusterNodeId") String clusterNodeId,
@ApiParam(
value = "The process group id.",
required = true
)
@PathParam("id") String groupId) throws InterruptedException {
authorizeFlow();
// ensure a valid request
if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node.");
}
if (isReplicateRequest()) {
// determine where this request should be sent
if (clusterNodeId == null) {
final NodeResponse nodeResponse = replicateNodeResponse(HttpMethod.GET);
final ProcessGroupStatusEntity entity = (ProcessGroupStatusEntity) nodeResponse.getUpdatedEntity();
// ensure there is an updated entity (result of merging) and prune the response as necessary
if (entity != null && !nodewise) {
entity.getProcessGroupStatus().setNodeSnapshots(null);
}
return nodeResponse.getResponse();
} else {
return replicate(HttpMethod.GET, clusterNodeId);
}
}
// get the status
final ProcessGroupStatusEntity entity = serviceFacade.getProcessGroupStatus(groupId, recursive);
return generateOkResponse(entity).build();
}
/**
* Retrieves the specified connection status.
*
* @param id The id of the connection history to retrieve.
* @return A connectionStatusEntity.
* @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("connections/{id}/status")
@ApiOperation(
value = "Gets status for a connection",
response = ConnectionStatusEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getConnectionStatus(
@ApiParam(
value = "Whether or not to include the breakdown per node. Optional, defaults to false",
required = false
)
@QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
@ApiParam(
value = "The id of the node where to get the status.",
required = false
)
@QueryParam("clusterNodeId") String clusterNodeId,
@ApiParam(
value = "The connection id.",
required = true
)
@PathParam("id") String id) throws InterruptedException {
authorizeFlow();
// ensure a valid request
if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node.");
}
if (isReplicateRequest()) {
// determine where this request should be sent
if (clusterNodeId == null) {
final NodeResponse nodeResponse = replicateNodeResponse(HttpMethod.GET);
final ConnectionStatusEntity entity = (ConnectionStatusEntity) nodeResponse.getUpdatedEntity();
// ensure there is an updated entity (result of merging) and prune the response as necessary
if (entity != null && !nodewise) {
entity.getConnectionStatus().setNodeSnapshots(null);
}
return nodeResponse.getResponse();
} else {
return replicate(HttpMethod.GET, clusterNodeId);
}
}
// get the specified connection status
final ConnectionStatusEntity entity = serviceFacade.getConnectionStatus(id);
return generateOkResponse(entity).build();
}
/**
* Retrieves the specified connection statistics.
*
* @param id The id of the connection statistics to retrieve.
* @return A ConnectionStatisticsEntity.
* @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("connections/{id}/statistics")
@ApiOperation(
value = "Gets statistics for a connection",
response = ConnectionStatisticsEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getConnectionStatistics(
@ApiParam(
value = "Whether or not to include the breakdown per node. Optional, defaults to false",
required = false
)
@QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
@ApiParam(
value = "The id of the node where to get the statistics.",
required = false
)
@QueryParam("clusterNodeId") String clusterNodeId,
@ApiParam(
value = "The connection id.",
required = true
)
@PathParam("id") String id) throws InterruptedException {
authorizeFlow();
// ensure a valid request
if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node.");
}
if (isReplicateRequest()) {
// determine where this request should be sent
if (clusterNodeId == null) {
final NodeResponse nodeResponse = replicateNodeResponse(HttpMethod.GET);
final ConnectionStatisticsEntity entity = (ConnectionStatisticsEntity) nodeResponse.getUpdatedEntity();
// ensure there is an updated entity (result of merging) and prune the response as necessary
if (entity != null && !nodewise) {
entity.getConnectionStatistics().setNodeSnapshots(null);
}
return nodeResponse.getResponse();
} else {
return replicate(HttpMethod.GET, clusterNodeId);
}
}
// get the specified connection status
final ConnectionStatisticsEntity entity = serviceFacade.getConnectionStatistics(id);
return generateOkResponse(entity).build();
}
// --------------
// status history
// --------------
/**
* Retrieves the specified processor status history.
*
* @param id The id of the processor history to retrieve.
* @return A statusHistoryEntity.
* @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("processors/{id}/status/history")
@ApiOperation(
value = "Gets status history for a processor",
response = StatusHistoryEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getProcessorStatusHistory(
@ApiParam(
value = "The processor id.",
required = true
)
@PathParam("id") String id) throws InterruptedException {
authorizeFlow();
// replicate if cluster manager
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
// get the specified processor status history
final StatusHistoryEntity entity = serviceFacade.getProcessorStatusHistory(id);
return generateOkResponse(entity).build();
}
/**
* Retrieves the specified remote process groups status history.
*
* @param groupId The group id
* @return A processorEntity.
* @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("process-groups/{id}/status/history")
@ApiOperation(
value = "Gets status history for a remote process group",
response = StatusHistoryEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getProcessGroupStatusHistory(
@ApiParam(
value = "The process group id.",
required = true
)
@PathParam("id") String groupId) throws InterruptedException {
authorizeFlow();
// replicate if cluster manager
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
// get the specified processor status history
final StatusHistoryEntity entity = serviceFacade.getProcessGroupStatusHistory(groupId);
return generateOkResponse(entity).build();
}
/**
* Retrieves the specified remote process groups status history.
*
* @param id The id of the remote process group to retrieve the status fow.
* @return A statusHistoryEntity.
* @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("remote-process-groups/{id}/status/history")
@ApiOperation(
value = "Gets the status history",
response = StatusHistoryEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getRemoteProcessGroupStatusHistory(
@ApiParam(
value = "The remote process group id.",
required = true
)
@PathParam("id") String id) throws InterruptedException {
authorizeFlow();
// replicate if cluster manager
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
// get the specified processor status history
final StatusHistoryEntity entity = serviceFacade.getRemoteProcessGroupStatusHistory(id);
return generateOkResponse(entity).build();
}
/**
* Retrieves the specified connection status history.
*
* @param id The id of the connection to retrieve.
* @return A statusHistoryEntity.
* @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("connections/{id}/status/history")
@ApiOperation(
value = "Gets the status history for a connection",
response = StatusHistoryEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getConnectionStatusHistory(
@ApiParam(
value = "The connection id.",
required = true
)
@PathParam("id") String id) throws InterruptedException {
authorizeFlow();
// replicate if cluster manager
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
// get the specified processor status history
final StatusHistoryEntity entity = serviceFacade.getConnectionStatusHistory(id);
return generateOkResponse(entity).build();
}
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("parameter-contexts")
@ApiOperation(
value = "Gets all Parameter Contexts",
response = ParameterContextsEntity.class,
authorizations = {
@Authorization(value = "Read - /parameter-contexts/{id} for each Parameter Context")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getParameterContexts() {
authorizeFlow();
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
final Set<ParameterContextEntity> parameterContexts = serviceFacade.getParameterContexts();
final ParameterContextsEntity entity = new ParameterContextsEntity();
entity.setParameterContexts(parameterContexts);
entity.setCurrentTime(new Date());
// generate the response
return generateOkResponse(entity).build();
}
// -------
// history
// -------
/**
* Queries the history of this Controller.
*
* @param offset The offset into the data. This parameter is required and is
* used in conjunction with count.
* @param count The number of rows that should be returned. This parameter
* is required and is used in conjunction with page.
* @param sortColumn The column to sort on. This parameter is optional. If
* not specified the results will be returned with the most recent first.
* @param sortOrder The sort order.
* @param startDate The start date/time for the query. The start date/time
* must be formatted as 'MM/dd/yyyy HH:mm:ss'. This parameter is optional
* and must be specified in the timezone of the server. The server's
* timezone can be determined by inspecting the result of a status or
* history request.
* @param endDate The end date/time for the query. The end date/time must be
* formatted as 'MM/dd/yyyy HH:mm:ss'. This parameter is optional and must
* be specified in the timezone of the server. The server's timezone can be
* determined by inspecting the result of a status or history request.
* @param userIdentity The user name of the user who's actions are being
* queried. This parameter is optional.
* @param sourceId The id of the source being queried (usually a processor
* id). This parameter is optional.
* @return A historyEntity.
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("history")
@ApiOperation(
value = "Gets configuration history",
notes = NON_GUARANTEED_ENDPOINT,
response = HistoryEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response queryHistory(
@ApiParam(
value = "The offset into the result set.",
required = true
)
@QueryParam("offset") IntegerParameter offset,
@ApiParam(
value = "The number of actions to return.",
required = true
)
@QueryParam("count") IntegerParameter count,
@ApiParam(
value = "The field to sort on.",
required = false
)
@QueryParam("sortColumn") String sortColumn,
@ApiParam(
value = "The direction to sort.",
required = false
)
@QueryParam("sortOrder") String sortOrder,
@ApiParam(
value = "Include actions after this date.",
required = false
)
@QueryParam("startDate") DateTimeParameter startDate,
@ApiParam(
value = "Include actions before this date.",
required = false
)
@QueryParam("endDate") DateTimeParameter endDate,
@ApiParam(
value = "Include actions performed by this user.",
required = false
)
@QueryParam("userIdentity") String userIdentity,
@ApiParam(
value = "Include actions on this component.",
required = false
)
@QueryParam("sourceId") String sourceId) {
authorizeFlow();
// ensure the page is specified
if (offset == null) {
throw new IllegalArgumentException("The desired offset must be specified.");
} else if (offset.getInteger() < 0) {
throw new IllegalArgumentException("The desired offset must be an integer value greater than or equal to 0.");
}
// ensure the row count is specified
if (count == null) {
throw new IllegalArgumentException("The desired row count must be specified.");
} else if (count.getInteger() < 1) {
throw new IllegalArgumentException("The desired row count must be an integer value greater than 0.");
}
// normalize the sort order
if (sortOrder != null) {
if (!sortOrder.equalsIgnoreCase("asc") && !sortOrder.equalsIgnoreCase("desc")) {
throw new IllegalArgumentException("The sort order must be 'asc' or 'desc'.");
}
}
// ensure the start and end dates are specified
if (endDate != null && startDate != null) {
if (endDate.getDateTime().before(startDate.getDateTime())) {
throw new IllegalArgumentException("The start date/time must come before the end date/time.");
}
}
// Note: History requests are not replicated throughout the cluster and are instead handled by the nodes independently
// create a history query
final HistoryQueryDTO query = new HistoryQueryDTO();
query.setSortColumn(sortColumn);
query.setSortOrder(sortOrder);
query.setOffset(offset.getInteger());
query.setCount(count.getInteger());
// optionally set the start date
if (startDate != null) {
query.setStartDate(startDate.getDateTime());
}
// optionally set the end date
if (endDate != null) {
query.setEndDate(endDate.getDateTime());
}
// optionally set the user id
if (userIdentity != null) {
query.setUserIdentity(userIdentity);
}
// optionally set the processor id
if (sourceId != null) {
query.setSourceId(sourceId);
}
// perform the query
final HistoryDTO history = serviceFacade.getActions(query);
// create the response entity
final HistoryEntity entity = new HistoryEntity();
entity.setHistory(history);
// generate the response
return generateOkResponse(entity).build();
}
/**
* Gets the action for the corresponding id.
*
* @param id The id of the action to get.
* @return An actionEntity.
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("history/{id}")
@ApiOperation(
value = "Gets an action",
notes = NON_GUARANTEED_ENDPOINT,
response = ActionEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getAction(
@ApiParam(
value = "The action id.",
required = true
)
@PathParam("id") IntegerParameter id) {
authorizeFlow();
// ensure the id was specified
if (id == null) {
throw new IllegalArgumentException("The action id must be specified.");
}
// Note: History requests are not replicated throughout the cluster and are instead handled by the nodes independently
// get the response entity for the specified action
final ActionEntity entity = serviceFacade.getAction(id.getInteger());
// generate the response
return generateOkResponse(entity).build();
}
/**
* Gets the actions for the specified component.
*
* @param componentId The id of the component.
* @return An processorHistoryEntity.
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("history/components/{componentId}")
@ApiOperation(
value = "Gets configuration history for a component",
notes = NON_GUARANTEED_ENDPOINT,
response = ComponentHistoryEntity.class,
authorizations = {
@Authorization(value = "Read - /flow"),
@Authorization(value = "Read underlying component - /{component-type}/{uuid}")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getComponentHistory(
@ApiParam(
value = "The component id.",
required = true
)
@PathParam("componentId") final String componentId) {
serviceFacade.authorizeAccess(lookup -> {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
// authorize the flow
authorizeFlow();
try {
final Authorizable authorizable = lookup.getProcessor(componentId).getAuthorizable();
authorizable.authorize(authorizer, RequestAction.READ, user);
return;
} catch (final ResourceNotFoundException e) {
// ignore as the component may not be a processor
}
try {
final Authorizable authorizable = lookup.getControllerService(componentId).getAuthorizable();
authorizable.authorize(authorizer, RequestAction.READ, user);
return;
} catch (final ResourceNotFoundException e) {
// ignore as the component may not be a controller service
}
try {
final Authorizable authorizable = lookup.getReportingTask(componentId).getAuthorizable();
authorizable.authorize(authorizer, RequestAction.READ, user);
return;
} catch (final ResourceNotFoundException e) {
// ignore as the component may not be a reporting task
}
// a component for the specified id could not be found, attempt to authorize based on read to the controller
final Authorizable controller = lookup.getController();
controller.authorize(authorizer, RequestAction.READ, user);
});
// Note: History requests are not replicated throughout the cluster and are instead handled by the nodes independently
// create the response entity
final ComponentHistoryEntity entity = new ComponentHistoryEntity();
entity.setComponentHistory(serviceFacade.getComponentHistory(componentId));
// generate the response
return generateOkResponse(entity).build();
}
// ---------
// templates
// ---------
/**
* Retrieves all the of templates in this NiFi.
*
* @return A templatesEntity.
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("templates")
@ApiOperation(
value = "Gets all templates",
response = TemplatesEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getTemplates() {
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
// authorize access
authorizeFlow();
// get all the templates
final Set<TemplateEntity> templates = serviceFacade.getTemplates();
templateResource.populateRemainingTemplateEntitiesContent(templates);
// create the response entity
final TemplatesEntity entity = new TemplatesEntity();
entity.setTemplates(templates);
entity.setGenerated(new Date());
// generate the response
return generateOkResponse(entity).build();
}
// --------------------
// search cluster nodes
// --------------------
/**
* Searches the cluster for a node with a given address.
*
* @param value Search value that will be matched against a node's address
* @return Nodes that match the specified criteria
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("cluster/search-results")
@ApiOperation(
value = "Searches the cluster for a node with the specified address",
notes = NON_GUARANTEED_ENDPOINT,
response = ClusterSearchResultsEntity.class,
authorizations = {
@Authorization(value = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response searchCluster(
@ApiParam(
value = "Node address to search for.",
required = true
)
@QueryParam("q") @DefaultValue(StringUtils.EMPTY) String value) {
authorizeFlow();
// ensure connected to the cluster
if (!isConnectedToCluster()) {
throw new IllegalClusterResourceRequestException("Only a node connected to a cluster can process the request.");
}
final List<NodeSearchResultDTO> nodeMatches = new ArrayList<>();
// get the nodes in the cluster
final ClusterDTO cluster = serviceFacade.getCluster();
// check each to see if it matches the search term
for (NodeDTO node : cluster.getNodes()) {
// ensure the node is connected
if (!NodeConnectionState.CONNECTED.name().equals(node.getStatus())) {
continue;
}
// determine the current nodes address
final String address = node.getAddress() + ":" + node.getApiPort();
// count the node if there is no search or it matches the address
if (StringUtils.isBlank(value) || StringUtils.containsIgnoreCase(address, value)) {
final NodeSearchResultDTO nodeMatch = new NodeSearchResultDTO();
nodeMatch.setId(node.getNodeId());
nodeMatch.setAddress(address);
nodeMatches.add(nodeMatch);
}
}
// build the response
ClusterSearchResultsEntity results = new ClusterSearchResultsEntity();
results.setNodeResults(nodeMatches);
// generate an 200 - OK response
return noCache(Response.ok(results)).build();
}
// setters
public void setServiceFacade(NiFiServiceFacade serviceFacade) {
this.serviceFacade = serviceFacade;
}
public void setProcessorResource(ProcessorResource processorResource) {
this.processorResource = processorResource;
}
public void setInputPortResource(InputPortResource inputPortResource) {
this.inputPortResource = inputPortResource;
}
public void setOutputPortResource(OutputPortResource outputPortResource) {
this.outputPortResource = outputPortResource;
}
public void setFunnelResource(FunnelResource funnelResource) {
this.funnelResource = funnelResource;
}
public void setLabelResource(LabelResource labelResource) {
this.labelResource = labelResource;
}
public void setRemoteProcessGroupResource(RemoteProcessGroupResource remoteProcessGroupResource) {
this.remoteProcessGroupResource = remoteProcessGroupResource;
}
public void setConnectionResource(ConnectionResource connectionResource) {
this.connectionResource = connectionResource;
}
public void setTemplateResource(TemplateResource templateResource) {
this.templateResource = templateResource;
}
public void setProcessGroupResource(ProcessGroupResource processGroupResource) {
this.processGroupResource = processGroupResource;
}
public void setControllerServiceResource(ControllerServiceResource controllerServiceResource) {
this.controllerServiceResource = controllerServiceResource;
}
public void setReportingTaskResource(ReportingTaskResource reportingTaskResource) {
this.reportingTaskResource = reportingTaskResource;
}
public void setAuthorizer(Authorizer authorizer) {
this.authorizer = authorizer;
}
}