blob: 9f6b13562fd095cae110afb5eb930cc50dc5b053 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.FormParam;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import org.apache.nifi.cluster.manager.impl.WebClusterManager;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.ConfigurationSnapshot;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.request.ClientIdParameter;
import org.apache.nifi.web.api.request.LongParameter;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.apache.nifi.web.api.entity.ReportingTaskEntity;
import org.apache.nifi.web.api.entity.ReportingTasksEntity;
import org.apache.nifi.web.util.Availability;
import org.codehaus.enunciate.jaxrs.TypeHint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.security.access.prepost.PreAuthorize;
/**
* RESTful endpoint for managing a Reporting Task.
*/
public class ReportingTaskResource extends ApplicationResource {
private static final Logger logger = LoggerFactory.getLogger(ReportingTaskResource.class);
private NiFiServiceFacade serviceFacade;
private WebClusterManager clusterManager;
private NiFiProperties properties;
/**
* Populates the uri for the specified reporting task.
*
* @param reportingTasks
* @return
*/
private Set<ReportingTaskDTO> populateRemainingReportingTasksContent(final String availability, final Set<ReportingTaskDTO> reportingTasks) {
for (ReportingTaskDTO reportingTask : reportingTasks) {
populateRemainingReportingTaskContent(availability, reportingTask);
}
return reportingTasks;
}
/**
* Populates the uri for the specified reporting task.
*/
private ReportingTaskDTO populateRemainingReportingTaskContent(final String availability, final ReportingTaskDTO reportingTask) {
// populate the reporting task href
reportingTask.setUri(generateResourceUri("controller", "reporting-tasks", availability, reportingTask.getId()));
reportingTask.setAvailability(availability);
return reportingTask;
}
/**
* Parses the availability and ensure that the specified availability makes sense for the
* given NiFi instance.
*
* @param availability
* @return
*/
private Availability parseAvailability(final String availability) {
final Availability avail;
try {
avail = Availability.valueOf(availability.toUpperCase());
} catch (IllegalArgumentException iae) {
throw new IllegalArgumentException(String.format("Availability: Value must be one of [%s]", StringUtils.join(Availability.values(), ", ")));
}
// ensure this nifi is an NCM is specifying NCM availability
if (!properties.isClusterManager() && Availability.NCM.equals(avail)) {
throw new IllegalArgumentException("Availability of NCM is only applicable when the NiFi instance is the cluster manager.");
}
return avail;
}
/**
* Retrieves all the of reporting tasks in this NiFi.
*
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param availability Whether the reporting task is available on the NCM only (ncm) or on the
* nodes only (node). If this instance is not clustered all tasks should use the node availability.
* @return A reportingTasksEntity.
*/
@GET
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Path("{availability}")
@PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
@TypeHint(ReportingTasksEntity.class)
public Response getReportingTasks(@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, @PathParam("availability") String availability) {
final Availability avail = parseAvailability(availability);
// replicate if cluster manager
if (properties.isClusterManager() && Availability.NODE.equals(avail)) {
return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
}
// get all the reporting tasks
final Set<ReportingTaskDTO> reportingTasks = populateRemainingReportingTasksContent(availability, serviceFacade.getReportingTasks());
// create the revision
final RevisionDTO revision = new RevisionDTO();
revision.setClientId(clientId.getClientId());
// create the response entity
final ReportingTasksEntity entity = new ReportingTasksEntity();
entity.setRevision(revision);
entity.setReportingTasks(reportingTasks);
// generate the response
return clusterContext(generateOkResponse(entity)).build();
}
/**
* Creates a new reporting task.
*
* @param httpServletRequest
* @param version The revision is used to verify the client is working with
* the latest version of the flow.
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param availability Whether the reporting task is available on the NCM only (ncm) or on the
* nodes only (node). If this instance is not clustered all tasks should use the node availability.
* @param type The type of reporting task to create.
* @return A reportingTaskEntity.
*/
@POST
@Consumes(MediaType.APPLICATION_FORM_URLENCODED)
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Path("{availability}")
@PreAuthorize("hasRole('ROLE_DFM')")
@TypeHint(ReportingTaskEntity.class)
public Response createReportingTask(
@Context HttpServletRequest httpServletRequest,
@FormParam(VERSION) LongParameter version,
@FormParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
@PathParam("availability") String availability,
@FormParam("type") String type) {
// create the reporting task DTO
final ReportingTaskDTO reportingTaskDTO = new ReportingTaskDTO();
reportingTaskDTO.setType(type);
// create the revision
final RevisionDTO revision = new RevisionDTO();
revision.setClientId(clientId.getClientId());
if (version != null) {
revision.setVersion(version.getLong());
}
// create the reporting task entity
final ReportingTaskEntity reportingTaskEntity = new ReportingTaskEntity();
reportingTaskEntity.setRevision(revision);
reportingTaskEntity.setReportingTask(reportingTaskDTO);
return createReportingTask(httpServletRequest, availability, reportingTaskEntity);
}
/**
* Creates a new Reporting Task.
*
* @param httpServletRequest
* @param availability Whether the reporting task is available on the NCM only (ncm) or on the
* nodes only (node). If this instance is not clustered all tasks should use the node availability.
* @param reportingTaskEntity A reportingTaskEntity.
* @return A reportingTaskEntity.
*/
@POST
@Consumes({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Path("{availability}")
@PreAuthorize("hasRole('ROLE_DFM')")
@TypeHint(ReportingTaskEntity.class)
public Response createReportingTask(
@Context HttpServletRequest httpServletRequest,
@PathParam("availability") String availability,
ReportingTaskEntity reportingTaskEntity) {
final Availability avail = parseAvailability(availability);
if (reportingTaskEntity == null || reportingTaskEntity.getReportingTask()== null) {
throw new IllegalArgumentException("Reporting task details must be specified.");
}
if (reportingTaskEntity.getRevision() == null) {
throw new IllegalArgumentException("Revision must be specified.");
}
// get the revision
final RevisionDTO revision = reportingTaskEntity.getRevision();
if (reportingTaskEntity.getReportingTask().getId() != null) {
throw new IllegalArgumentException("Reporting task ID cannot be specified.");
}
// if cluster manager, convert POST to PUT (to maintain same ID across nodes) and replicate
if (properties.isClusterManager() && Availability.NODE.equals(avail)) {
// create ID for resource
final String id = UUID.randomUUID().toString();
// set ID for resource
reportingTaskEntity.getReportingTask().setId(id);
// convert POST request to PUT request to force entity ID to be the same across nodes
URI putUri = null;
try {
putUri = new URI(getAbsolutePath().toString() + "/" + id);
} catch (final URISyntaxException e) {
throw new WebApplicationException(e);
}
// change content type to JSON for serializing entity
final Map<String, String> headersToOverride = new HashMap<>();
headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
// replicate put request
return (Response) clusterManager.applyRequest(HttpMethod.PUT, putUri, updateClientId(reportingTaskEntity), getHeaders(headersToOverride)).getResponse();
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
if (expects != null) {
return generateContinueResponse().build();
}
// create the reporting task and generate the json
final ConfigurationSnapshot<ReportingTaskDTO> controllerResponse = serviceFacade.createReportingTask(
new Revision(revision.getVersion(), revision.getClientId()), reportingTaskEntity.getReportingTask());
final ReportingTaskDTO reportingTask = controllerResponse.getConfiguration();
// get the updated revision
final RevisionDTO updatedRevision = new RevisionDTO();
updatedRevision.setClientId(revision.getClientId());
updatedRevision.setVersion(controllerResponse.getVersion());
// build the response entity
final ReportingTaskEntity entity = new ReportingTaskEntity();
entity.setRevision(updatedRevision);
entity.setReportingTask(populateRemainingReportingTaskContent(availability, reportingTask));
// build the response
return clusterContext(generateCreatedResponse(URI.create(reportingTask.getUri()), entity)).build();
}
/**
* Retrieves the specified reporting task.
*
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param availability Whether the reporting task is available on the NCM only (ncm) or on the
* nodes only (node). If this instance is not clustered all tasks should use the node availability.
* @param id The id of the reporting task to retrieve
* @return A reportingTaskEntity.
*/
@GET
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Path("{availability}/{id}")
@PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
@TypeHint(ReportingTaskEntity.class)
public Response getReportingTask(@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
@PathParam("availability") String availability, @PathParam("id") String id) {
final Availability avail = parseAvailability(availability);
// replicate if cluster manager
if (properties.isClusterManager() && Availability.NODE.equals(avail)) {
return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
}
// get the reporting task
final ReportingTaskDTO reportingTask = serviceFacade.getReportingTask(id);
// create the revision
final RevisionDTO revision = new RevisionDTO();
revision.setClientId(clientId.getClientId());
// create the response entity
final ReportingTaskEntity entity = new ReportingTaskEntity();
entity.setRevision(revision);
entity.setReportingTask(populateRemainingReportingTaskContent(availability, reportingTask));
return clusterContext(generateOkResponse(entity)).build();
}
/**
* Updates the specified reporting task.
*
* @param httpServletRequest
* @param version The revision is used to verify the client is working with
* the latest version of the flow.
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param availability Whether the reporting task is available on the NCM only (ncm) or on the
* nodes only (node). If this instance is not clustered all tasks should use the node availability.
* @param id The id of the reporting task to update.
* @param name The name of the reporting task
* @param annotationData The annotation data for the reporting task
* @param markedForDeletion Array of property names whose value should be removed.
* @param state The updated scheduled state
* @param schedulingStrategy The scheduling strategy for this reporting task
* @param schedulingPeriod The scheduling period for this reporting task
* @param formParams Additionally, the processor properties and styles are
* specified in the form parameters. Because the property names and styles
* differ from processor to processor they are specified in a map-like
* fashion:
* <br>
* <ul>
* <li>properties[required.file.path]=/path/to/file</li>
* <li>properties[required.hostname]=localhost</li>
* <li>properties[required.port]=80</li>
* <li>properties[optional.file.path]=/path/to/file</li>
* <li>properties[optional.hostname]=localhost</li>
* <li>properties[optional.port]=80</li>
* <li>properties[user.defined.pattern]=^.*?s.*$</li>
* </ul>
* @return A reportingTaskEntity.
*/
@PUT
@Consumes(MediaType.APPLICATION_FORM_URLENCODED)
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Path("{availability}/{id}")
@PreAuthorize("hasRole('ROLE_DFM')")
@TypeHint(ReportingTaskEntity.class)
public Response updateReportingTask(
@Context HttpServletRequest httpServletRequest,
@FormParam(VERSION) LongParameter version,
@FormParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
@PathParam("availability") String availability, @PathParam("id") String id, @FormParam("name") String name,
@FormParam("annotationData") String annotationData, @FormParam("markedForDeletion[]") List<String> markedForDeletion,
@FormParam("state") String state, @FormParam("schedulingStrategy") String schedulingStrategy,
@FormParam("schedulingPeriod") String schedulingPeriod, MultivaluedMap<String, String> formParams) {
// create collections for holding the reporting task properties
final Map<String, String> updatedProperties = new LinkedHashMap<>();
// go through each parameter and look for processor properties
for (String parameterName : formParams.keySet()) {
if (StringUtils.isNotBlank(parameterName)) {
// see if the parameter name starts with an expected parameter type...
// if so, store the parameter name and value in the corresponding collection
if (parameterName.startsWith("properties")) {
final int startIndex = StringUtils.indexOf(parameterName, "[");
final int endIndex = StringUtils.lastIndexOf(parameterName, "]");
if (startIndex != -1 && endIndex != -1) {
final String propertyName = StringUtils.substring(parameterName, startIndex + 1, endIndex);
updatedProperties.put(propertyName, formParams.getFirst(parameterName));
}
}
}
}
// set the properties to remove
for (String propertyToDelete : markedForDeletion) {
updatedProperties.put(propertyToDelete, null);
}
// create the reporting task DTO
final ReportingTaskDTO reportingTaskDTO = new ReportingTaskDTO();
reportingTaskDTO.setId(id);
reportingTaskDTO.setName(name);
reportingTaskDTO.setState(state);
reportingTaskDTO.setSchedulingStrategy(schedulingStrategy);
reportingTaskDTO.setSchedulingPeriod(schedulingPeriod);
reportingTaskDTO.setAnnotationData(annotationData);
// only set the properties when appropriate
if (!updatedProperties.isEmpty()) {
reportingTaskDTO.setProperties(updatedProperties);
}
// create the revision
final RevisionDTO revision = new RevisionDTO();
revision.setClientId(clientId.getClientId());
if (version != null) {
revision.setVersion(version.getLong());
}
// create the reporting task entity
final ReportingTaskEntity reportingTaskEntity = new ReportingTaskEntity();
reportingTaskEntity.setRevision(revision);
reportingTaskEntity.setReportingTask(reportingTaskDTO);
// update the reporting task
return updateReportingTask(httpServletRequest, availability, id, reportingTaskEntity);
}
/**
* Updates the specified a Reporting Task.
*
* @param httpServletRequest
* @param availability Whether the reporting task is available on the NCM only (ncm) or on the
* nodes only (node). If this instance is not clustered all tasks should use the node availability.
* @param id The id of the reporting task to update.
* @param reportingTaskEntity A reportingTaskEntity.
* @return A reportingTaskEntity.
*/
@PUT
@Consumes({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Path("{availability}/{id}")
@PreAuthorize("hasRole('ROLE_DFM')")
@TypeHint(ReportingTaskEntity.class)
public Response updateReportingTask(
@Context HttpServletRequest httpServletRequest,
@PathParam("availability") String availability,
@PathParam("id") String id,
ReportingTaskEntity reportingTaskEntity) {
final Availability avail = parseAvailability(availability);
if (reportingTaskEntity == null || reportingTaskEntity.getReportingTask() == null) {
throw new IllegalArgumentException("Reporting task details must be specified.");
}
if (reportingTaskEntity.getRevision() == null) {
throw new IllegalArgumentException("Revision must be specified.");
}
// ensure the ids are the same
final ReportingTaskDTO requestReportingTaskDTO = reportingTaskEntity.getReportingTask();
if (!id.equals(requestReportingTaskDTO.getId())) {
throw new IllegalArgumentException(String.format("The reporting task id (%s) in the request body does not equal the "
+ "reporting task id of the requested resource (%s).", requestReportingTaskDTO.getId(), id));
}
// replicate if cluster manager
if (properties.isClusterManager() && Availability.NODE.equals(avail)) {
// change content type to JSON for serializing entity
final Map<String, String> headersToOverride = new HashMap<>();
headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
// replicate the request
return clusterManager.applyRequest(HttpMethod.PUT, getAbsolutePath(), updateClientId(reportingTaskEntity), getHeaders(headersToOverride)).getResponse();
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
if (expects != null) {
serviceFacade.verifyUpdateReportingTask(requestReportingTaskDTO);
return generateContinueResponse().build();
}
// update the reporting task
final RevisionDTO revision = reportingTaskEntity.getRevision();
final ConfigurationSnapshot<ReportingTaskDTO> controllerResponse = serviceFacade.updateReportingTask(
new Revision(revision.getVersion(), revision.getClientId()), requestReportingTaskDTO);
// get the results
final ReportingTaskDTO responseReportingTaskDTO = controllerResponse.getConfiguration();
// get the updated revision
final RevisionDTO updatedRevision = new RevisionDTO();
updatedRevision.setClientId(revision.getClientId());
updatedRevision.setVersion(controllerResponse.getVersion());
// build the response entity
final ReportingTaskEntity entity = new ReportingTaskEntity();
entity.setRevision(updatedRevision);
entity.setReportingTask(populateRemainingReportingTaskContent(availability, responseReportingTaskDTO));
return clusterContext(generateOkResponse(entity)).build();
}
/**
* Removes the specified reporting task.
*
* @param httpServletRequest
* @param version The revision is used to verify the client is working with
* the latest version of the flow.
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param availability Whether the reporting task is available on the NCM only (ncm) or on the
* nodes only (node). If this instance is not clustered all tasks should use the node availability.
* @param id The id of the reporting task to remove.
* @return A entity containing the client id and an updated revision.
*/
@DELETE
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Path("{availability}/{id}")
@PreAuthorize("hasRole('ROLE_DFM')")
@TypeHint(ReportingTaskEntity.class)
public Response removeReportingTask(
@Context HttpServletRequest httpServletRequest,
@QueryParam(VERSION) LongParameter version,
@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
@PathParam("availability") String availability, @PathParam("id") String id) {
final Availability avail = parseAvailability(availability);
// replicate if cluster manager
if (properties.isClusterManager() && Availability.NODE.equals(avail)) {
return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
}
// handle expects request (usually from the cluster manager)
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
if (expects != null) {
serviceFacade.verifyDeleteReportingTask(id);
return generateContinueResponse().build();
}
// determine the specified version
Long clientVersion = null;
if (version != null) {
clientVersion = version.getLong();
}
// delete the specified reporting task
final ConfigurationSnapshot<Void> controllerResponse = serviceFacade.deleteReportingTask(new Revision(clientVersion, clientId.getClientId()), id);
// get the updated revision
final RevisionDTO revision = new RevisionDTO();
revision.setClientId(clientId.getClientId());
revision.setVersion(controllerResponse.getVersion());
// build the response entity
final ReportingTaskEntity entity = new ReportingTaskEntity();
entity.setRevision(revision);
return clusterContext(generateOkResponse(entity)).build();
}
// setters
public void setServiceFacade(NiFiServiceFacade serviceFacade) {
this.serviceFacade = serviceFacade;
}
public void setClusterManager(WebClusterManager clusterManager) {
this.clusterManager = clusterManager;
}
public void setProperties(NiFiProperties properties) {
this.properties = properties;
}
}