blob: feead4ee700583e7ff499f12acd22503fc9ab7d6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AuthorizeControllerServiceReference;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.ComponentAuthorizable;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.web.IllegalClusterResourceRequestException;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.BulletinDTO;
import org.apache.nifi.web.api.dto.ClusterDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.dto.RegistryDTO;
import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.apache.nifi.web.api.entity.BulletinEntity;
import org.apache.nifi.web.api.entity.ClusterEntity;
import org.apache.nifi.web.api.entity.ComponentHistoryEntity;
import org.apache.nifi.web.api.entity.ControllerConfigurationEntity;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.Entity;
import org.apache.nifi.web.api.entity.HistoryEntity;
import org.apache.nifi.web.api.entity.NodeEntity;
import org.apache.nifi.web.api.entity.RegistryClientEntity;
import org.apache.nifi.web.api.entity.RegistryClientsEntity;
import org.apache.nifi.web.api.entity.ReportingTaskEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
import org.apache.nifi.web.api.request.DateTimeParameter;
import org.apache.nifi.web.api.request.LongParameter;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.net.URI;
import java.util.Date;
import java.util.Set;
/**
* RESTful endpoint for managing a Flow Controller.
*/
@Path("/controller")
@Api(
value = "/controller",
description = "Provides realtime command and control of this NiFi instance"
)
public class ControllerResource extends ApplicationResource {
private NiFiServiceFacade serviceFacade;
private Authorizer authorizer;
private ReportingTaskResource reportingTaskResource;
private ControllerServiceResource controllerServiceResource;
/**
* Populate the uri's for the specified registry.
*
* @param registryClientEntity registry
* @return dtos
*/
public RegistryClientEntity populateRemainingRegistryEntityContent(final RegistryClientEntity registryClientEntity) {
registryClientEntity.setUri(generateResourceUri("controller", "registry-clients", registryClientEntity.getId()));
return registryClientEntity;
}
/**
* Authorizes access to the flow.
*/
private void authorizeController(final RequestAction action) {
serviceFacade.authorizeAccess(lookup -> {
final Authorizable controller = lookup.getController();
controller.authorize(authorizer, action, NiFiUserUtils.getNiFiUser());
});
}
/**
* Retrieves the configuration for this NiFi.
*
* @return A controllerConfigurationEntity.
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("config")
@ApiOperation(
value = "Retrieves the configuration for this NiFi Controller",
response = ControllerConfigurationEntity.class,
authorizations = {
@Authorization(value = "Read - /controller")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getControllerConfig() {
authorizeController(RequestAction.READ);
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
final ControllerConfigurationEntity entity = serviceFacade.getControllerConfiguration();
return generateOkResponse(entity).build();
}
/**
* Update the configuration for this NiFi.
*
* @param httpServletRequest request
* @param requestConfigEntity A controllerConfigurationEntity.
* @return A controllerConfigurationEntity.
*/
@PUT
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@Path("config")
@ApiOperation(
value = "Retrieves the configuration for this NiFi",
response = ControllerConfigurationEntity.class,
authorizations = {
@Authorization(value = "Write - /controller")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response updateControllerConfig(
@Context final HttpServletRequest httpServletRequest,
@ApiParam(
value = "The controller configuration.",
required = true
) final ControllerConfigurationEntity requestConfigEntity) {
if (requestConfigEntity == null || requestConfigEntity.getComponent() == null) {
throw new IllegalArgumentException("Controller configuration must be specified");
}
if (requestConfigEntity.getRevision() == null) {
throw new IllegalArgumentException("Revision must be specified.");
}
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, requestConfigEntity);
} else if (isDisconnectedFromCluster()) {
verifyDisconnectedNodeModification(requestConfigEntity.isDisconnectedNodeAcknowledged());
}
final Revision requestRevision = getRevision(requestConfigEntity.getRevision(), FlowController.class.getSimpleName());
return withWriteLock(
serviceFacade,
requestConfigEntity,
requestRevision,
lookup -> {
authorizeController(RequestAction.WRITE);
},
null,
(revision, configEntity) -> {
final ControllerConfigurationEntity entity = serviceFacade.updateControllerConfiguration(revision, configEntity.getComponent());
return generateOkResponse(entity).build();
}
);
}
// ---------------
// reporting tasks
// ---------------
/**
* Creates a new Reporting Task.
*
* @param httpServletRequest request
* @param requestReportingTaskEntity A reportingTaskEntity.
* @return A reportingTaskEntity.
*/
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@Path("reporting-tasks")
@ApiOperation(
value = "Creates a new reporting task",
response = ReportingTaskEntity.class,
authorizations = {
@Authorization(value = "Write - /controller"),
@Authorization(value = "Read - any referenced Controller Services - /controller-services/{uuid}"),
@Authorization(value = "Write - if the Reporting Task is restricted - /restricted-components")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response createReportingTask(
@Context final HttpServletRequest httpServletRequest,
@ApiParam(
value = "The reporting task configuration details.",
required = true
) final ReportingTaskEntity requestReportingTaskEntity) {
if (requestReportingTaskEntity == null || requestReportingTaskEntity.getComponent() == null) {
throw new IllegalArgumentException("Reporting task details must be specified.");
}
if (requestReportingTaskEntity.getRevision() == null || (requestReportingTaskEntity.getRevision().getVersion() == null || requestReportingTaskEntity.getRevision().getVersion() != 0)) {
throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Reporting task.");
}
final ReportingTaskDTO requestReportingTask = requestReportingTaskEntity.getComponent();
if (requestReportingTask.getId() != null) {
throw new IllegalArgumentException("Reporting task ID cannot be specified.");
}
if (StringUtils.isBlank(requestReportingTask.getType())) {
throw new IllegalArgumentException("The type of reporting task to create must be specified.");
}
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, requestReportingTaskEntity);
} else if (isDisconnectedFromCluster()) {
verifyDisconnectedNodeModification(requestReportingTaskEntity.isDisconnectedNodeAcknowledged());
}
return withWriteLock(
serviceFacade,
requestReportingTaskEntity,
lookup -> {
authorizeController(RequestAction.WRITE);
ComponentAuthorizable authorizable = null;
try {
authorizable = lookup.getConfigurableComponent(requestReportingTask.getType(), requestReportingTask.getBundle());
if (authorizable.isRestricted()) {
authorizeRestrictions(authorizer, authorizable);
}
if (requestReportingTask.getProperties() != null) {
AuthorizeControllerServiceReference.authorizeControllerServiceReferences(requestReportingTask.getProperties(), authorizable, authorizer, lookup);
}
} finally {
if (authorizable != null) {
authorizable.cleanUpResources();
}
}
},
() -> serviceFacade.verifyCreateReportingTask(requestReportingTask),
(reportingTaskEntity) -> {
final ReportingTaskDTO reportingTask = reportingTaskEntity.getComponent();
// set the processor id as appropriate
reportingTask.setId(generateUuid());
// create the reporting task and generate the json
final Revision revision = getRevision(reportingTaskEntity, reportingTask.getId());
final ReportingTaskEntity entity = serviceFacade.createReportingTask(revision, reportingTask);
reportingTaskResource.populateRemainingReportingTaskEntityContent(entity);
// build the response
return generateCreatedResponse(URI.create(entity.getUri()), entity).build();
}
);
}
// ----------
// registries
// ----------
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("registry-clients")
@ApiOperation(value = "Gets the listing of available registry clients", response = RegistryClientsEntity.class, authorizations = {
@Authorization(value = "Read - /flow")
})
@ApiResponses(value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
})
public Response getRegistryClients() {
authorizeController(RequestAction.READ);
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
final Set<RegistryClientEntity> registries = serviceFacade.getRegistryClients();
registries.forEach(registry -> populateRemainingRegistryEntityContent(registry));
final RegistryClientsEntity registryEntities = new RegistryClientsEntity();
registryEntities.setRegistries(registries);
return generateOkResponse(registryEntities).build();
}
/**
* Creates a new Registry.
*
* @param httpServletRequest request
* @param requestRegistryClientEntity A registryClientEntity.
* @return A registryClientEntity.
*/
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@Path("registry-clients")
@ApiOperation(
value = "Creates a new registry client",
response = RegistryClientEntity.class,
authorizations = {
@Authorization(value = "Write - /controller")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response createRegistryClient(
@Context final HttpServletRequest httpServletRequest,
@ApiParam(
value = "The registry configuration details.",
required = true
) final RegistryClientEntity requestRegistryClientEntity) {
if (requestRegistryClientEntity == null || requestRegistryClientEntity.getComponent() == null) {
throw new IllegalArgumentException("Registry details must be specified.");
}
if (requestRegistryClientEntity.getRevision() == null || (requestRegistryClientEntity.getRevision().getVersion() == null || requestRegistryClientEntity.getRevision().getVersion() != 0)) {
throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Registry.");
}
final RegistryDTO requestRegistryClient = requestRegistryClientEntity.getComponent();
if (requestRegistryClient.getId() != null) {
throw new IllegalArgumentException("Registry ID cannot be specified.");
}
if (StringUtils.isBlank(requestRegistryClient.getName())) {
throw new IllegalArgumentException("Registry name must be specified.");
}
if (StringUtils.isBlank(requestRegistryClient.getUri())) {
throw new IllegalArgumentException("Registry URL must be specified.");
}
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, requestRegistryClientEntity);
} else if (isDisconnectedFromCluster()) {
verifyDisconnectedNodeModification(requestRegistryClientEntity.isDisconnectedNodeAcknowledged());
}
return withWriteLock(
serviceFacade,
requestRegistryClientEntity,
lookup -> {
authorizeController(RequestAction.WRITE);
},
null,
(registryEntity) -> {
final RegistryDTO registry = registryEntity.getComponent();
// set the processor id as appropriate
registry.setId(generateUuid());
// create the reporting task and generate the json
final Revision revision = getRevision(registryEntity, registry.getId());
final RegistryClientEntity entity = serviceFacade.createRegistryClient(revision, registry);
populateRemainingRegistryEntityContent(entity);
// build the response
return generateCreatedResponse(URI.create(entity.getUri()), entity).build();
}
);
}
/**
* Retrieves the specified registry.
*
* @param id The id of the registry to retrieve
* @return A registryClientEntity.
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("/registry-clients/{id}")
@ApiOperation(
value = "Gets a registry client",
response = RegistryClientEntity.class,
authorizations = {
@Authorization(value = "Read - /controller")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getRegistryClient(
@ApiParam(
value = "The registry id.",
required = true
)
@PathParam("id") final String id) {
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
// authorize access
authorizeController(RequestAction.READ);
// get the registry
final RegistryClientEntity entity = serviceFacade.getRegistryClient(id);
populateRemainingRegistryEntityContent(entity);
return generateOkResponse(entity).build();
}
/**
* Updates the specified registry.
*
* @param httpServletRequest request
* @param id The id of the controller service to update.
* @param requestRegistryEntity A controllerServiceEntity.
* @return A controllerServiceEntity.
*/
@PUT
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@Path("/registry-clients/{id}")
@ApiOperation(
value = "Updates a registry client",
response = RegistryClientEntity.class,
authorizations = {
@Authorization(value = "Write - /controller")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response updateRegistryClient(
@Context HttpServletRequest httpServletRequest,
@ApiParam(
value = "The registry id.",
required = true
)
@PathParam("id") final String id,
@ApiParam(
value = "The registry configuration details.",
required = true
) final RegistryClientEntity requestRegistryEntity) {
if (requestRegistryEntity == null || requestRegistryEntity.getComponent() == null) {
throw new IllegalArgumentException("Registry details must be specified.");
}
if (requestRegistryEntity.getRevision() == null) {
throw new IllegalArgumentException("Revision must be specified.");
}
// ensure the ids are the same
final RegistryDTO requestRegistryClient = requestRegistryEntity.getComponent();
if (!id.equals(requestRegistryClient.getId())) {
throw new IllegalArgumentException(String.format("The registry id (%s) in the request body does not equal the "
+ "registry id of the requested resource (%s).", requestRegistryClient.getId(), id));
}
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, requestRegistryEntity);
} else if (isDisconnectedFromCluster()) {
verifyDisconnectedNodeModification(requestRegistryEntity.isDisconnectedNodeAcknowledged());
}
if (requestRegistryClient.getName() != null && StringUtils.isBlank(requestRegistryClient.getName())) {
throw new IllegalArgumentException("Registry name must be specified.");
}
if (requestRegistryClient.getUri() != null && StringUtils.isBlank(requestRegistryClient.getUri())) {
throw new IllegalArgumentException("Registry URL must be specified.");
}
// handle expects request (usually from the cluster manager)
final Revision requestRevision = getRevision(requestRegistryEntity, id);
return withWriteLock(
serviceFacade,
requestRegistryEntity,
requestRevision,
lookup -> {
authorizeController(RequestAction.WRITE);
},
null,
(revision, registryEntity) -> {
final RegistryDTO registry = registryEntity.getComponent();
// update the controller service
final RegistryClientEntity entity = serviceFacade.updateRegistryClient(revision, registry);
populateRemainingRegistryEntityContent(entity);
return generateOkResponse(entity).build();
}
);
}
/**
* Removes the specified registry.
*
* @param httpServletRequest request
* @param version The revision is used to verify the client is working with
* the latest version of the flow.
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
* @param id The id of the registry to remove.
* @return A entity containing the client id and an updated revision.
*/
@DELETE
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("/registry-clients/{id}")
@ApiOperation(
value = "Deletes a registry client",
response = RegistryClientEntity.class,
authorizations = {
@Authorization(value = "Write - /controller")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response deleteRegistryClient(
@Context HttpServletRequest httpServletRequest,
@ApiParam(
value = "The revision is used to verify the client is working with the latest version of the flow.",
required = false
)
@QueryParam(VERSION) final LongParameter version,
@ApiParam(
value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
required = false
)
@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) final ClientIdParameter clientId,
@ApiParam(
value = "Acknowledges that this node is disconnected to allow for mutable requests to proceed.",
required = false
)
@QueryParam(DISCONNECTED_NODE_ACKNOWLEDGED) @DefaultValue("false") final Boolean disconnectedNodeAcknowledged,
@ApiParam(
value = "The registry id.",
required = true
)
@PathParam("id") final String id) {
if (isReplicateRequest()) {
return replicate(HttpMethod.DELETE);
} else if (isDisconnectedFromCluster()) {
verifyDisconnectedNodeModification(disconnectedNodeAcknowledged);
}
final RegistryClientEntity requestRegistryClientEntity = new RegistryClientEntity();
requestRegistryClientEntity.setId(id);
// handle expects request (usually from the cluster manager)
final Revision requestRevision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id);
return withWriteLock(
serviceFacade,
requestRegistryClientEntity,
requestRevision,
lookup -> {
authorizeController(RequestAction.WRITE);
},
() -> serviceFacade.verifyDeleteRegistry(id),
(revision, registryEntity) -> {
// delete the specified registry
final RegistryClientEntity entity = serviceFacade.deleteRegistryClient(revision, registryEntity.getId());
return generateOkResponse(entity).build();
}
);
}
/**
* Creates a Bulletin.
*
* @param httpServletRequest request
* @param requestBulletinEntity A bulletinEntity.
* @return A bulletinEntity.
*/
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@Path("bulletin")
@ApiOperation(
value = "Creates a new bulletin",
response = BulletinEntity.class,
authorizations = {
@Authorization(value = "Write - /controller")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response createBulletin(
@Context final HttpServletRequest httpServletRequest,
@ApiParam(
value = "The reporting task configuration details.",
required = true
) final BulletinEntity requestBulletinEntity) {
if (requestBulletinEntity == null || requestBulletinEntity.getBulletin() == null) {
throw new IllegalArgumentException("Bulletin details must be specified.");
}
final BulletinDTO requestBulletin = requestBulletinEntity.getBulletin();
if (requestBulletin.getId() != null) {
throw new IllegalArgumentException("A bulletin ID cannot be specified.");
}
if (StringUtils.isBlank(requestBulletin.getMessage())) {
throw new IllegalArgumentException("The bulletin message must be specified.");
}
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, requestBulletinEntity);
}
return withWriteLock(
serviceFacade,
requestBulletinEntity,
lookup -> {
authorizeController(RequestAction.WRITE);
},
null,
(bulletinEntity) -> {
final BulletinDTO bulletin = bulletinEntity.getBulletin();
final BulletinEntity entity = serviceFacade.createBulletin(bulletin,true);
return generateOkResponse(entity).build();
}
);
}
// -------------------
// controller services
// -------------------
/**
* Creates a new Controller Service.
*
* @param httpServletRequest request
* @param requestControllerServiceEntity A controllerServiceEntity.
* @return A controllerServiceEntity.
*/
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@Path("controller-services")
@ApiOperation(
value = "Creates a new controller service",
response = ControllerServiceEntity.class,
authorizations = {
@Authorization(value = "Write - /controller"),
@Authorization(value = "Read - any referenced Controller Services - /controller-services/{uuid}"),
@Authorization(value = "Write - if the Controller Service is restricted - /restricted-components")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response createControllerService(
@Context final HttpServletRequest httpServletRequest,
@ApiParam(
value = "The controller service configuration details.",
required = true
) final ControllerServiceEntity requestControllerServiceEntity) {
if (requestControllerServiceEntity == null || requestControllerServiceEntity.getComponent() == null) {
throw new IllegalArgumentException("Controller service details must be specified.");
}
if (requestControllerServiceEntity.getRevision() == null
|| (requestControllerServiceEntity.getRevision().getVersion() == null || requestControllerServiceEntity.getRevision().getVersion() != 0)) {
throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Controller service.");
}
final ControllerServiceDTO requestControllerService = requestControllerServiceEntity.getComponent();
if (requestControllerService.getId() != null) {
throw new IllegalArgumentException("Controller service ID cannot be specified.");
}
if (requestControllerService.getParentGroupId() != null) {
throw new IllegalArgumentException("Parent process group ID cannot be specified.");
}
if (StringUtils.isBlank(requestControllerService.getType())) {
throw new IllegalArgumentException("The type of controller service to create must be specified.");
}
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, requestControllerServiceEntity);
} else if (isDisconnectedFromCluster()) {
verifyDisconnectedNodeModification(requestControllerServiceEntity.isDisconnectedNodeAcknowledged());
}
return withWriteLock(
serviceFacade,
requestControllerServiceEntity,
lookup -> {
authorizeController(RequestAction.WRITE);
ComponentAuthorizable authorizable = null;
try {
authorizable = lookup.getConfigurableComponent(requestControllerService.getType(), requestControllerService.getBundle());
if (authorizable.isRestricted()) {
authorizeRestrictions(authorizer, authorizable);
}
if (requestControllerService.getProperties() != null) {
AuthorizeControllerServiceReference.authorizeControllerServiceReferences(requestControllerService.getProperties(), authorizable, authorizer, lookup);
}
} finally {
if (authorizable != null) {
authorizable.cleanUpResources();
}
}
},
() -> serviceFacade.verifyCreateControllerService(requestControllerService),
(controllerServiceEntity) -> {
final ControllerServiceDTO controllerService = controllerServiceEntity.getComponent();
// set the processor id as appropriate
controllerService.setId(generateUuid());
// create the controller service and generate the json
final Revision revision = getRevision(controllerServiceEntity, controllerService.getId());
final ControllerServiceEntity entity = serviceFacade.createControllerService(revision, null, controllerService);
controllerServiceResource.populateRemainingControllerServiceEntityContent(entity);
// build the response
return generateCreatedResponse(URI.create(entity.getUri()), entity).build();
}
);
}
// -------
// cluster
// -------
/**
* Gets the contents of this NiFi cluster. This includes all nodes and their status.
*
* @return A clusterEntity
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("cluster")
@ApiOperation(
value = "Gets the contents of the cluster",
notes = "Returns the contents of the cluster including all nodes and their status.",
response = ClusterEntity.class,
authorizations = {
@Authorization(value = "Read - /controller")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getCluster() {
authorizeController(RequestAction.READ);
// ensure connected to the cluster
if (!isConnectedToCluster()) {
throw new IllegalClusterResourceRequestException("Only a node connected to a cluster can process the request.");
}
if (isReplicateRequest()) {
return replicate(HttpMethod.GET, getClusterCoordinatorNode());
}
final ClusterDTO dto = serviceFacade.getCluster();
// create entity
final ClusterEntity entity = new ClusterEntity();
entity.setCluster(dto);
// generate the response
return generateOkResponse(entity).build();
}
/**
* Gets the contents of the specified node in this NiFi cluster.
*
* @param id The node id.
* @return A nodeEntity.
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("cluster/nodes/{id}")
@ApiOperation(
value = "Gets a node in the cluster",
response = NodeEntity.class,
authorizations = {
@Authorization(value = "Read - /controller")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getNode(
@ApiParam(
value = "The node id.",
required = true
)
@PathParam("id") String id) {
authorizeController(RequestAction.READ);
// ensure connected to the cluster
if (!isConnectedToCluster()) {
throw new IllegalClusterResourceRequestException("Only a node connected to a cluster can process the request.");
}
if (isReplicateRequest()) {
return replicate(HttpMethod.GET, getClusterCoordinatorNode());
}
// get the specified relationship
final NodeDTO dto = serviceFacade.getNode(id);
// create the response entity
final NodeEntity entity = new NodeEntity();
entity.setNode(dto);
// generate the response
return generateOkResponse(entity).build();
}
/**
* Updates the contents of the specified node in this NiFi cluster.
*
* @param id The id of the node
* @param nodeEntity A nodeEntity
* @return A nodeEntity
*/
@PUT
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@Path("cluster/nodes/{id}")
@ApiOperation(
value = "Updates a node in the cluster",
response = NodeEntity.class,
authorizations = {
@Authorization(value = "Write - /controller")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response updateNode(
@ApiParam(
value = "The node id.",
required = true
)
@PathParam("id") String id,
@ApiParam(
value = "The node configuration. The only configuration that will be honored at this endpoint is the status.",
required = true
) NodeEntity nodeEntity) {
authorizeController(RequestAction.WRITE);
// ensure connected to the cluster
if (!isConnectedToCluster()) {
throw new IllegalClusterResourceRequestException("Only a node connected to a cluster can process the request.");
}
if (nodeEntity == null || nodeEntity.getNode() == null) {
throw new IllegalArgumentException("Node details must be specified.");
}
// get the request node
final NodeDTO requestNodeDTO = nodeEntity.getNode();
if (!id.equals(requestNodeDTO.getNodeId())) {
throw new IllegalArgumentException(String.format("The node id (%s) in the request body does "
+ "not equal the node id of the requested resource (%s).", requestNodeDTO.getNodeId(), id));
}
if (isReplicateRequest()) {
return replicateToCoordinator(HttpMethod.PUT, nodeEntity);
}
// update the node
final NodeDTO node = serviceFacade.updateNode(requestNodeDTO);
// create the response entity
NodeEntity entity = new NodeEntity();
entity.setNode(node);
// generate the response
return generateOkResponse(entity).build();
}
/**
* Removes the specified from this NiFi cluster.
*
* @param id The id of the node
* @return A nodeEntity
*/
@DELETE
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("cluster/nodes/{id}")
@ApiOperation(
value = "Removes a node from the cluster",
response = NodeEntity.class,
authorizations = {
@Authorization(value = "Write - /controller")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response deleteNode(
@ApiParam(
value = "The node id.",
required = true
)
@PathParam("id") String id) {
authorizeController(RequestAction.WRITE);
// ensure connected to the cluster
if (!isConnectedToCluster()) {
throw new IllegalClusterResourceRequestException("Only a node connected to a cluster can process the request.");
}
if (isReplicateRequest()) {
return replicateToCoordinator(HttpMethod.DELETE, getRequestParameters());
}
serviceFacade.deleteNode(id);
// create the response entity
final NodeEntity entity = new NodeEntity();
// generate the response
return generateOkResponse(entity).build();
}
// -------
// history
// -------
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("status/history")
@ApiOperation(
value = "Gets status history for the node",
notes = NON_GUARANTEED_ENDPOINT,
response = ComponentHistoryEntity.class,
authorizations = {
@Authorization(value = "Read - /controller")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getNodeStatusHistory() {
authorizeController(RequestAction.READ);
// replicate if cluster manager
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
// generate the response
return generateOkResponse(serviceFacade.getNodeStatusHistory()).build();
}
/**
* Deletes flow history from the specified end date.
*
* @param endDate The end date for the purge action.
* @return A historyEntity
*/
@DELETE
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("history")
@ApiOperation(
value = "Purges history",
response = HistoryEntity.class,
authorizations = {
@Authorization(value = "Write - /controller")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response deleteHistory(
@Context final HttpServletRequest httpServletRequest,
@ApiParam(
value = "Purge actions before this date/time.",
required = true
)
@QueryParam("endDate") DateTimeParameter endDate) {
// ensure the end date is specified
if (endDate == null) {
throw new IllegalArgumentException("The end date must be specified.");
}
// Note: History requests are not replicated throughout the cluster and are instead handled by the nodes independently
return withWriteLock(
serviceFacade,
new EndDateEntity(endDate.getDateTime()),
lookup -> {
authorizeController(RequestAction.WRITE);
},
null,
(endDateEntity) -> {
// purge the actions
serviceFacade.deleteActions(endDateEntity.getEndDate());
// generate the response
return generateOkResponse(new HistoryEntity()).build();
}
);
}
private class EndDateEntity extends Entity {
final Date endDate;
public EndDateEntity(Date endDate) {
this.endDate = endDate;
}
public Date getEndDate() {
return endDate;
}
}
// setters
public void setServiceFacade(final NiFiServiceFacade serviceFacade) {
this.serviceFacade = serviceFacade;
}
public void setReportingTaskResource(final ReportingTaskResource reportingTaskResource) {
this.reportingTaskResource = reportingTaskResource;
}
public void setControllerServiceResource(final ControllerServiceResource controllerServiceResource) {
this.controllerServiceResource = controllerServiceResource;
}
public void setAuthorizer(final Authorizer authorizer) {
this.authorizer = authorizer;
}
}