| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.web.api; |
| |
| import io.swagger.annotations.Api; |
| import io.swagger.annotations.ApiOperation; |
| import io.swagger.annotations.ApiParam; |
| import io.swagger.annotations.ApiResponse; |
| import io.swagger.annotations.ApiResponses; |
| import io.swagger.annotations.Authorization; |
| import org.apache.nifi.authorization.Authorizer; |
| import org.apache.nifi.authorization.RequestAction; |
| import org.apache.nifi.authorization.resource.Authorizable; |
| import org.apache.nifi.authorization.user.NiFiUserUtils; |
| import org.apache.nifi.cluster.manager.NodeResponse; |
| import org.apache.nifi.cluster.manager.exception.UnknownNodeException; |
| import org.apache.nifi.cluster.protocol.NodeIdentifier; |
| import org.apache.nifi.web.NiFiServiceFacade; |
| import org.apache.nifi.web.api.dto.CounterDTO; |
| import org.apache.nifi.web.api.dto.CountersDTO; |
| import org.apache.nifi.web.api.entity.ComponentEntity; |
| import org.apache.nifi.web.api.entity.CounterEntity; |
| import org.apache.nifi.web.api.entity.CountersEntity; |
| |
| import javax.servlet.http.HttpServletRequest; |
| import javax.ws.rs.Consumes; |
| import javax.ws.rs.DefaultValue; |
| import javax.ws.rs.GET; |
| import javax.ws.rs.HttpMethod; |
| import javax.ws.rs.PUT; |
| import javax.ws.rs.Path; |
| import javax.ws.rs.PathParam; |
| import javax.ws.rs.Produces; |
| import javax.ws.rs.QueryParam; |
| import javax.ws.rs.core.Context; |
| import javax.ws.rs.core.MediaType; |
| import javax.ws.rs.core.Response; |
| |
| |
| /** |
| * RESTful endpoint for managing a cluster. |
| */ |
| @Path("/counters") |
| @Api( |
| value = "/counters", |
| description = "Endpoint for managing counters." |
| ) |
| public class CountersResource extends ApplicationResource { |
| |
| private NiFiServiceFacade serviceFacade; |
| private Authorizer authorizer; |
| |
| /** |
| * Authorizes access to the flow. |
| */ |
| private void authorizeCounters(final RequestAction action) { |
| serviceFacade.authorizeAccess(lookup -> { |
| final Authorizable counters = lookup.getCounters(); |
| counters.authorize(authorizer, action, NiFiUserUtils.getNiFiUser()); |
| }); |
| } |
| |
| /** |
| * Retrieves the counters report for this NiFi. |
| * |
| * @return A countersEntity. |
| * @throws InterruptedException if interrupted |
| */ |
| @GET |
| @Consumes(MediaType.WILDCARD) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("") // necessary due to a bug in swagger |
| @ApiOperation( |
| value = "Gets the current counters for this NiFi", |
| notes = NON_GUARANTEED_ENDPOINT, |
| response = CountersEntity.class, |
| authorizations = { |
| @Authorization(value = "Read - /counters") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response getCounters( |
| @ApiParam( |
| value = "Whether or not to include the breakdown per node. Optional, defaults to false", |
| required = false |
| ) |
| @QueryParam("nodewise") @DefaultValue(NODEWISE) final Boolean nodewise, |
| @ApiParam( |
| value = "The id of the node where to get the status.", |
| required = false |
| ) |
| @QueryParam("clusterNodeId") final String clusterNodeId) throws InterruptedException { |
| |
| authorizeCounters(RequestAction.READ); |
| |
| // ensure a valid request |
| if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) { |
| throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node."); |
| } |
| |
| // replicate if necessary |
| if (isReplicateRequest()) { |
| // determine where this request should be sent |
| if (clusterNodeId == null) { |
| final NodeResponse nodeResponse; |
| |
| // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly |
| // to the cluster nodes themselves. |
| if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { |
| nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders()).awaitMergedResponse(); |
| } else { |
| nodeResponse = getRequestReplicator().forwardToCoordinator( |
| getClusterCoordinatorNode(), HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders()).awaitMergedResponse(); |
| } |
| |
| final CountersEntity entity = (CountersEntity) nodeResponse.getUpdatedEntity(); |
| |
| // ensure there is an updated entity (result of merging) and prune the response as necessary |
| if (entity != null && !nodewise) { |
| entity.getCounters().setNodeSnapshots(null); |
| } |
| |
| return nodeResponse.getResponse(); |
| } else { |
| // get the target node and ensure it exists |
| final NodeIdentifier targetNode = getClusterCoordinator().getNodeIdentifier(clusterNodeId); |
| if (targetNode == null) { |
| throw new UnknownNodeException("The specified cluster node does not exist."); |
| } |
| |
| return replicate(HttpMethod.GET, targetNode); |
| } |
| } |
| |
| final CountersDTO countersReport = serviceFacade.getCounters(); |
| |
| // create the response entity |
| final CountersEntity entity = new CountersEntity(); |
| entity.setCounters(countersReport); |
| |
| // generate the response |
| return generateOkResponse(entity).build(); |
| } |
| |
| /** |
| * Update the specified counter. This will reset the counter value to 0. |
| * |
| * @param httpServletRequest request |
| * @param id The id of the counter. |
| * @return A counterEntity. |
| */ |
| @PUT |
| @Consumes(MediaType.WILDCARD) |
| @Produces(MediaType.APPLICATION_JSON) |
| @Path("{id}") |
| @ApiOperation( |
| value = "Updates the specified counter. This will reset the counter value to 0", |
| notes = NON_GUARANTEED_ENDPOINT, |
| response = CounterEntity.class, |
| authorizations = { |
| @Authorization(value = "Write - /counters") |
| } |
| ) |
| @ApiResponses( |
| value = { |
| @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), |
| @ApiResponse(code = 401, message = "Client could not be authenticated."), |
| @ApiResponse(code = 403, message = "Client is not authorized to make this request."), |
| @ApiResponse(code = 404, message = "The specified resource could not be found."), |
| @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") |
| } |
| ) |
| public Response updateCounter( |
| @Context final HttpServletRequest httpServletRequest, |
| @ApiParam( |
| value = "The id of the counter." |
| ) |
| @PathParam("id") final String id) { |
| |
| if (isReplicateRequest()) { |
| return replicate(HttpMethod.PUT); |
| } |
| |
| final ComponentEntity requestComponentEntity = new ComponentEntity(); |
| requestComponentEntity.setId(id); |
| |
| return withWriteLock( |
| serviceFacade, |
| requestComponentEntity, |
| lookup -> { |
| authorizeCounters(RequestAction.WRITE); |
| }, |
| null, |
| (componentEntity) -> { |
| // reset the specified counter |
| final CounterDTO counter = serviceFacade.updateCounter(componentEntity.getId()); |
| |
| // create the response entity |
| final CounterEntity entity = new CounterEntity(); |
| entity.setCounter(counter); |
| |
| // generate the response |
| return generateOkResponse(entity).build(); |
| } |
| ); |
| } |
| |
| // setters |
| |
| public void setServiceFacade(NiFiServiceFacade serviceFacade) { |
| this.serviceFacade = serviceFacade; |
| } |
| |
| public void setAuthorizer(Authorizer authorizer) { |
| this.authorizer = authorizer; |
| } |
| } |