blob: f803471712c15104165d686d669d2c373ac5bf9b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api;
import java.util.ArrayList;
import java.util.List;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.FormParam;
import javax.ws.rs.GET;
import javax.ws.rs.HEAD;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.nifi.cluster.manager.impl.WebClusterManager;
import org.apache.nifi.cluster.node.Node;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.ConfigurationSnapshot;
import org.apache.nifi.web.IllegalClusterResourceRequestException;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.ClusterDTO;
import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.search.NodeSearchResultDTO;
import org.apache.nifi.web.api.dto.status.ClusterConnectionStatusDTO;
import org.apache.nifi.web.api.dto.status.ClusterPortStatusDTO;
import org.apache.nifi.web.api.dto.status.ClusterProcessorStatusDTO;
import org.apache.nifi.web.api.dto.status.ClusterRemoteProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.status.ClusterStatusDTO;
import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO;
import org.apache.nifi.web.api.entity.ClusterConnectionStatusEntity;
import org.apache.nifi.web.api.entity.ClusterEntity;
import org.apache.nifi.web.api.entity.ClusterPortStatusEntity;
import org.apache.nifi.web.api.entity.ClusterProcessorStatusEntity;
import org.apache.nifi.web.api.entity.ClusterRemoteProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.ClusterSearchResultsEntity;
import org.apache.nifi.web.api.entity.ClusterStatusEntity;
import org.apache.nifi.web.api.entity.ClusterStatusHistoryEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
import org.apache.nifi.web.api.request.LongParameter;
import org.apache.commons.lang3.StringUtils;
import org.springframework.security.access.prepost.PreAuthorize;
import com.sun.jersey.api.core.ResourceContext;
import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO;
import org.apache.nifi.web.api.entity.ClusterProcessGroupStatusEntity;
import org.codehaus.enunciate.jaxrs.TypeHint;
/**
* RESTful endpoint for managing a cluster.
*/
@Path("/cluster")
public class ClusterResource extends ApplicationResource {
@Context
private ResourceContext resourceContext;
private NiFiServiceFacade serviceFacade;
private NiFiProperties properties;
/**
* Locates the ClusterConnection sub-resource.
*
* @return
*/
@Path("/nodes")
public NodeResource getNodeResource() {
return resourceContext.getResource(NodeResource.class);
}
/**
* Gets the status of this NiFi cluster.
*
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @return A clusterStatusEntity
*/
@GET
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Path("/status")
@PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
@TypeHint(ClusterStatusEntity.class)
public Response getClusterStatus(@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId) {
if (properties.isClusterManager()) {
ClusterStatusDTO dto = serviceFacade.getClusterStatus();
// create the revision
RevisionDTO revision = new RevisionDTO();
revision.setClientId(clientId.getClientId());
// create entity
final ClusterStatusEntity entity = new ClusterStatusEntity();
entity.setClusterStatus(dto);
entity.setRevision(revision);
// generate the response
return generateOkResponse(entity).build();
}
throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
}
/**
* Returns a 200 OK response to indicate this is a valid cluster endpoint.
*
* @return An OK response with an empty entity body.
*/
@HEAD
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
public Response getClusterHead() {
if (properties.isClusterManager()) {
return Response.ok().build();
} else {
return Response.status(Response.Status.NOT_FOUND).entity("Only a cluster manager can process the request.").build();
}
}
/**
* Gets the contents of this NiFi cluster. This includes all nodes and their
* status.
*
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @return A clusterEntity
*/
@GET
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
@TypeHint(ClusterEntity.class)
public Response getCluster(@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId) {
if (properties.isClusterManager()) {
final ClusterDTO dto = serviceFacade.getCluster();
// create the revision
RevisionDTO revision = new RevisionDTO();
revision.setClientId(clientId.getClientId());
// create entity
final ClusterEntity entity = new ClusterEntity();
entity.setCluster(dto);
entity.setRevision(revision);
// generate the response
return generateOkResponse(entity).build();
}
throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
}
/**
* Searches the cluster for a node with a given address.
*
* @param value Search value that will be matched against a node's address
* @return Nodes that match the specified criteria
*/
@GET
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Path("/search-results")
@PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
@TypeHint(ClusterSearchResultsEntity.class)
public Response searchCluster(@QueryParam("q") @DefaultValue(StringUtils.EMPTY) String value) {
// ensure this is the cluster manager
if (properties.isClusterManager()) {
final List<NodeSearchResultDTO> nodeMatches = new ArrayList<>();
// get the nodes in the cluster
final ClusterDTO cluster = serviceFacade.getCluster();
// check each to see if it matches the search term
for (NodeDTO node : cluster.getNodes()) {
// ensure the node is connected
if (!Node.Status.CONNECTED.toString().equals(node.getStatus())) {
continue;
}
// determine the current nodes address
final String address = node.getAddress() + ":" + node.getApiPort();
// count the node if there is no search or it matches the address
if (StringUtils.isBlank(value) || StringUtils.containsIgnoreCase(address, value)) {
final NodeSearchResultDTO nodeMatch = new NodeSearchResultDTO();
nodeMatch.setId(node.getNodeId());
nodeMatch.setAddress(address);
nodeMatches.add(nodeMatch);
}
}
// build the response
ClusterSearchResultsEntity results = new ClusterSearchResultsEntity();
results.setNodeResults(nodeMatches);
// generate an 200 - OK response
return noCache(Response.ok(results)).build();
}
throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
}
/**
* Gets the processor.
*
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param id The id of the processor
* @return A processorEntity
*/
@GET
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Path("/processors/{id}")
@PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
@TypeHint(ProcessorEntity.class)
public Response getProcessor(@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, @PathParam("id") String id) {
if (!properties.isClusterManager()) {
final ProcessorDTO dto = serviceFacade.getProcessor(id);
// create the revision
RevisionDTO revision = new RevisionDTO();
revision.setClientId(clientId.getClientId());
// create entity
final ProcessorEntity entity = new ProcessorEntity();
entity.setProcessor(dto);
entity.setRevision(revision);
// generate the response
return generateOkResponse(entity).build();
}
throw new IllegalClusterResourceRequestException("Only a node can process the request.");
}
/**
* Updates the processors annotation data.
*
* @param httpServletRequest
* @param version The revision is used to verify the client is working with
* the latest version of the flow.
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param processorId The id of the processor.
* @param annotationData The annotation data to set.
* @return A processorEntity.
*/
@PUT
@Consumes(MediaType.APPLICATION_FORM_URLENCODED)
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Path("/processors/{id}")
@PreAuthorize("hasAnyRole('ROLE_DFM')")
@TypeHint(ProcessorEntity.class)
public Response updateProcessor(
@Context HttpServletRequest httpServletRequest,
@FormParam(VERSION) LongParameter version,
@FormParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
@PathParam("id") String processorId,
@FormParam("annotationData") String annotationData) {
if (!properties.isClusterManager()) {
// create the processor configuration with the given annotation data
ProcessorConfigDTO configDto = new ProcessorConfigDTO();
configDto.setAnnotationData(annotationData);
// create the processor dto
ProcessorDTO processorDto = new ProcessorDTO();
processorDto.setId(processorId);
processorDto.setConfig(configDto);
// create the revision
RevisionDTO revision = new RevisionDTO();
revision.setClientId(clientId.getClientId());
if (version != null) {
revision.setVersion(version.getLong());
}
// create the processor entity
ProcessorEntity processorEntity = new ProcessorEntity();
processorEntity.setRevision(revision);
processorEntity.setProcessor(processorDto);
return updateProcessor(httpServletRequest, processorId, processorEntity);
}
throw new IllegalClusterResourceRequestException("Only a node can process the request.");
}
/**
* Updates the processors annotation data.
*
* @param httpServletRequest
* @param processorId The id of the processor.
* @param processorEntity A processorEntity.
* @return A processorEntity.
*/
@PUT
@Consumes({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Path("/processors/{id}")
@PreAuthorize("hasAnyRole('ROLE_DFM')")
@TypeHint(ProcessorEntity.class)
public Response updateProcessor(
@Context HttpServletRequest httpServletRequest,
@PathParam("id") final String processorId,
final ProcessorEntity processorEntity) {
if (!properties.isClusterManager()) {
if (processorEntity == null || processorEntity.getProcessor() == null) {
throw new IllegalArgumentException("Processor details must be specified.");
}
if (processorEntity.getRevision() == null) {
throw new IllegalArgumentException("Revision must be specified.");
}
// ensure the same id is being used
ProcessorDTO requestProcessorDTO = processorEntity.getProcessor();
if (!processorId.equals(requestProcessorDTO.getId())) {
throw new IllegalArgumentException(String.format("The processor id (%s) in the request body does "
+ "not equal the processor id of the requested resource (%s).", requestProcessorDTO.getId(), processorId));
}
// get the processor configuration
ProcessorConfigDTO config = requestProcessorDTO.getConfig();
if (config == null) {
throw new IllegalArgumentException("Processor configuration must be specified.");
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
if (expects != null) {
serviceFacade.verifyUpdateProcessor(requestProcessorDTO);
return generateContinueResponse().build();
}
// update the processor
final RevisionDTO revision = processorEntity.getRevision();
final ConfigurationSnapshot<ProcessorDTO> controllerResponse = serviceFacade.setProcessorAnnotationData(
new Revision(revision.getVersion(), revision.getClientId()), processorId, config.getAnnotationData());
// get the processor dto
final ProcessorDTO responseProcessorDTO = controllerResponse.getConfiguration();
// update the revision
RevisionDTO updatedRevision = new RevisionDTO();
updatedRevision.setClientId(revision.getClientId());
updatedRevision.setVersion(controllerResponse.getVersion());
// generate the response entity
final ProcessorEntity entity = new ProcessorEntity();
entity.setRevision(updatedRevision);
entity.setProcessor(responseProcessorDTO);
return generateOkResponse(entity).build();
}
throw new IllegalClusterResourceRequestException("Only a node can process the request.");
}
/**
* Gets the processor status for every node.
*
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param id The id of the processor
* @return A clusterProcessorStatusEntity
*/
@GET
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Path("/processors/{id}/status")
@PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
@TypeHint(ClusterProcessorStatusEntity.class)
public Response getProcessorStatus(@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, @PathParam("id") String id) {
if (properties.isClusterManager()) {
final ClusterProcessorStatusDTO dto = serviceFacade.getClusterProcessorStatus(id);
// create the revision
RevisionDTO revision = new RevisionDTO();
revision.setClientId(clientId.getClientId());
// create entity
final ClusterProcessorStatusEntity entity = new ClusterProcessorStatusEntity();
entity.setClusterProcessorStatus(dto);
entity.setRevision(revision);
// generate the response
return generateOkResponse(entity).build();
}
throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
}
/**
* Gets the processor status history for every node.
*
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param id The id of the processor
* @return A clusterProcessorStatusHistoryEntity
*/
@GET
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Path("/processors/{id}/status/history")
@PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
@TypeHint(ClusterStatusHistoryEntity.class)
public Response getProcessorStatusHistory(@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, @PathParam("id") String id) {
if (properties.isClusterManager()) {
final ClusterStatusHistoryDTO dto = serviceFacade.getClusterProcessorStatusHistory(id);
// create the revision
RevisionDTO revision = new RevisionDTO();
revision.setClientId(clientId.getClientId());
// create entity
final ClusterStatusHistoryEntity entity = new ClusterStatusHistoryEntity();
entity.setClusterStatusHistory(dto);
entity.setRevision(revision);
// generate the response
return generateOkResponse(entity).build();
}
throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
}
/**
* Gets the connection status for every node.
*
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param id The id of the processor
* @return A clusterProcessorStatusEntity
*/
@GET
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Path("/connections/{id}/status")
@PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
@TypeHint(ClusterConnectionStatusEntity.class)
public Response getConnectionStatus(@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, @PathParam("id") String id) {
if (properties.isClusterManager()) {
final ClusterConnectionStatusDTO dto = serviceFacade.getClusterConnectionStatus(id);
// create the revision
RevisionDTO revision = new RevisionDTO();
revision.setClientId(clientId.getClientId());
// create entity
final ClusterConnectionStatusEntity entity = new ClusterConnectionStatusEntity();
entity.setClusterConnectionStatus(dto);
entity.setRevision(revision);
// generate the response
return generateOkResponse(entity).build();
}
throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
}
/**
* Gets the connections status history for every node.
*
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param id The id of the processor
* @return A clusterProcessorStatusHistoryEntity
*/
@GET
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Path("/connections/{id}/status/history")
@PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
@TypeHint(ClusterStatusHistoryEntity.class)
public Response getConnectionStatusHistory(@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, @PathParam("id") String id) {
if (properties.isClusterManager()) {
final ClusterStatusHistoryDTO dto = serviceFacade.getClusterConnectionStatusHistory(id);
// create the revision
RevisionDTO revision = new RevisionDTO();
revision.setClientId(clientId.getClientId());
// create entity
final ClusterStatusHistoryEntity entity = new ClusterStatusHistoryEntity();
entity.setClusterStatusHistory(dto);
entity.setRevision(revision);
// generate the response
return generateOkResponse(entity).build();
}
throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
}
/**
* Gets the process group status for every node.
*
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param id The id of the process group
* @return A clusterProcessGroupStatusEntity
*/
@GET
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Path("/process-groups/{id}/status")
@PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
@TypeHint(ClusterConnectionStatusEntity.class)
public Response getProcessGroupStatus(@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, @PathParam("id") String id) {
if (properties.isClusterManager()) {
final ClusterProcessGroupStatusDTO dto = serviceFacade.getClusterProcessGroupStatus(id);
// create the revision
RevisionDTO revision = new RevisionDTO();
revision.setClientId(clientId.getClientId());
// create entity
final ClusterProcessGroupStatusEntity entity = new ClusterProcessGroupStatusEntity();
entity.setClusterProcessGroupStatus(dto);
entity.setRevision(revision);
// generate the response
return generateOkResponse(entity).build();
}
throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
}
/**
* Gets the process group status history for every node.
*
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param id The id of the process group
* @return A clusterProcessGroupStatusHistoryEntity
*/
@GET
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Path("/process-groups/{id}/status/history")
@PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
@TypeHint(ClusterStatusHistoryEntity.class)
public Response getProcessGroupStatusHistory(@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, @PathParam("id") String id) {
if (properties.isClusterManager()) {
final ClusterStatusHistoryDTO dto = serviceFacade.getClusterProcessGroupStatusHistory(id);
// create the revision
RevisionDTO revision = new RevisionDTO();
revision.setClientId(clientId.getClientId());
// create entity
final ClusterStatusHistoryEntity entity = new ClusterStatusHistoryEntity();
entity.setClusterStatusHistory(dto);
entity.setRevision(revision);
// generate the response
return generateOkResponse(entity).build();
}
throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
}
/**
* Gets the remote process group status for every node.
*
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param id The id of the remote process group
* @return A clusterRemoteProcessGroupStatusEntity
*/
@GET
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Path("/remote-process-groups/{id}/status")
@PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
@TypeHint(ClusterRemoteProcessGroupStatusEntity.class)
public Response getRemoteProcessGroupStatus(@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, @PathParam("id") String id) {
if (properties.isClusterManager()) {
final ClusterRemoteProcessGroupStatusDTO dto = serviceFacade.getClusterRemoteProcessGroupStatus(id);
// create the revision
RevisionDTO revision = new RevisionDTO();
revision.setClientId(clientId.getClientId());
// create entity
final ClusterRemoteProcessGroupStatusEntity entity = new ClusterRemoteProcessGroupStatusEntity();
entity.setClusterRemoteProcessGroupStatus(dto);
entity.setRevision(revision);
// generate the response
return generateOkResponse(entity).build();
}
throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
}
/**
* Gets the input port status for every node.
*
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param id The id of the input port
* @return A clusterPortStatusEntity
*/
@GET
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Path("/input-ports/{id}/status")
@PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
@TypeHint(ClusterPortStatusEntity.class)
public Response getInputPortStatus(@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, @PathParam("id") String id) {
if (properties.isClusterManager()) {
final ClusterPortStatusDTO dto = serviceFacade.getClusterInputPortStatus(id);
// create the revision
RevisionDTO revision = new RevisionDTO();
revision.setClientId(clientId.getClientId());
// create entity
final ClusterPortStatusEntity entity = new ClusterPortStatusEntity();
entity.setClusterPortStatus(dto);
entity.setRevision(revision);
// generate the response
return generateOkResponse(entity).build();
}
throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
}
/**
* Gets the output port status for every node.
*
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param id The id of the output port
* @return A clusterPortStatusEntity
*/
@GET
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Path("/output-ports/{id}/status")
@PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
@TypeHint(ClusterPortStatusEntity.class)
public Response getOutputPortStatus(@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, @PathParam("id") String id) {
if (properties.isClusterManager()) {
final ClusterPortStatusDTO dto = serviceFacade.getClusterOutputPortStatus(id);
// create the revision
RevisionDTO revision = new RevisionDTO();
revision.setClientId(clientId.getClientId());
// create entity
final ClusterPortStatusEntity entity = new ClusterPortStatusEntity();
entity.setClusterPortStatus(dto);
entity.setRevision(revision);
// generate the response
return generateOkResponse(entity).build();
}
throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
}
/**
* Gets the remote process group status history for every node.
*
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param id The id of the processor
* @return A clusterRemoteProcessGroupStatusHistoryEntity
*/
@GET
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Path("/remote-process-groups/{id}/status/history")
@PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
@TypeHint(ClusterStatusHistoryEntity.class)
public Response getRemoteProcessGroupStatusHistory(@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, @PathParam("id") String id) {
if (properties.isClusterManager()) {
final ClusterStatusHistoryDTO dto = serviceFacade.getClusterRemoteProcessGroupStatusHistory(id);
// create the revision
RevisionDTO revision = new RevisionDTO();
revision.setClientId(clientId.getClientId());
// create entity
final ClusterStatusHistoryEntity entity = new ClusterStatusHistoryEntity();
entity.setClusterStatusHistory(dto);
entity.setRevision(revision);
// generate the response
return generateOkResponse(entity).build();
}
throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
}
// setters
public void setServiceFacade(NiFiServiceFacade serviceFacade) {
this.serviceFacade = serviceFacade;
}
public void setProperties(NiFiProperties properties) {
this.properties = properties;
}
}